すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:Python を使用したカスタムプロセッサの開発

最終更新日:Apr 01, 2026

モデルファイルとともに、カスタム Python プロセッサをローカルで開発・デバッグし、その後、Object Storage Service (OSS) に別々にアップロードしてサービスをデプロイできます。本トピックでは、Python を使用したカスタムプロセッサの開発手順について説明します。

背景情報

Python を使用したカスタムプロセッサを開発するには、以下の手順に従います:

  1. ステップ 1:開発環境のセットアップ

    EAS が提供する Python SDK は、さまざまな Python 機械学習フレームワークをサポートしており、Pandas などのデータ分析・処理フレームワークとも統合可能です。本トピックでは、カスタムプロセッサの開発のためのローカル Python 開発環境の構築方法、およびその環境のパッケージ化とアップロード方法について説明します。

  2. ステップ 2:予測ロジックの記述

    EAS の Python SDK は、高性能な RPC フレームワークおよび EAS クラスターとの連携に必要な内部インターフェイスを提供します。モデルサービスを EAS にデプロイするには、予測ロジックのみを記述し、シンプルなインターフェイスを実装すれば十分です。

  3. ステップ 3:サービスのローカルテスト

    予測ロジックの記述後、サービスをローカルでテストする必要があります。ローカルテストにより、予測ロジックの正しさを検証でき、デプロイ後のサービスが期待通りに機能することを保証できます。

  4. ステップ 4:コードおよび Python 環境のパッケージ化

    サービスデプロイ用にコードおよび環境をパッケージ化します。

  5. ステップ 5:プロセッサパッケージおよびモデルファイルのアップロード

    プロセッサパッケージおよびモデルファイルを、Object Storage Service (OSS) に別々にアップロードします。

  6. ステップ 6:サービスのデプロイおよびテスト

    作成したカスタムプロセッサをモデルサービスとしてデプロイします。

説明
  • モデルファイルとプロセッサを分離してください。これにより、モデルのファインチューニングおよび再デプロイ時にプロセッサパッケージを再利用できます。予測ロジックの記述時には、モデルのストレージパスを取得するために get_model_path() メソッドを使用し、ロジックがモデルを読み込めるようにします。

  • カスタムプロセッサに多数の依存関係がある場合、またはプロセッサパッケージが大規模な場合は、サービスデプロイにイメージを使用することを推奨します。2 つのデプロイ方法の比較については、「仕組み(イメージデプロイ)」をご参照ください。

前提条件

モデルファイルが必要です。

説明

メンテナンスの簡素化のため、モデルファイルとカスタムプロセッサを別々に開発してください。開発後は、両者を Object Storage Service (OSS) に別々にアップロードし、サービスデプロイ時にマウントできるようにします。

ステップ 1:開発環境のセットアップ

Pyenv などの Python パッケージ管理ツールを使用して、開発環境をセットアップできます。EAS は、Python SDK の初期化プロセスをラップしたクライアントツール EASCMD を提供しています。このツールをダウンロードすると、単一のコマンドで Python SDK 環境を初期化し、関連するファイルテンプレートを生成できます。このツールは Linux オペレーティングシステムでのみ使用可能です。以下にコマンド例を示します:

# EASCMD のインストールおよび初期化。この例では、Linux 向け EASCMD クライアントのインストール方法を示します。
$ wget https://eas-data.oss-cn-shanghai.aliyuncs.com/tools/eascmd/v2/eascmd64
# ダウンロード完了後、アクセス権限を変更し、Alibaba Cloud AccessKey ペアを設定します。
$ chmod +x eascmd64
$ ./eascmd64 config -i <access_id> -k <access_key>

# 環境を初期化します。
$ ./eascmd64 pysdk init ./pysdk_demo

プロンプトが表示されたら、Python バージョン(デフォルトは 3.6)を入力します。その後、システムが自動的に、ENV という名前の Python 環境ディレクトリ、app.py という名前の予測サービスコードテンプレート、および app.json という名前のサービスデプロイテンプレートを、指定した ./pysdk_demo ディレクトリ内に作成します。

ステップ 2:予測ロジックの記述

ENV と同じディレクトリ内に、予測サービスのメインファイル app.py を作成します。以下にコード例を示します:

説明

EASCMD を使用して Python 環境を初期化すると、app.py ファイルが自動生成されます。必要に応じて、ファイル内容を変更できます。

# -*- coding: utf-8 -*-
import allspark


class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor はサンプルです。
        予測を実行するには、次のようなメッセージを送信します。
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
    """
    def initialize(self):
        """ モジュールのロード。サービス起動時に 1 回実行されます。
             サービスの初期化およびモデルのロードをこの関数内で実行します。
        """
        self.module = {'w0': 100, 'w1': 2}
        # model_dir = self.get_model_path().decode()
        # load_model 関数を自ら実装する必要があります。たとえば、model.pt ファイルをロードするには、torch.load(model_dir + "/model.pt") のように実装します。
        # self.model = load_model(model_dir)

    def pre_process(self, data):
        """ データ形式の前処理
        """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ 処理後の後処理
        """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """ リクエストデータの処理
        """
        x, y = self.pre_process(data)
        w0 = self.module['w0']
        w1 = self.module['w1']
        y1 = w1 * x + w0
        if y1 >= y:
            return self.post_process("True"), 200
        else:
            return self.post_process("False"), 400


if __name__ == '__main__':
    # allspark.default_properties().put('rpc.keepalive', '10000')
    # サービスのタイムアウトを 10 秒に設定します(デフォルトは 5 秒)。
    # worker_threads パラメーターは、処理の同時実行数を示します。
    runner = MyProcessor(worker_threads=10)
    runner.run()

上記のコードは、Python SDK の簡単な例です。このコードでは、EAS が提供する BaseProcessor 基底クラスを継承し、initialize() および process() 関数を実装しています。

メソッド

説明

使用方法

initialize()

プロセッサの初期化メソッドです。サービス起動時に、モデルのロードなど初期化タスクを実行します。

プロセッサ実装からモデルファイルの取得を分離するために、initialize() メソッドに以下のコードを追加します。

model_dir = self.get_model_path().decode()
self.model = load_model(model_dir)
  • get_model_path() メソッドを使用すると、モデルファイルのストレージディレクトリ(バイトオブジェクト)を取得できます。これは、サービスインスタンス内におけるアップロード済みモデルファイルの実際のストレージディレクトリです。

  • カスタムの load_model() 関数を実装して、モデルファイルをロードし、サービスデプロイで使用します。たとえば、model.pt ファイルをロードするには、torch.load(model_dir + "/model.pt") のように実装します。

get_model_path()

モデルファイルのストレージディレクトリをバイトオブジェクトとして取得します。

デプロイ JSON で model_path パラメーターを指定した場合、get_model_path() メソッドを呼び出して、サービスインスタンス内のモデルのストレージディレクトリを取得し、モデルをロードします。

process(data)

リクエスト処理メソッドです。各リクエストに対して、リクエストボディが process() メソッドのパラメーターとして渡され、その戻り値がクライアントに送信されます。

data パラメーターは、BYTES 型のリクエストボディです。戻り値は BYTES 型および INT 型であり、それぞれ response_data および status_code の出力パラメーターに対応します。通常のリクエストでは、status_code0 または 200 になります。

_init_(worker_threads=5, worker_processes=1,endpoint=None)

プロセッサのコンストラクターです。

  • worker_threads:ワーカースレッド数。デフォルト値は 5 です。

  • worker_processes: プロセスの数。デフォルト値: 1。worker_processes が 1 の場合、サービスはシングルプロセス、マルチスレッドモードで実行されます。worker_processes が 1 より大きい場合、worker_threads はデータの読み取りのみを行い、複数のプロセスがリクエストを同時に処理します。各プロセスは initialize() メソッドを実行します。

  • endpoint:サービスがリッスンするエンドポイントです。このパラメーターを使用して、サービスがリッスンするアドレスおよびポート(例:endpoint='0.0.0.0:8079')を指定できます。

    説明

    ここで設定するポートは、EAS によって予約されているシステムリッスンポート(8080、9090)であってはなりません。

run()

サービスを起動します。

パラメーターはありません。

ステップ 3:サービスのローカルテスト

  1. ターミナルで、app.py ファイルを含むディレクトリに移動し、以下のコマンドを実行して Python アプリケーションを起動します。

    ./ENV/bin/python app.py

    以下の出力が表示された場合、アプリケーションが正常に起動したことを示します。

    [INFO] サービス初期化の完了を待機中...
    [INFO] サービス初期化完了
    [INFO] サービスを作成中
    [INFO] RPC が事前定義ポート 8080 にバインドされました
    [INFO] /api/builtin/call への組み込みハンドラ call をインストールしました
    [INFO] /api/builtin/eastool への組み込みハンドラ eastool をインストールしました
    [INFO] /api/builtin/monitor への組み込みハンドラ monitor をインストールしました
    [INFO] /api/builtin/ping への組み込みハンドラ ping をインストールしました
    [INFO] /api/builtin/prop への組み込みハンドラ prop をインストールしました
    [INFO] /api/builtin/realtime_metrics への組み込みハンドラ realtime_metrics をインストールしました
    [INFO] /api/builtin/tell への組み込みハンドラ tell をインストールしました
    [INFO] /api/builtin/term への組み込みハンドラ term をインストールしました
    [INFO] サービスが正常に起動しました
  2. 新しいターミナルを開き、以下のコマンドを実行してアプリケーションの応答をテストします。

    ステップ 2 のコード例に基づき、以下のコマンドを実行してリクエストデータをアプリケーションに送信します。返される結果は、コードのロジックを反映します。

    curl http://127.0.0.1:8080/test  -d '10 20'

ステップ 4:コードおよび環境のパッケージ化

EASCMD は、環境をワンステップでパッケージ化できるパッケージングコマンドを提供しています。EASCMD を使用せずにカスタムプロセッサを開発している場合でも、完全な環境を手動でパッケージ化できます。以下に、両方の方法について説明します。

  • EASCMD が提供するパッケージングコマンドを使用します(Linux のみ)。

    $ ./eascmd64 pysdk pack ./pysdk_demo

    コマンドが正常に実行された場合、以下の出力が表示されます:

    [PYSDK] パッケージを作成中: /home/xi****.lwp/code/test/pysdk_demo.tar.gz
  • 手動によるパッケージ化

    要件

    詳細

    パッケージ形式

    パッケージは、.zip または .tar.gz 形式の圧縮アーカイブである必要があります。

    パッケージの内容

    • パッケージのルートディレクトリは /ENV でなければならず、app.py ファイルを含む必要があります。

    • パッケージ例:.tar.gz パッケージ例

ステップ 5:パッケージおよびモデルファイルのアップロード

パッケージ化後、.zip または .tar.gz 形式の完全なパッケージファイルおよびモデルファイルを、Object Storage Service (OSS) に別々にアップロードし、サービスデプロイ時にマウントできるようにします。OSS へのファイルアップロード方法については、「ossutil のクイックスタート」をご参照ください。

ステップ 6:サービスのデプロイおよびテスト

PAI コンソールまたは EASCMD クライアントを使用して、モデルサービスをデプロイできます。

  1. サービスをデプロイします。

    PAI コンソール

    1. PAI コンソール にログインします。ページ上部からリージョンを選択し、目的のワークスペースを選択して、Elastic Algorithm Service (EAS) をクリックします。

    2. Deploy Service をクリックします。Custom Model Deployment セクションで、Custom Deployment をクリックします。

    3. Custom Deployment ページで、以下の主要パラメーターを設定します。その他のパラメーターについては、「カスタムデプロイ」をご参照ください。

      パラメーター

      説明

      Deployment Method

      Processor-based Deployment を選択します。

      Model Settings

      TypeOSS を選択し、モデルファイルを含む OSS パスを選択します。

      Processor Type

      Custom Processor を選択します。

      Processor Language

      python を選択します。

      Processor Package

      TypeOSS を選択し、前のステップで作成したパッケージファイルの OSS パスを選択します。

      Processor Main File

      ./app.py を設定します。

    4. (任意)Service Configurations セクションで、data_image パラメーターを追加し、その値をパッケージ化時に設定したイメージパスに設定します。

      説明

      ステップ 4 で環境をイメージでパッケージ化した場合、data_image パラメーターを設定する必要があります。それ以外の場合は、このステップは不要です。

    5. Deploy をクリックします。

    EASCMD

    以下の手順は、Linux オペレーティングシステム上でサービスをデプロイする方法を示します:

    1. EASCMD クライアントのダウンロードおよび認証を行います。詳細については、「クライアントのダウンロードおよび認証」をご参照ください。

    2. クライアントファイルが配置されているディレクトリで、JSON 形式の新規ファイルを作成し、app.json という名前を付けます。以下に、EASCMD または手動で作成したパッケージ向けのファイル内容の例を示します:

      {
        "name": "pysdk_demo",
        "processor_entry": "./app.py",
        "processor_type": "python",
        "processor_path": "oss://examplebucket/exampledirectory/pysdk_demo.tar.gz",
        "model_path": "oss://examplebucket/exampledirectory/model",
        "cloud": {
              "computing": {
                  "instance_type": "ecs.c7.large"
              }
        },
        "metadata": {
          "instance": 1,
          }
      }
    3. ターミナルを開き、JSON ファイルのディレクトリに移動して、以下のコマンドを実行してサービスをデプロイします。

      $ ./eascmd64 create app.json

      以下の出力が表示された場合、サービスが正常にデプロイされたことを示します。

      [RequestId]: 1202D427-8187-4BCB-8D32-D7096E95B5CA
      +-------------------+-------------------------------------------------------------------+
      | イントラネットエンドポイント | http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo |
      |             トークン | ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****          |
      +-------------------+-------------------------------------------------------------------+
      [OK] タスクサーバーの準備を待機中
      [OK] [oss://eas-model-beijing/195557026392****/pysdk_demo.tar.gz] からプロセッサを取得中
      [OK] イメージ [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810] をビルド中
      [OK] イメージ [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810] をプッシュ中
      [OK] 待機中 [合計: 1、保留中: 1、実行中: 0]
      [OK] サービスが実行中です
  2. サービスをテストします。

    1. PAI コンソール にログインします。ページ上部からリージョンを選択し、目的のワークスペースを選択して、Elastic Algorithm Service (EAS) をクリックします。

    2. 対象サービスの行で、Invocation InformationService Type 列からクリックし、パブリックエンドポイントおよび Token を取得します。

    3. ターミナルで、前のステップで取得したエンドポイントおよびトークンを使用して、以下のコマンドを実行してサービスを呼び出します。

      $ curl <service_url> -H 'Authorization: <token>' -d '10 20'

      パラメーター:

      • <service_url>:前のステップで取得したパブリックエンドポイントに置き換えます。例:http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo

      • <token>:前のステップで取得したトークンに置き換えます。例:ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****

      • -d の後に指定する内容は、サービスの入力パラメーターです。

参考資料