PAI Python SDK提供了更易用的HighLevel API,支援您在PAI完成模型的訓練和部署。本文介紹如何使用PAI Python SDK訓練和部署PyTorch模型。
背景資訊
PyTorch是一個非常流行的深度學習架構,提供了較高的靈活性和優越的效能,能夠與Python豐富的生態無縫結合,被廣泛應用於映像分類、語音辨識、自然語言處理、推薦和AIGC等領域。本樣本為您介紹如何使用PAI Python SDK在PAI完成一個PyTorch模型的訓練,然後使用訓練獲得的模型部署推理服務。主要流程包括:
安裝PAI Python SDK,並配置存取金鑰AccessKey、使用的工作空間和OSS Bucket。
下載MNIST資料集,並上傳到OSS上供訓練作業使用。
採用PyTorch官方樣本庫中的MNIST訓練指令碼作為基礎,並進行適當修改後作為訓練指令碼。
使用PAI Python SDK提供的Estimator API,建立一個訓練作業,提交到PAI上執行。
將以上訓練作業輸出的模型,分別使用Processor和鏡像部署的方式部署到EAS,建立為線上推理服務。
前提條件
已擷取阿里雲帳號的鑒權AccessKey ID和AccessKey Secret,詳情請參見:擷取AccessKey。
已建立工作空間,詳情請參見:建立及管理工作空間。
已建立OSS Bucket,詳情請參見:控制台建立儲存空間。
已準備Python環境,要求3.7及其以上版本。
安裝和配置SDK
首先需要在命令列終端中安裝PAI Python SDK以運行本樣本。
python -m pip install "alipai>=0.4.0"如果在回顯資訊中出現ModuleNotFoundError類型的錯誤,請嘗試執行pip install --upgrade pip命令來解決該問題。
在SDK安裝完成後,通過在命令列終端中執行以下命令進行配置。
python -m pai.toolkit.config詳細的安裝和配置過程,請參見安裝和配置。
準備訓練資料
在本樣本中,將使用MNIST資料集訓練一個圖片分類模型。當您提交訓練作業至PAI平台時,需要準備資料集,並上傳到OSS Bucket中。
下載MNIST資料集。
使用以下Shell指令碼,將MNIST資料集下載到本地目錄
data。#!/bin/sh set -e url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/" # 如果上述地址下載速度較慢,可以使用以下地址。 # url_prefix="http://yann.lecun.com/exdb/mnist/" mkdir -p data/MNIST/raw/ wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/ wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/ wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/ wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/將資料集上傳到OSS Bucket中。
您可以使用OSS提供的命令列工具
ossutil上傳相應的檔案,關於ossutil的安裝和使用,請參見命令列工具ossutil 1.0。您也可以使用PAI Python SDK提供的便利方法,將本地訓練資料上傳到OSS Bucket的/mnist/data/路徑下。通過
ossutil上傳檔案:ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/使用PAI Python SDK上傳檔案:
from pai.common.oss_utils import upload from pai.session import get_default_session sess = get_default_session() data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket) print(data_uri)
準備訓練指令碼
在提交訓練作業之前,需要通過PyTorch編寫訓練指令碼。在本樣本中,以PyTorch官方提供的MNIST樣本為基礎,並適當修改了資料載入和模型儲存的邏輯後,作為訓練指令碼。
使用環境變數獲得輸入資料路徑
通過
estimator.fit(inputs={"train_data":data_uri})傳遞上述的OSS資料URI,相應的資料會被掛載到訓練容器中。訓練指令碼可以通過讀取本地檔案的方式,讀取到掛載的資料。對於訓練作業,
estimator.fit方法的inputs是字典,對應的每一個輸入資料都是一個 Channel,Key是Channel名,Value是資料存放區路徑。訓練作業指令碼可以通過PAI_INPUT_{ChannelNameUpperCase}環境變數擷取到輸入資料掛載到工作容器內的資料路徑。對資料載入部分的代碼進行了如下修改:
- dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform) - dataset2 = datasets.MNIST("../data", train=False, transform=transform) + # 通過環境變數獲得輸入資料路徑 + data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data") + dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform) + dataset2 = datasets.MNIST(data_path, train=False, transform=transform)使用環境變數擷取模型的儲存路徑:
您需要將模型儲存在訓練環境的指定路徑中,此路徑由環境變數PAI_OUTPUT_MODEL指定(預設路徑為/ml/output/model)。位於該路徑下的資料和模型將自動儲存到您的OSS Bucket中。
對模型儲存部分的代碼進行了如下修改:
- if args.save_model: - torch.save(model.state_dict(), "mnist_cnn.pt") + # 儲存模型 + save_model(model) + + def save_model(model): + """將模型轉為TorchScript,儲存到指定路徑。""" + output_model_path = os.environ.get("PAI_OUTPUT_MODEL") + os.makedirs(output_model_path, exist_ok=True) + + m = torch.jit.script(model) + m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
PAI提供的預置PyTorch Processor在建立服務時,要求輸入的模型是TorchScript格式。本樣本將模型匯出為TorchScript格式。完整的訓練作業指令碼如下:
# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function
import argparse
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
batch_idx * len(data),
len(train_loader.dataset),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
if args.dry_run:
break
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(
output, target, reduction="sum"
).item() # sum up batch loss
pred = output.argmax(
dim=1, keepdim=True
) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print(
"\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
test_loss,
correct,
len(test_loader.dataset),
100.0 * correct / len(test_loader.dataset),
)
)
def main():
# Training settings
parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
parser.add_argument(
"--batch-size",
type=int,
default=64,
metavar="N",
help="input batch size for training (default: 64)",
)
parser.add_argument(
"--test-batch-size",
type=int,
default=1000,
metavar="N",
help="input batch size for testing (default: 1000)",
)
parser.add_argument(
"--epochs",
type=int,
default=14,
metavar="N",
help="number of epochs to train (default: 14)",
)
parser.add_argument(
"--lr",
type=float,
default=1.0,
metavar="LR",
help="learning rate (default: 1.0)",
)
parser.add_argument(
"--gamma",
type=float,
default=0.7,
metavar="M",
help="Learning rate step gamma (default: 0.7)",
)
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables CUDA training"
)
parser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="quickly check a single pass",
)
parser.add_argument(
"--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
)
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status",
)
parser.add_argument(
"--save-model",
action="store_true",
default=False,
help="For Saving the current Model",
)
args = parser.parse_args()
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
train_kwargs = {"batch_size": args.batch_size}
test_kwargs = {"batch_size": args.test_batch_size}
if use_cuda:
cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
data_path = os.environ.get("PAI_INPUT_DATA")
dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(model, device, test_loader)
scheduler.step()
# 儲存模型
save_model(model)
def save_model(model):
"""將模型轉為TorchScript,儲存到指定路徑."""
output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
os.makedirs(output_model_path, exist_ok=True)
m = torch.jit.script(model)
m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
if __name__ == "__main__":
main()您需要將上述訓練代碼儲存到本地目錄中,後續使用Estimator提交到PAI上執行。本樣本將建立一個train_src目錄,將訓練指令碼儲存為train_src/train.py。
|-- train_src # 待上傳的訓練指令碼目錄
|-- requirements.txt # 可選:訓練作業的第三方包依賴
`-- train.py # 儲存的訓練作業指令碼提交訓練作業
Estimator支援使用本地的訓練指令碼、指定的鏡像在PAI上執行訓練作業。
訓練作業指令碼和命令:
您的訓練指令碼所在目錄(參數source_dir)會被上傳到OSS,在作業啟動之前準備到作業容器中,預設為
/ml/usercode目錄。您指定的啟動命令(command參數)的工作目錄同樣是/ml/usercode。
訓練作業鏡像:
本樣本使用PAI提供的PyTorch鏡像運行訓練作業。
訓練作業超參:
您可以通過讀取
${PAI_CONFIG_DIR}/hyperparameters.json檔案擷取訓練作業的超參 ,也可以通過環境變數擷取訓練作業超參,詳情請參見訓練作業預置環境變數。本樣本執行的命令是
python train.py $PAI_USER_ARGS,其中PAI_USER_ARGS環境變數是作業超參以命令列參數的方式拼接獲得的字串。訓練作業最終的啟動命令是python train.py --epochs 5 --batch-size 256 --lr 0.5。通過
metric_definitions指定需要採集的Metrics:PAI的訓練服務支援從訓練作業輸出日誌中(訓練指令碼列印的標準輸出和標準錯誤輸出),以Regex匹配的方式捕獲訓練作業Metrics資訊。通過SDK列印的作業的詳情頁連結,您可以查看作業的詳情配置、輸出日誌以及訓練作業的Metrics。
通過
instance_type指定作業使用的機器執行個體類型:PAI的訓練作業支援的機器執行個體類型,請參見附錄:公用資源規格列表。
在本樣本中,Estimator的範例程式碼如下:
from pai.estimator import Estimator
from pai.image import retrieve
# 使用PAI提供的1.18PAI版本的PyTorch GPU鏡像運行訓練指令碼。
image_uri = retrieve(
"PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)
est = Estimator(
# 訓練作業啟動命令,預設工作目錄為/ml/usercode/。
command="python train.py $PAI_USER_ARGS",
# 需要上傳的訓練代碼目錄的相對路徑或絕對路徑。
# 預設會準備到訓練作業環境的/ml/usercode目錄下。
source_dir="./train_src/",
# 訓練作業鏡像。
image_uri=image_uri,
# 機器配置。
instance_type="ecs.gn6i-c4g1.xlarge", # 4vCPU 15GB 1*NVIDIA T4
# 訓練作業超參。
hyperparameters={
"epochs": 5,
"batch-size": 64 * 4,
"lr": 0.5,
},
# 訓練作業的Metric捕獲配置。
metric_definitions=[
{
"Name": "loss",
"Regex": r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
},
],
base_job_name="pytorch_mnist",
)
將上傳到OSS的訓練資料作為訓練輸入資料,並執行訓練作業。
# 如果使用ossutil上傳訓練資料,您需要顯式賦值輸入資料的OSS URI路徑。
# data_uri = "oss://<YourOssBucket>/mnist/data/"
# 提交訓練作業
est.fit(
inputs={
"train_data": data_uri,
}
)
# 訓練作業產出的模型路徑
print("TrainingJob output model data:")
print(est.model_data())est.fit方法將您的訓練作業提交到PAI上執行。任務提交之後,SDK會列印工作詳情頁連結,並持續列印訓練作業的日誌,直到作業執行結束。
當您需要直接使用OSS上的資料,可以通過estimator.fit方法的inputs參數傳遞。通過inputs傳遞資料存放區路徑會被掛載到工作目錄中,您的訓練指令碼可以通過讀取本地檔案的方式載入資料。
對於提交訓練作業的詳細介紹,請參見提交訓練作業。
部署推理服務
在訓練作業執行成功後,您可以使用estimator.model_data()方法擷取訓練作業產出模型的OSS路徑。以下內容為您介紹如何將訓練產出的模型部署到PAI建立為線上推理服務。
通過
InferenceSpec定義模型推理的配置。您可以選擇使用Processor或自訂鏡像的模式進行模型部署。在以下樣本中,將分別使用兩種方式部署獲得的PyTorch模型。
通過
Model.deploy方法,佈建服務的使用資源、服務名稱等資訊,建立推理服務。
對於部署推理服務的詳細介紹,請參見部署推理服務。
Processor模式部署
Processor是PAI對於推理服務程式包的抽象描述,負責載入模型並啟動模型推理服務。模型推理服務支援使用者使用API方式進行調用。PAI提供了預置PyTorch Processor,支援使用者方便地將TorchScript格式的模型部署到PAI,建立推理服務。對於PyTorch Processor的詳細介紹,請參見:PyTorch Processor。
部署服務
本樣本通過PyTorch Processor將訓練產出的模型部署為一個推理服務,範例程式碼如下:
from pai.model import Model, InferenceSpec from pai.predictor import Predictor from pai.common.utils import random_str m = Model( model_data=est.model_data(), # 使用PAI提供的PyTorch Processor inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"), ) p: Predictor = m.deploy( service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)), instance_type="ecs.c6.xlarge", ) print(p.service_name) print(p.service_status)Model.deploy方法返回Predictor對象並連結到新建立的推理服務。您可以通過Predictor.predict方法向該服務發送預測請求並擷取預測結果。推理服務
使用NumPy構建一個測試樣本資料,發送給推理服務。
import numpy as np # 以上儲存TorchScritp模型要求輸入為 Float32, 資料格式的形狀為 (BatchSize, Channel, Weight, Height) dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32) # np.random.rand(1, 1, 28, 28).dtype res = p.predict(dummy_input) print(res) print(np.argmax(res, 1))刪除推理服務
在測試完成之後,可以通過
Predictor.delete_service刪除推理服務。p.delete_service()
鏡像部署
Processor模式啟動的推理服務效能優越,適用於效能較為敏感的情境。對於一些需要靈活自訂的情境,例如模型使用了一些第三方的依賴,或是推理服務需要有前處理和後處理,則可以通過鏡像部署的方式實現。SDK提供了pai.model.container_serving_spec()方法,支援您使用本地的推理服務代碼配合PAI提供的基礎鏡像的方式建立推理服務。
準備推理服務的代碼檔案。
在使用鏡像部署之前,您需要準備推理服務的代碼,該代碼負責載入模型、拉起HTTP Server、處理使用者的推理請求。使用Flask編寫一個模型服務的代碼,樣本如下:
import json from flask import Flask, request from PIL import Image import os import torch import torchvision.transforms as transforms import numpy as np import io app = Flask(__name__) # 使用者指定模型,預設會被載入到當前路徑下。 MODEL_PATH = "/eas/workspace/model/" device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device) transform = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] ) @app.route("/", methods=["POST"]) def predict(): # 預先處理圖片資料 im = Image.open(io.BytesIO(request.data)) input_tensor = transform(im).to(device) input_tensor.unsqueeze_(0) # 使用模型進行推理 output_tensor = model(input_tensor) pred_res =output_tensor.detach().cpu().numpy()[0] return json.dumps(pred_res.tolist()) if __name__ == '__main__': app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))您需要將以上的代碼儲存到本地,供後續上傳使用。在本樣本中,您需要在本地建立目錄
infer_src,將上述推理服務代碼儲存為infer_src/run.py,目錄結構如下:|-- infer_src # 待上傳的推理服務代碼目錄 |-- requirements.txt # 可選:推理服務的第三方包依賴 `-- run.py # 儲存的推理服務指令碼通過
pai.model.container_serving_spec,基於本地指令碼和PAI提供的PyTorch鏡像建立一個InferenceSpec對象。from pai.model import InferenceSpec, container_serving_spec from pai.image import retrieve, ImageScope torch_image_uri = retrieve("PyTorch", framework_version="latest", image_scope=ImageScope.INFERENCE).image_uri inf_spec = container_serving_spec( command="python run.py", source_dir="./infer_src/", image_uri=torch_image_uri, requirements=["flask==2.0.0", "Werkzeug==2.2.2", "pillow", "torchvision"], ) print(inf_spec.to_dict())模型服務的代碼和啟動命令:
使用者指定的本地指令碼目錄
source_dir參數會被上傳到OSS,然後掛載到服務容器(預設到/ml/usercode目錄)。推理服務鏡像:
PAI提供了基礎的推理鏡像支援使用者使用,您可以通過
pai.image.retrieve方法,指定參數image_scope=ImageScope.INFERENCE擷取PAI提供的推理鏡像。模型服務的第三方依賴包:
模型服務代碼或是模型的依賴,可以通過
requirements參數指定,相應的依賴會在服務程式啟動前被安裝到環境中。
使用訓練作業輸出的模型和上述的InferenceSpec,通過
Model.deployAPI部署一個線上推理服務。from pai.model import Model from pai.common.utils import random_str import numpy as np m = Model( model_data=est.model_data(), inference_spec=inf_spec, ) predictor = m.deploy( service_name="torch_mnist_script_container_{}".format(random_str(6)), instance_type="ecs.c6.xlarge", )推理服務。
準備一張MNIST測試圖片。
import base64 from PIL import Image from IPython import display import io !pip install -q pillow # raw_data是一張MNIST圖片,對應數字9。 raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==") im = Image.open(io.BytesIO(raw_data)) display.display(im)將請求發送給推理服務。
推理服務使用HTTP請求體內的資料作為輸入的圖片,SDK的
raw_predict方法接受bytes資料類型的請求,通過POST方法,在請求體(HTTP Request Body)帶上使用者推理資料,發送給推理服務。from pai.predictor import RawResponse import numpy as np resp: RawResponse = predictor.raw_predict(data=raw_data) print(resp.json()) print(np.argmax(resp.json()))
測試完成之後可以刪除服務。
predictor.delete_service()
附件
本樣本的Jupyter Notebook:使用PAI Python SDK訓練和部署PyTorch模型。