PAI Python SDK の高レベル API を使用して、モデルを PAI にデプロイし、推論サービスを作成します。
デプロイワークフロー
この SDK は、モデルを EAS にデプロイしてサービスをテストするための高レベル API である pai.model.Model と pai.predictor.Predictor を提供します。
推論サービスを作成するための基本的なフローは次のとおりです:
-
プロセッサやランタイムイメージ情報など、推論サービスの構成を
pai.model.InferenceSpecオブジェクトで定義します。 -
デプロイ用の
InferenceSpecオブジェクトとモデルファイルを使用して、pai.model.Modelオブジェクトを作成します。 -
pai.model.Model.deploy()を呼び出して、PAI に推論サービスを作成します。リソース、サービス名、およびその他の構成を指定します。 -
deployメソッドはpai.predictor.Predictorオブジェクトを返します。そのpredictメソッドを使用して、推論リクエストをサービスに送信します。
例:
from pai.model import InferenceSpec, Model, container_serving_spec
from pai.image import retrieve, ImageScope
# 1. PAI が提供する PyTorch 推論ランタイムイメージを取得します。
torch_image = retrieve("PyTorch", framework_version="latest",
image_scope=ImageScope.INFERENCE)
# 2. InferenceSpec を使用して推論構成を定義します。
inference_spec = container_serving_spec(
# 推論サービスの起動コマンド。
command="python app.py",
source_dir="./src/"
# 推論ランタイムイメージ。
image_uri=torch_image.image_uri,
)
# 3. デプロイ用の Model オブジェクトをビルドします。
model = Model(
# OSS バケットからのモデルファイル。
model_data="oss://<YourBucket>/path-to-model-data",
inference_spec=inference_spec,
)
# 4. モデルを PAI-EAS にデプロイしてオンライン推論サービスを作成し、Predictor オブジェクトを返します。
predictor = model.deploy(
service_name="example_torch_service",
instance_type="ecs.c6.xlarge",
)
# 5. 推論サービスをテストします。
res = predictor.predict(data=data)
以降のセクションでは、推論サービスをデプロイするためのコード構成について説明します。
InferenceSpec の構成
プロセッサまたはランタイムイメージを使用して推論サービスをデプロイします。pai.model.InferenceSpec オブジェクトは、デプロイ方法、ストレージ設定、ウォームアップ設定、RPC バッチ処理などのサービス構成を定義します。構成済みの InferenceSpec オブジェクトを使用してサービスを作成します。
組み込みプロセッサの使用
プロセッサは、ユーザーが提供したモデルから直接推論サービスをビルドする推論サービスパッケージの PAI 抽象化です。PAI は、TensorFlow SavedModel、PyTorch TorchScript、XGBoost、LightGBM、PMML などの一般的な機械学習モデルフォーマットをサポートする組み込みプロセッサを提供します。完全なリストについては、「組み込みプロセッサ」をご参照ください。
-
プロセッサを使用してモデルをデプロイするには、以下のように
InferenceSpecを構成します:# 組み込みの TensorFlow プロセッサを使用します。 tf_infer_spec = InferenceSpec(processor="tensorflow_cpu_2.3") # 組み込みの PyTorch プロセッサを使用します。 tf_infer_spec = InferenceSpec(processor="pytorch_cpu_1.10") # 組み込みの XGBoost プロセッサを使用します。 xgb_infer_spec = InferenceSpec(processor="xgboost") -
ウォームアップデータファイルや RPC 設定など、推論サービスの追加機能を
InferenceSpecインスタンスで構成します。サービスパラメーターの完全なリストについては、「JSON ベースのデプロイ」をご参照ください。# InferenceSpec プロパティを直接構成します。 tf_infer_spec.warm_up_data_path = "oss://<YourOssBucket>/path/to/warmup-data" # ウォームアップデータファイルへのパス。 tf_infer_spec.metadata.rpc.keepalive = 1000 # リクエスト接続のキープアライブ期間。 print(tf_infer_spec.warm_up_data_path) print(tf_infer_spec.metadata.rpc.keepalive)
ランタイムイメージの使用
プロセッサを使用したデプロイは簡単ですが、柔軟なカスタム構成はサポートされていません。複雑な依存関係を持つモデルや推論サービスの場合、PAI はランタイムイメージを使用したデプロイをサポートしており、より高い柔軟性を提供します。
-
モデルサービスコードと依存関係を Docker イメージにパッケージ化し、Alibaba Cloud ACR イメージリポジトリにプッシュします。次に、デプロイ用に Docker イメージから InferenceSpec をビルドします。
from pai.model import InferenceSpec, container_serving_spec # ランタイムイメージによって提供されるモデルの InferenceSpec をビルドします。 container_infer_spec = container_serving_spec( # 推論サービスを実行するためのランタイムイメージ。 image_uri="<CustomImageUri>", # コンテナ内で推論サービスがリッスンするポート。PAI は予測リクエストをこのポートに転送します。 port=8000, environment_variables=environment_variables, # 推論サービスの起動コマンド。 command=command, # 推論サービスが依存する Python パッケージ。 requirements=[ "scikit-learn", "fastapi==0.87.0", ], ) print(container_infer_spec.to_dict()) m = Model( model_data="oss://<YourOssBucket>/path-to-tensorflow-saved-model", inference_spec=custom_container_infer_spec, ) p = m.deploy( instance_type="ecs.c6.xlarge" ) -
カスタムイメージを使用したデプロイでは、通常、推論コードの準備、コンテナへの統合、イメージのビルドとリポジトリへのプッシュが必要です。PAI SDK は、イメージを手動でビルドすることなく、ローカルコードとベースイメージから推論サービスをビルドすることで、このプロセスを簡素化します。
pai.model.container_serving_spec()のsource_dirパラメーターを使用して、ローカルコードディレクトリを指定します。SDK はこのディレクトリを自動的にパッケージ化して OSS バケットにアップロードし、実行中のコンテナ内のパスにマウントします。指定された起動コマンドを使用してサービスを開始します。from pai.model import InferenceSpec inference_spec = container_serving_spec( # 推論プログラムを含むローカルディレクトリへのパス。これは OSS バケットにアップロードされ、実行中のコンテナにマウントされます。デフォルトのマウントパスは /ml/usercode/ です。 source_dir="./src", # サービスの起動コマンド。source_dir が指定されている場合、コマンドは /ml/usercode をデフォルトの作業ディレクトリとして実行されます。 command="python run.py", image_uri="<ServingImageUri>", requirements=[ "fastapi", "uvicorn", ] ) print(inference_spec.to_dict()) -
追加のデータ、コード、またはモデルを推論サービスコンテナにインポートするには、
pai.model.InferenceSpec.mount()を使用して、ローカルディレクトリまたは OSS データパスをオンラインサービスコンテナにマウントします。# ローカルデータを OSS にアップロードし、コンテナ内の /ml/tokenizers/ ディレクトリにマウントします。 inference_spec.mount("./bert_tokenizers/", "/ml/tokenizers/") # OSS に保存されているデータをコンテナ内の /ml/data/ ディレクトリにマウントします。 inference_spec.mount("oss://<YourOssBucket>/path/to/data/", "/ml/data/") -
PAI パブリックイメージの取得
PAI は、
TensorFlow、PyTorch、XGBoostなどの一般的なフレームワーク用の推論ランタイムイメージを提供します。pai.image.list_imagesおよびpai.image.retrieveにimage_scope=ImageScope.INFERENCEを渡して、モデルデプロイに対応する推論ランタイムイメージを取得します。from pai.image import retrieve, ImageScope, list_images # PAI が提供するすべての PyTorch 推論ランタイムイメージを取得します。 for image_info in list_images(framework_name="PyTorch", image_scope=ImageScope.INFERENCE): print(image_info) # PAI が提供する PyTorch 1.12 CPU 推論ランタイムイメージを取得します。 retrieve(framework_name="PyTorch", framework_version="1.12", image_scope=ImageScope.INFERENCE) # PAI が提供する PyTorch 1.12 GPU 推論ランタイムイメージを取得します。 retrieve(framework_name="PyTorch", framework_version="1.12", accelerator_type="GPU", image_scope=ImageScope.INFERENCE) # PAI が提供する最新の PyTorch GPU 推論ランタイムイメージを取得します。 retrieve(framework_name="PyTorch", framework_version="latest", accelerator_type="GPU", image_scope=ImageScope.INFERENCE)
オンライン推論サービスのデプロイと呼び出し
推論サービスのデプロイ
pai.model.InferenceSpec オブジェクトと model_data で指定されたモデルデータアドレスを使用して、pai.model.Model オブジェクトをビルドします。.deploy を呼び出してモデルをデプロイします。model_data パラメーターは、OSS URI またはローカルパスを受け入れます。ローカルパスが指定された場合、モデルファイルは OSS バケットにアップロードされ、推論サービスでロードして使用できるように準備されます。
.deploy を呼び出してモデルをデプロイする際に、必要なリソース構成、サービスインスタンス数、サービス名などのサービスパラメーターを指定します。詳細パラメーターについては、「JSON ベースのデプロイ」をご参照ください。
from pai.model import Model, InferenceSpec
from pai.predictor import Predictor
model = Model(
# model_data へのパス。OSS URI またはローカルパスを指定できます。ローカルパスの場合、デフォルトで OSS バケットにアップロードされます。
model_data="oss://<YourBucket>/path-to-model-data",
inference_spec=inference_spec,
)
# EAS にデプロイします。
predictor = m.deploy(
# 推論サービスの名前。
service_name="example_xgb_service",
# サービスが使用するマシンタイプ。
instance_type="ecs.c6.xlarge",
# マシンインスタンス/サービス数。
instance_count=2,
# 専用リソースグループ。これはオプションです。デフォルトではパブリックリソースグループが使用されます。
# resource_id="<YOUR_EAS_RESOURCE_GROUP_ID>",
options={
"metadata.rpc.batching": True,
"metadata.rpc.keepalive": 50000,
"metadata.rpc.max_batch_size": 16,
"warm_up_data_path": "oss://<YourOssBucketName>/path-to-warmup-data",
},
)
CPU やメモリなどの特定のリソース要件に基づいてサービスを構成するには、resource_config を使用して各サービスインスタンスにリソースを割り当てます:
from pai.model import ResourceConfig
predictor = m.deploy(
service_name="dedicated_rg_service",
# 単一のサービスインスタンスが使用する CPU およびメモリリソースを指定します。
# この例では、各サービスは 2 CPU コアと 4000 MB のメモリを使用します。
resource_config=ResourceConfig(
cpu=2,
memory=4000,
),
)
推論サービスの呼び出し
pai.model.Model.deploy メソッドは EAS API を呼び出して新しい推論サービスを作成し、新しく作成されたサービスを指す pai.predictor.Predictor オブジェクトを返します。Predictor オブジェクトは、予測リクエストをサービスに送信するための predict および raw_predict メソッドを提供します。
pai.predictor.Predictor.raw_predict の入出力は、シリアライザーによる処理を必要としません。
from pai.predictor import Predictor, EndpointType
# 新しい推論サービスを作成します。
predictor = model.deploy(
instance_type="ecs.c6.xlarge",
service_name="example_xgb_service",
)
# 既存の推論サービスを使用します。
predictor = Predictor(
service_name="example_xgb_service",
# デフォルトでは、サービスはインターネット経由でアクセスされます。VPC ネットワーク経由のアクセスを構成します。クライアントコードは VPC 環境で実行する必要があります。
# endpoint_type=EndpointType.INTRANET
)
# .predict は、対応するサービスにデータリクエストを送信し、応答を取得します。入力データと応答はシリアライザーによって処理されます。
res = predictor.predict(data_in_nested_list)
# .raw_predict は、推論サービスにリクエストを送信するためのより柔軟な方法を提供します。
response: RawResponse = predictor.raw_predict(
# 入力データがバイトまたはファイルライクオブジェクトの場合、リクエストデータは HTTP リクエストボディで直接渡されます。
# それ以外の場合は、まず JSON にシリアル化されてから、HTTP リクエストボディで渡されます。
data=data_in_nested_list
# path="predict" # カスタム HTTP リクエストパス。デフォルトでは、リクエストは "/" パスに送信されます。
# headers=dict(), # カスタムリクエストヘッダー。
# method="POST" # カスタム HTTP メソッド。
# timeout=30, # カスタムリクエストタイムアウト。
)
# 返されたボディとヘッダーを取得します。
print(response.content, response.headers)
# 返された結果を JSON から Python オブジェクトに逆シリアル化します。
print(response.json())
# 推論サービスを停止します。
predictor.stop_service()
# 推論サービスを開始します。
predictor.start_service()
# 推論サービスを削除します。
predictor.delete_service()
シリアライザーによる入出力の処理
SDK の pai.predictor.Predictor.predict メソッドを使用して推論サービスを呼び出す場合、入力 Python データ構造を、サービスが処理できるフォーマットにシリアル化する必要があります。同様に、サービスからの応答データは、読み取り可能な Python オブジェクトに逆シリアル化する必要があります。Predictor は serializer パラメーターを使用して、予測データをシリアル化し、予測応答を逆シリアル化します。
-
predict(data=<PredictionData>)を呼び出すと、dataパラメーターはserializer.serializeによってシリアル化されます。これによりbytesオブジェクトが生成され、HTTP リクエストボディで推論サービスに渡されます。 -
推論サービスが HTTP 応答を返すと、
Predictorオブジェクトはserializer.deserializeを使用して HTTP レスポンスボディを逆シリアル化します。その結果がpredictから返されます。
SDK には、一般的なデータの型をサポートし、PAI の組み込みディープ ラーニング プロセッサの入出力データを処理する組み込みシリアライザーが用意されています。
-
JsonSerializer
JsonSerializerオブジェクトは、JSON フォーマットによるデータのシリアル化および逆シリアル化をサポートします。predictに渡されるdataは、numpy.ndarrayまたはListです。JsonSerializer.serializeは配列を JSON 文字列にシリアル化します。JsonSerializer.deserializeは受信した JSON 文字列を Python オブジェクトに逆シリアル化します。PAI の組み込みプロセッサ(例:XGBoost や PMML)は、JSON フォーマットでデータを受け取り、返します。これらのプロセッサを使用して作成されたサービスでは、Predictor はデフォルトで JsonSerializer を使用します。
from pai.serializers import JsonSerializer
# ".deploy" メソッドで返される Predictor にシリアライザを指定します。
p = Model(
inference_spec=InferenceSpec(processor="xgboost"),
model_data="oss://<YourOssBucket>/path-to-xgboost-model"
).deploy(
instance_type="ecs.c6.xlarge",
# 任意: XGBoost プロセッサを使用するサービスはデフォルトで JsonSerializer を使用します。
serializer=JsonSerializer()
)
# または、Predictor 作成時に直接シリアライザを指定します。
p = Predictor(
service_name="example_xgb_service",
serializer=JsonSerializer(),
)
# 予測結果もリスト形式です。
res = p.predict([[2,3,4], [4,5,6]])
TensorFlowSerializer
PAI の組み込み TensorFlow プロセッサを使用して、TensorFlow SavedModel を PAI にデプロイし、推論サービスを作成します。このサービスの入出力メッセージフォーマットは Protocol Buffers です。ファイル形式の詳細については、tf_predict.proto をご参照ください。
SDK には、numpy.ndarray 型のデータを送信するための組み込み TensorFlowSerializer が用意されています。このシリアライザは、numpy.ndarray データを対応する Protocol Buffers メッセージに変換し、受信した Protocol Buffers メッセージを numpy.ndarray データ型に逆シリアル化します。
# TensorFlow プロセッササービスを作成します。
tf_predictor = Model(
inference_spec=InferenceSpec(processor="tensorflow_cpu_2.7"),
model_data="oss://<YourOssBucket>/path-to-tensorflow-saved-model"
).deploy(
instance_type="ecs.c6.xlarge",
# 任意: TensorFlow プロセッサを使用するサービスはデフォルトで TensorFlowSerializer を使用します。
# serializer=TensorFlowSerializer(),
)
# TensorFlow プロセッサで起動したサービスでは、API 経由でモデルのサービス署名を取得できます。
print(tf_predictor.inspect_signature_def())
# TensorFlow プロセッサの入力には Dict が必要です。キーはモデルの入力署名の名前、値は具体的な入力データです。
tf_result = tf_predictor.predict(data={
"flatten_input": numpy.zeros(28*28*2).reshape((-1, 28, 28))
})
assert result["dense_1"].shape == (2, 10)
PyTorchSerializer
PAI の組み込み PyTorch プロセッサを使用して、TorchScript フォーマット のモデルを推論サービスとしてデプロイします。このサービスの入出力データ形式は Protocol Buffers です。ファイル形式の詳細については、pytorch_predict_proto をご参照ください。
SDK には、numpy.ndarray 型のデータを送信し、予測結果を numpy.ndarray に変換するための組み込み PyTorchSerializer が用意されています。PyTorchSerializer は、Protocol Buffers メッセージと numpy.ndarray 間の変換を処理します。
# PyTorch プロセッサを使用するサービスを作成します。
torch_predictor = Model(
inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
model_data="oss://<YourOssBucket>/path-to-torch_script-model"
).deploy(
instance_type="ecs.c6.xlarge",
# 任意: PyTorch プロセッサを使用するサービスはデフォルトで PyTorchSerializer を使用します。
# serializer=PyTorchSerializer(),
)
# 1. 入力データをモデルがサポートする形状に再整形します。
# 2. 複数の入力がある場合は、リストまたはタプルとして渡します。リスト内の各要素は numpy.ndarray です。
torch_result = torch_predictor.predict(data=numpy.zeros(28 * 28 * 2).reshape((-1, 28, 28)))
assert torch_result.shape == (2, 10)
カスタムシリアライザ
推論サービスがサポートするデータ形式に基づいて、Serializer クラスをカスタマイズします。カスタム Serializer クラスは pai.serializers.SerializerBase を継承し、serialize メソッドおよび deserialize メソッドを実装する必要があります。
以下の例は、カスタム NumpySerializer を示しています。predict が呼び出された際の全体的なフローは次のとおりです。
-
クライアントは、
numpy.ndarrayまたはpandas.DataFrameをpredictへの入力として渡します。NumpySerializer.serializerメソッドは、この入力をnpy フォーマットにシリアル化し、サーバーに送信します。 -
サーバーが
npyフォーマットのデータを受信し、逆シリアル化して推論結果を生成します。その後、サービスが結果をnpyフォーマットにシリアル化して返します。 -
クライアントが
npyフォーマットで返されたデータを受信します。NumpySerializer.deserializeメソッドがデータをnumpy.ndarrayに逆シリアル化します。
ローカル環境でのサービスのデプロイと呼び出し
カスタムイメージデプロイメントの場合、SDK にはローカル実行モードが提供されます。このモードは、プロセッサを使用してデプロイされたサービスには該当しません。推論サービスをローカルで実行するには、instance_type="local" を model.deploy メソッドに渡します。SDK は docker を使用してローカルでモデルサービスを起動し、必要なモデルデータを OSS から自動的にダウンロードしてローカルコンテナにマウントします。
from pai.predictor import LocalPredictor
p: LocalPredictor = model.deploy(
# ローカルで実行するように指定します。
instance_type="local",
serializer=JsonSerializer()
)
p.predict(data)
# 対応する Docker コンテナを削除します。
p.delete_service()関連情報
PAI Python SDK を使用した PyTorch モデルのトレーニングとデプロイの完全なプロセスについては、「PAI Python SDK を使用して PyTorch モデルをトレーニングおよびデプロイする」をご参照ください。