PAI SDK for Python provides easy-to-use, high-level APIs that allow you to train and deploy models in Platform for AI (PAI). This topic describes how to use PAI SDK for Python to train and deploy a PyTorch model.
Background information
PyTorch is a flexible and high-performing deep learning framework that integrates seamlessly with the Python ecosystem. PyTorch is widely used in image classification, speech recognition, Natural Language Processing (NLP), recommendation, and AI-generated content (AIGC). This topic describes how to use PAI SDK for Python to train a PyTorch model and deploy the trained model as an inference service. Perform the following steps:
Install PAI SDK for Python, and configure the AccessKey pair, PAI workspace, and Object Storage Service (OSS) bucket.
Download an MNIST dataset and upload it to OSS for the training job.
In this example, an MNIST script in the PyTorch sample repository is used as a template. Perform simple modifications on the template and use it as the training script.
Use the Estimator API provided by PAI SDK for Python to create a training job and submit it to PAI.
Deploy the model generated by the preceding steps to Elastic Algorithm Service (EAS) as an online inference service, first by using a processor and then by using a custom image.
Prerequisites
You have obtained an AccessKey pair.
You have created a workspace.
You have created an OSS bucket.
You have prepared a Python 3.7 or later environment.
Install and configure SDK
Run the following command on the CLI to install PAI SDK for Python:
python -m pip install "alipai>=0.4.0"
If a ModuleNotFoundError error occurs, run the pip install --upgrade pip command to resolve it.
Run the following command on the CLI to configure PAI SDK for Python:
python -m pai.toolkit.config
For more information, see Install and configure PAI SDK for Python.
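After the configuration is complete, you can verify it by loading the default session. The following is a minimal sketch; sess.oss_bucket is the session attribute also used in the upload example later in this topic, and workspace_id is an assumed attribute name:
from pai.session import get_default_session

# Load the default session created by the configuration command above.
sess = get_default_session()
# Print the configured workspace ID and OSS bucket.
print(sess.workspace_id)
print(sess.oss_bucket)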
Prepare training data
In this example, the MNIST dataset is used to train an image classification model. To submit a training job in PAI, you need to prepare and upload a dataset to an OSS bucket.
Download the MNIST dataset.
Run the following Shell script to download the MNIST dataset to an on-premises directory named data:
#!/bin/sh
set -e
url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/"
# You can use the following address if the download takes too long.
# url_prefix="http://yann.lecun.com/exdb/mnist/"
mkdir -p data/MNIST/raw/
wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/
Upload the dataset to an OSS bucket.
You can use the ossutil CLI tool provided by OSS to upload the data. For information about how to install and use ossutil, see ossutil 1.0. You can also use the method provided by PAI SDK for Python to upload the training data to the /mnist/data/ path of the OSS bucket.
Use ossutil:
ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/
Use PAI SDK for Python:
from pai.common.oss_utils import upload
from pai.session import get_default_session

sess = get_default_session()
data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket)
print(data_uri)
Prepare a training script
You need to write a training script by using PyTorch before you submit a training job. The training script used in this example is modified based on the MNIST example provided by PyTorch, with changes to the data loading and model saving logic.
Obtain the input data path by using environment variables
Use estimator.fit(inputs={"train_data": data_uri}) to mount the data stored in OSS to the training container. The training script can then access the mounted data by reading local files.
The inputs argument of the estimator.fit method is a dict. Each entry is an input channel: the key is the channel name, and the value is the path of the stored data. The training script can obtain the path of the mounted data in the job container from the PAI_INPUT_{ChannelNameUpperCase} environment variable, for example, PAI_INPUT_TRAIN_DATA for the train_data channel.
Modify the data loading code based on the following content:
- dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
- dataset2 = datasets.MNIST("../data", train=False, transform=transform)
+ # Obtain the input data path by using environment variables.
+ data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data")
+ dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
+ dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
Obtain the output model path by using environment variables
You need to save the model to the path specified by the PAI_OUTPUT_MODEL environment variable in the training environment. The default path is /ml/output/model. The data and models saved in this path are uploaded to your OSS bucket.
Modify the model saving code based on the following content:
- if args.save_model:
-     torch.save(model.state_dict(), "mnist_cnn.pt")
+ # Save the model.
+ save_model(model)
+
+ def save_model(model):
+     """Convert the model to TorchScript and save it to the specified path."""
+     output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
+     os.makedirs(output_model_path, exist_ok=True)
+
+     m = torch.jit.script(model)
+     m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
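To verify that the exported model is valid TorchScript, you can load it back locally. The following is a minimal sketch, assuming the saved mnist_cnn.pt file is available in the current directory:
import torch

# Load the TorchScript model without the original Python class definition
# and run a dummy forward pass on a batch of one 28x28 grayscale image.
m = torch.jit.load("mnist_cnn.pt")
out = m(torch.rand(1, 1, 28, 28))
print(out.shape)  # torch.Size([1, 10])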
When you use a built-in PyTorch processor provided by PAI to create a service, the input model must be in the TorchScript format. In this example, the model is exported in the TorchScript format. Sample training script:
# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function
import argparse
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
batch_idx * len(data),
len(train_loader.dataset),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
if args.dry_run:
break
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(
output, target, reduction="sum"
).item() # sum up batch loss
pred = output.argmax(
dim=1, keepdim=True
) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print(
"\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
test_loss,
correct,
len(test_loader.dataset),
100.0 * correct / len(test_loader.dataset),
)
)
def main():
# Training settings
parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
parser.add_argument(
"--batch-size",
type=int,
default=64,
metavar="N",
help="input batch size for training (default: 64)",
)
parser.add_argument(
"--test-batch-size",
type=int,
default=1000,
metavar="N",
help="input batch size for testing (default: 1000)",
)
parser.add_argument(
"--epochs",
type=int,
default=14,
metavar="N",
help="number of epochs to train (default: 14)",
)
parser.add_argument(
"--lr",
type=float,
default=1.0,
metavar="LR",
help="learning rate (default: 1.0)",
)
parser.add_argument(
"--gamma",
type=float,
default=0.7,
metavar="M",
help="Learning rate step gamma (default: 0.7)",
)
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables CUDA training"
)
parser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="quickly check a single pass",
)
parser.add_argument(
"--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
)
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status",
)
parser.add_argument(
"--save-model",
action="store_true",
default=False,
help="For Saving the current Model",
)
args = parser.parse_args()
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
train_kwargs = {"batch_size": args.batch_size}
test_kwargs = {"batch_size": args.test_batch_size}
if use_cuda:
cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
data_path = os.environ.get("PAI_INPUT_DATA")
dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(model, device, test_loader)
scheduler.step()
# Save the model.
save_model(model)
def save_model(model):
"""Convert the model to TorchScript and save it to the specified path."""
output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
os.makedirs(output_model_path, exist_ok=True)
m = torch.jit.script(model)
m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
if __name__ == "__main__":
    main()
Save the preceding training code to an on-premises directory and use the Estimator to submit it to PAI. In this example, a new directory named train_src is created and the training script is saved to train_src/train.py.
|-- train_src # The directory of the training script to be uploaded.
|-- requirements.txt # Optional. The third-party dependencies of the training job.
'-- train.py # The saved training script.
Submit a training job
Estimator allows you to use an on-premises training script and an image to run training jobs in PAI.
Scripts and commands of the training job
The directory of the training script, which is specified by the source_dir parameter, is uploaded to OSS and prepared in the job container before the job starts. The default directory is /ml/usercode, which is also the working directory of the startup command specified by the command parameter.
Image of the training job
In this example, a PyTorch image provided by PAI is used to run the training job.
Hyperparameters of the training job
You can obtain the hyperparameters of the training job by reading the ${PAI_CONFIG_DIR}/hyperparameters.json file or by using environment variables. For more information, see Preset environment variables of training jobs.
In this example, the executed command is python train.py $PAI_USER_ARGS, where the PAI_USER_ARGS variable is a string assembled from the hyperparameters. The final startup command for the training job is python train.py --epochs 5 --batch-size 256 --lr 0.5.
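The following sketch illustrates how the hyperparameters configured below are assembled into the PAI_USER_ARGS string. The exact assembly and escaping are handled by PAI, so this is an approximation for illustration only:
hyperparameters = {"epochs": 5, "batch-size": 256, "lr": 0.5}

# Each hyperparameter becomes a "--name value" pair on the command line.
pai_user_args = " ".join(
    "--{} {}".format(name, value) for name, value in hyperparameters.items()
)
print(pai_user_args)  # --epochs 5 --batch-size 256 --lr 0.5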
Specify training metrics by using metric_definitions
PAI allows you to capture training metrics by matching regular expressions against the output logs of the training job, which include standard output and standard error. The system also prints a link to a details page, where you can view the detailed configuration, output logs, and metrics of the training job.
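As an illustration, you can test the loss regular expression used in the Estimator configuration below against a sample log line. The log line format here is an assumption for demonstration purposes:
import re

# The same regular expression that is passed to metric_definitions below.
pattern = r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*"
match = re.match(pattern, "epoch=1 step=100 loss=0.123456")
if match:
    print(match.group(1))  # 0.123456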
Specify the instance type for the training job by using instance_type
For more information about the instance types supported by training jobs in PAI, see Appendix: Pricing details of the public resource group.
Sample Estimator code:
from pai.estimator import Estimator
from pai.image import retrieve
# Use the PyTorch 1.8 image for GPU-based training to run the training script.
image_uri = retrieve(
"PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)
est = Estimator(
# Startup command of the training job. The default working directory is /ml/usercode/.
command="python train.py $PAI_USER_ARGS",
# The relative path or absolute path of the training code directory to be uploaded.
# By default, the /ml/usercode directory of the training environment is used.
source_dir="./train_src/",
# The image of the training job.
image_uri=image_uri,
# Instance configuration.
instance_type="ecs.gn6i-c4g1.xlarge", # 4vCPU 15GB 1*NVIDIA T4
# Hyperparameters of the training job.
hyperparameters={
"epochs": 5,
"batch-size": 64 * 4,
"lr": 0.5,
},
# Metrics configuration of the training job.
metric_definitions=[
{
"Name": "loss",
"Regex": r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
},
],
base_job_name="pytorch_mnist",
)
Use training data that is uploaded to OSS as input data and run the training job.
# If you use ossutil to upload training data, you need to explicitly specify the OSS URI of the input data.
# data_uri = "oss://<YourOssBucket>/mnist/data/"
# Submit the training job.
est.fit(
inputs={
"train_data": data_uri,
}
)
# The output path of the trained model.
print("TrainingJob output model data:")
print(est.model_data())
You can use the est.fit method to submit your training job to PAI. After the job is submitted, the SDK prints a link to the job details page and streams the job logs until the job completes.
If you need to use data stored in OSS, pass it through the inputs parameter of the estimator.fit method. The data is then mounted into the job container, and your training script can load it by reading local files.
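For example, with inputs={"train_data": data_uri}, the mounted data can be inspected from inside the training script. A minimal sketch:
import os

# The local mount path of the "train_data" channel inside the job container.
local_path = os.environ.get("PAI_INPUT_TRAIN_DATA")
print(local_path)
print(os.listdir(local_path))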
For more information about submitting a training job, see Submit a training job.
Deploy an inference service
After the training job is complete, you can use the estimator.model_data() method to obtain the OSS path of the model generated by the training job. The following section describes how to deploy the trained model to PAI as an online inference service.
Use InferenceSpec to describe how the model is used to build an inference service. You can use a processor or a custom image to deploy the model. Both methods are described in the following examples.
Use the Model.deploy method to configure information such as the resources used by the service and the service name, and to create an inference service.
For more information, see Deploy inference services.
Deploy a model service by using a processor
A processor is an abstract description of an inference package. It is used to load a model and start the model inference service, which provides APIs for users to call. PAI provides built-in PyTorch processors that allow you to deploy models in the TorchScript format to PAI and create an inference service.
Deploy a service.
In the following example, a PyTorch processor is used to deploy the trained model as an inference service. Sample code:
from pai.model import Model, InferenceSpec
from pai.predictor import Predictor
from pai.common.utils import random_str

m = Model(
    model_data=est.model_data(),
    # Use the PyTorch processor provided by PAI.
    inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
)

p: Predictor = m.deploy(
    service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)),
    instance_type="ecs.c6.xlarge",
)

print(p.service_name)
print(p.service_status)
Model.deploy creates a new inference service and returns a Predictor object. You can use the Predictor.predict method to send requests to the inference service and obtain the prediction results.
Run the inference service.
In this example, a test sample is created by using numpy and sent to the inference service.
import numpy as np

# The input is of the float32 type and in the shape of (BatchSize, Channel, Height, Width).
dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32)
res = p.predict(dummy_input)
print(res)
print(np.argmax(res, 1))
Delete the inference service.
You can delete the inference service by using Predictor.delete_service after the prediction is complete.
p.delete_service()
Deploy a model by using an image
In performance-sensitive scenarios, you can deploy the model by using a processor. In scenarios with custom requirements, such as when the model has third-party dependencies or when the inference service requires preprocessing and post-processing, you can deploy the model by using an image. PAI SDK for Python provides the pai.model.container_serving_spec() method, which lets you create an inference service from on-premises code and an image provided by PAI.
Prepare the code file of the inference service.
You need to prepare the code that is used to load the model, start the HTTP server, and process the inference request before you deploy the model. In this example, the code is written by using Flask. Sample code:
import io
import json
import os

import torch
import torchvision.transforms as transforms
from flask import Flask, request
from PIL import Image

app = Flask(__name__)

# The model is mounted to this path in the service container by default.
MODEL_PATH = "/eas/workspace/model/"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device)
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)


@app.route("/", methods=["POST"])
def predict():
    # Preprocess the image data.
    im = Image.open(io.BytesIO(request.data))
    input_tensor = transform(im).to(device)
    input_tensor.unsqueeze_(0)
    # Perform inference with the model.
    output_tensor = model(input_tensor)
    pred_res = output_tensor.detach().cpu().numpy()[0]
    return json.dumps(pred_res.tolist())


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))
Save the preceding code to the on-premises machine so that you can upload it later. In this example, create a directory named infer_src and save the code to infer_src/run.py. Sample directory structure:
|-- infer_src # The code directory of the inference service to be uploaded.
|-- requirements.txt # Optional. The third-party dependencies of the inference service.
'-- run.py # The inference service script.
Create an InferenceSpec object based on the on-premises script and the PyTorch image provided by PAI by using pai.model.container_serving_spec.
from pai.model import InferenceSpec, container_serving_spec
from pai.image import retrieve, ImageScope

torch_image_uri = retrieve(
    "PyTorch", framework_version="latest", image_scope=ImageScope.INFERENCE
).image_uri

inf_spec = container_serving_spec(
    command="python run.py",
    source_dir="./infer_src/",
    image_uri=torch_image_uri,
    requirements=["flask==2.0.0", "Werkzeug==2.2.2", "pillow", "torchvision"],
)
print(inf_spec.to_dict())
Code and startup command of the model service
The on-premises script directory specified by the source_dir parameter is uploaded to OSS and then mounted to the service container, in the /ml/usercode directory by default.
Image used for the inference service
You can use the pai.image.retrieve method to obtain the images provided by PAI. Set the image_scope parameter to ImageScope.INFERENCE to obtain an inference image.
Third-party dependencies of the model service
You can use the requirements parameter to specify the third-party dependencies of the model service code. The dependencies are installed in the environment before the service starts.
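The dependencies above are passed programmatically through the requirements parameter. Alternatively, they could live in the optional infer_src/requirements.txt file shown in the directory structure above; a hypothetical example mirroring the same list:
flask==2.0.0
Werkzeug==2.2.2
pillow
torchvision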
Call the Model.deploy API to deploy an online inference service by using the trained model and the InferenceSpec.
from pai.model import Model
from pai.common.utils import random_str

m = Model(
    model_data=est.model_data(),
    inference_spec=inf_spec,
)

predictor = m.deploy(
    service_name="torch_mnist_script_container_{}".format(random_str(6)),
    instance_type="ecs.c6.xlarge",
)
Call the inference service.
Prepare an MNIST image.
!pip install -q pillow
import base64
import io

from IPython import display
from PIL import Image

# raw_data is an MNIST image, which corresponds to the number 9.
raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==")
im = Image.open(io.BytesIO(raw_data))
display.display(im)
Send a request to the inference service.
The inference service uses the data in the HTTP request body as the input image. The raw_predict method accepts request data of the bytes type. PAI SDK for Python includes the data in the body of an HTTP POST request and sends it to the inference service.
from pai.predictor import RawResponse
import numpy as np

resp: RawResponse = predictor.raw_predict(data=raw_data)
print(resp.json())
print(np.argmax(resp.json()))
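Outside the SDK, the same call is a plain HTTP POST with the image bytes as the request body. The following sketch assumes the requests library; the endpoint URL and token are placeholders that you would obtain from the EAS service details:
import requests

# Hypothetical endpoint and token; obtain the real values from the EAS service details.
url = "http://<service-endpoint>/"
headers = {"Authorization": "<service-token>"}

resp = requests.post(url, data=raw_data, headers=headers)
print(resp.json())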
Delete the service after the test is complete.
predictor.delete_service()
Appendix
Jupyter Notebook of this example: Train and deploy a PyTorch model