すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:Python を使用したカスタムプロセッサの開発

最終更新日:Oct 30, 2025

Python を使用してカスタムプロセッサを開発し、オンプレミスマシンでモデルファイルと共にプロセッサをデバッグできます。デバッグが完了したら、プロセッサパッケージとモデルファイルを Object Storage Service (OSS) にアップロードし、モデルをサービスとしてデプロイするときにファイルをマウントできます。このトピックでは、Python を使用してカスタムプロセッサを開発する方法について説明します。

背景情報

Python を使用してカスタムプロセッサを開発するには、次のステップを実行します:

  1. ステップ 1: Python 環境の作成

    Elastic Algorithm Service (EAS) SDK for Python は、複数の機械学習フレームワークをサポートし、Pandas などのさまざまなデータ分析および操作フレームワークと統合できます。このトピックでは、カスタムプロセッサを開発するためのオンプレミス Python 環境を作成およびアップロードする 2 つの方法について説明します。

  2. ステップ 2: 予測ロジックの追加

    EAS SDK for Python は、パフォーマンス専有型のリモートプロシージャコール (RPC) フレームワークを採用しており、EAS クラスター間の相互作用を容易にする API を含んでいます。モデルを EAS にデプロイするには、予測ロジックでいくつかの関数を実装するだけで済みます。

  3. ステップ 3: オンプレミステストの実行

    オンプレミステストを実行して予測ロジックを検証し、デプロイ後にサービスが期待どおりに動作することを確認します。

  4. ステップ 4: Python コードと環境のパッケージ化

    Python コードと環境を必要なフォーマットでパッケージ化します。

  5. ステップ 5: パッケージとモデルファイルのアップロード

    パッケージとモデルファイルを OSS にアップロードします。

  6. ステップ 6: モデルサービスのデプロイとテスト

    カスタムプロセッサを使用してモデルサービスをデプロイします。

説明
  • モデルファイルとプロセッサパッケージを分離することを推奨します。これにより、モデルが変更された場合に、将来のモデルデプロイメントでプロセッサを再利用できます。get_model_path() メソッドを呼び出して、モデルファイルのストレージパスを取得できます。このパスは、予測ロジックでモデルをロードするために使用されます。

  • カスタムプロセッサに多数の依存関係があり、パッケージが大きい場合は、イメージを使用してモデルをデプロイすることを推奨します。2 つのデプロイメント方法の詳細については、「EAS の概要」トピックの「デプロイメント方法」セクションをご参照ください。

前提条件

モデルファイルが準備されています。

説明

管理を容易にするために、モデルファイルとカスタムプロセッサを分離することを推奨します。開発が完了したら、モデルファイルとプロセッサパッケージを OSS にアップロードし、モデルをデプロイするときにファイルをマウントします。

ステップ 1: Python 環境の作成

pyenv などのパッケージ管理ツールを使用して Python 環境を作成できます。EAS が提供する EASCMD クライアントは、EAS SDK for Python の初期化プロセスをカプセル化します。ツールをダウンロードした後、コマンドを実行するだけで Python 環境を初期化し、関連するファイルテンプレートを生成できます。このクライアントは Linux オペレーティングシステムに適しています。コマンド例:

# EASCMD をインストールし、EAS SDK for Python を初期化します。
$ wget https://eas-data.oss-cn-shanghai.aliyuncs.com/tools/eascmd/v2/eascmd64
# EASCMD をインストールした後、AccessKey ペアを設定してアクセス権限を変更します。
$ chmod +x eascmd64
$ ./eascmd64 config -i <access_id> -k <access_key>

# 環境を初期化します。
$ ./eascmd64 pysdk init ./pysdk_demo

コマンド出力に使用したい Python のバージョンを入力します。デフォルトのバージョンは 3.6 です。バージョンを選択すると、./pysdk_demo ディレクトリ内に次のディレクトリとファイルが自動的に作成されます: ENV という名前のディレクトリ (Python 環境変数を格納)、app.py ファイル (予測ロジックのテンプレートを含む)、および app.json ファイル (サービスデプロイメントのテンプレートを含む)。

ステップ 2: 予測ロジックの追加

予測ロジックを追加するには、ENV ディレクトリを含むディレクトリに app.py という名前のファイルを作成します。サンプルコード:

説明
  • EASCMD クライアントを使用して Python 環境を作成する場合、app.py ファイルは自動的に作成されます。ビジネス要件に基づいてファイルを変更できます。

  • ビルド済みイメージを使用して Python 環境を作成する場合、app.py ファイルは自動的に作成されます。ビジネス要件に基づいてファイルを変更できます。

# -*- coding: utf-8 -*-
import allspark


class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor は一例です
        このようにメッセージを送信して予測できます
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
    """
    def initialize(self):
        """ モジュールをロードし、サービスの開始時に一度実行されます
             この関数でサービスの初期化とモデルのロードを行います。
        """
        self.module = {'w0': 100, 'w1': 2}
        # model_dir = self.get_model_path().decode()
        # load_model 関数を定義します。model.pt モデルファイルをロードしたい場合、関数を torch.load(model_dir + "/model.pt") として実装できます。
        # self.model = load_model(model_dir)

    def pre_process(self, data):
        """ データ形式の前処理
        """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ 処理後の処理
        """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """ リクエストデータを処理します
        """
        x, y = self.pre_process(data)
        w0 = self.module['w0']
        w1 = self.module['w1']
        y1 = w1 * x + w0
        if y1 >= y:
            return self.post_process("True"), 200
        else:
            return self.post_process("False"), 400


if __name__ == '__main__':
    # allspark.default_properties().put('rpc.keepalive', '10000')
    # タイムアウト時間を 10 秒に設定します。デフォルトでは、時間は 5 秒です。
    # パラメーター worker_threads は処理の同時実行性を示します
    runner = MyProcessor(worker_threads=10)
    runner.run()

上記のサンプルコードは、EAS SDK for Python の使用方法の例を示しています。サンプルコードは、BaseProcessor 基本クラスを継承するクラスを作成し、initialize() および process() 関数を実装します。次の表に、関連する関数について説明します。

関数

説明

備考

initialize()

プロセッサを初期化します。この関数は、サービス起動時にモデルをロードするために呼び出されます。

initialize() 関数に次のコードを追加して、モデルファイルのロードをプロセッサの実装から分離できます。

model_dir = self.get_model_path().decode()
                                    self.model = load_model(model_dir)
  • get_model_path() メソッドは、サービスインスタンス上のモデルファイルのストレージパスを取得するために使用されます。パスは bytes オブジェクトとして返されます。

  • load_model() 関数は、サービスデプロイメントのためにモデルファイルをロードするために使用されます。model.pt モデルファイルをロードしたい場合、関数を torch.load(model_dir + "/model.pt") として実装できます。

get_model_path()

モデルファイルのストレージパスを取得します。パスは bytes オブジェクトとして返されます。

JSON ファイルの model_path パラメーターを指定してモデルファイルをアップロードした場合、get_model_path() メソッドを呼び出して、サービスインスタンス上のモデルファイルのストレージパスを取得できます。

process(data)

リクエストを処理します。この関数は、リクエスト本文を引数として受け入れ、クライアントに応答を返します。

data 入力パラメーターはリクエスト本文を指定します。パラメーターは BYTES データの型です。response_data 出力パラメーターは BYTES データの型で、status_code 出力パラメーターは INT データの型です。成功応答では、status_code の戻り値は 0 または 200 です。

_init_(worker_threads=5, worker_processes=1,endpoint=None)

プロセッサのコンストラクターです。

  • worker_threads: ワーカースレッドの数。デフォルト値: 5。

  • worker_processes: プロセスの数。デフォルト値: 1。worker_processes の値を 1 に設定すると、シングルプロセス・マルチスレッドモードが使用されます。worker_processes を 1 より大きい値に設定すると、複数のプロセスが同時にリクエストを処理し、すべてのスレッドはリクエストデータのみを読み取ります。各 プロセスinitialize() 関数を呼び出します。

  • endpoint: サービスがリッスンするエンドポイント。サービスがリッスンする IP アドレスとポート番号を指定できます。例: endpoint='0.0.0.0:8079'

    説明

    EAS はこれらのポートをリッスンするため、ポート 8080 とポート 9090 は使用しないでください。

run()

サービスを開始します。

N/A

ステップ 3: オンプレミステストの実行

  1. ターミナルウィンドウを開き、app.py ファイルを含むディレクトリで次のコマンドを実行して Python プロジェクトを起動します:

    ./ENV/bin/python app.py

    次の出力は、プロジェクトが起動したことを示します:

    [INFO] waiting for service initialization to complete...
    [INFO] service initialization complete
    [INFO] create service
    [INFO] rpc binds to predefined port 8080
    [INFO] install builtin handler call to /api/builtin/call
    [INFO] install builtin handler eastool to /api/builtin/eastool
    [INFO] install builtin handler monitor to /api/builtin/monitor
    [INFO] install builtin handler ping to /api/builtin/ping
    [INFO] install builtin handler prop to /api/builtin/prop
    [INFO] install builtin handler realtime_metrics to /api/builtin/realtime_metrics
    [INFO] install builtin handler tell to /api/builtin/tell
    [INFO] install builtin handler term to /api/builtin/term
    [INFO] Service start successfully
  2. 新しいターミナルウィンドウを開き、次のコマンドを実行して 2 つのリクエストを送信します。

    このトピックの「ステップ 2: 予測ロジックの追加」セクションのサンプルコードに基づいて応答を確認します。

    curl http://127.0.0.1:8080/test -d '10 20'

ステップ 4: Python コードと環境のパッケージ化

EASCMD クライアントは、Python コードを迅速にパッケージ化するためのコマンドを提供します。EASCMD クライアントを使用してカスタムプロセッサを開発しない場合は、手動で完全な環境をパッケージ化できます。次のいずれかの方法を使用して、Python コードと環境をパッケージ化できます:

  • EASCMD クライアントが提供する pack コマンドを実行します (Linux のみ)。

    $ ./eascmd64 pysdk pack ./pysdk_demo

    次の出力は、コマンドが実行されたことを示します:

    [PYSDK] Creating package: /home/xi****.lwp/code/test/pysdk_demo.tar.gz
  • EASCMD クライアントを使用してプロセッサを開発しなかった場合は、手動で環境をパッケージ化します。

    要件

    説明

    フォーマット

    パッケージは .zip または .tar.gz 形式で圧縮する必要があります。

    内容

    • パッケージのルートディレクトリは /ENV である必要があり、パッケージには app.py ファイルが含まれている必要があります。

    • 例: .tar.gz パッケージ

ステップ 5: パッケージとモデルファイルのアップロード

Python コードと環境をパッケージ化した後、パッケージ (.zip または .tar.gz 形式) とモデルファイルを OSS にアップロードします。サービスをデプロイするときにファイルをマウントできます。OSS にファイルをアップロードする方法の詳細については、「ossutil コマンドリファレンス」をご参照ください。

ステップ 6: モデルサービスのデプロイとテスト

PAI コンソールまたは EASCMD クライアントを使用してモデルサービスをデプロイできます。

  1. サービスをデプロイします。

    PAI コンソールの使用

    1. PAI コンソールにログインします。リージョンとワークスペースを選択し、[Elastic Algorithm Service (EAS)] をクリックします。

    2. [サービスのデプロイ] をクリックし、[カスタムモデルのデプロイ] セクションで [カスタムデプロイ] を選択します。

    3. 次の主要なパラメーターを設定します。その他のパラメーターの詳細については、「カスタムデプロイメント」をご参照ください。

      パラメーター

      説明

      デプロイ方法

      [プロセッサベースのデプロイメント] を選択します。

      モデル設定

      [タイプ][OSS] を選択し、モデルファイルが格納されている OSS パスを指定します。

      プロセッサタイプ

      [カスタムプロセッサ] を選択します。

      プロセッサ言語

      [python] を選択します。

      プロセッサパッケージ

      [タイプ][OSS] を選択し、パッケージファイルが格納されている OSS パスを指定します。

      プロセッサメインファイル

      値を ./app.py に設定します。

    4. (オプション) [サービス設定の編集] セクションで data_image パラメーターを追加します。値は、ファイルをパッケージ化したときに指定したイメージパスに設定します。

      説明

      data_image パラメーターは、「ステップ 4: Python コードと環境のパッケージ化」でイメージを使用して開発環境をアップロードした場合にのみ設定します。

    5. [デプロイ] をクリックします。

    EASCMD の使用

    次のセクションでは、Linux オペレーティングシステムを例として使用します。

    1. EASCMD クライアントをダウンロードし、身分認証を実行します。詳細については、「EASCMD クライアントをダウンロードして身分認証を完了する」をご参照ください。

    2. EASCMD クライアントが格納されているディレクトリに app.json という名前の JSON ファイルを作成します。プロセッサが手動または EASCMD クライアントを使用してパッケージ化されている場合のサンプルファイル:

      {
        "name": "pysdk_demo",
        "processor_entry": "./app.py",
        "processor_type": "python",
        "processor_path": "oss://examplebucket/exampledirectory/pysdk_demo.tar.gz",
        "model_path": "oss://examplebucket/exampledirectory/model",
        "cloud": {
              "computing": {
                  "instance_type": "ecs.c7.large"
              }
        },
        "metadata": {
          "instance": 1,
          }
      }
    3. ターミナルウィンドウを開き、JSON ファイルが格納されているディレクトリで次のコマンドを実行してサービスをデプロイします:

      $ ./eascmd64 create app.json

      次の出力は、サービスがデプロイされたことを示します:

      [RequestId]: 1202D427-8187-4BCB-8D32-D7096E95B5CA
      +-------------------+-------------------------------------------------------------------+
      | Intranet Endpoint | http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo |
      |             Token | ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****          |
      +-------------------+-------------------------------------------------------------------+
      [OK] Waiting task server to be ready
      [OK] Fetching processor from [oss://eas-model-beijing/195557026392****/pysdk_demo.tar.gz]
      [OK] Building image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
      [OK] Pushing image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
      [OK] Waiting [Total: 1, Pending: 1, Running: 0]
      [OK] Service is running
  2. サービスをテストします。

    1. PAI コンソールにログインします。リージョンとワークスペースを選択し、[Elastic Algorithm Service (EAS)] をクリックします。

    2. テストするサービスを見つけ、[サービスタイプ] 列の [呼び出し方法] をクリックして、パブリックエンドポイントと [トークン] を取得します。

    3. ターミナルウィンドウで次のコマンドを実行してサービスを呼び出します:

      $ curl <service_url> -H 'Authorization: <token>' -d '10 20'

      次のパラメーターを変更します:

      • <service_url> をステップ b で取得したパブリックエンドポイントに置き換えます。例: http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo

      • <token> をステップ b で取得したトークンに置き換えます。例: ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****

      • -d オプションは、サービスの入力パラメーターを指定します。

リファレンス