すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:Python を使用したカスタムプロセッサの開発

最終更新日:Feb 28, 2026

Python でカスタムプロセッサを開発し、Elastic Algorithm Service (EAS) のモデルサービスにカスタム推論ロジックを実装します。プロセッサコードをモデルファイルから分離することで、ローカルで独立して開発・テストを行い、Object Storage Service (OSS) にアップロードして柔軟にデプロイできます。このガイドでは、Python ベースのカスタムプロセッサのビルド、テスト、デプロイに関する完全なワークフローを説明します。

背景情報

以下のワークフローは、Python カスタムプロセッサの完全な開発プロセスを説明しています:

  1. ステップ 1:開発環境の構築

    Elastic Algorithm Service (EAS) の Python SDK は、複数の機械学習フレームワークをサポートし、Pandas などのデータ処理ライブラリと統合されています。このセクションでは、ローカルの Python 開発環境をセットアップし、必要なプロジェクト構造を作成する方法について説明します。

  2. ステップ 2:予測ロジックの作成

    EAS の Python SDK は、高性能な RPC フレームワークとクラスター通信用の内部インターフェイスを提供します。モデルサービスをデプロイするには、リクエストの処理方法を定義する簡単な予測ロジックインターフェイスを実装するだけです。

  3. ステップ 3:ローカルテストの実行

    予測ロジックを実装した後、デプロイ前にサービスの正確性を検証し、正常に機能することを確認するためにローカルでテストします。

  4. ステップ 4:コードファイルと Python 環境のパッケージング

    EAS のデプロイ要件に従って、コードと Python 環境をパッケージ化します。

  5. ステップ 5:パッケージとモデルファイルのアップロード

    パッケージファイルとモデルファイルを個別に OSS にアップロードします。

  6. ステップ 6:モデルサービスのデプロイとテスト

    作成したカスタムプロセッサをモデルサービスとしてデプロイします。

説明
  • プロセッサとモデルファイルの分離。 これにより、モデルのファインチューニングやサービスの再デプロイ時にプロセッサを再利用できます。予測ロジックで get_model_path() メソッドを使用してモデルのストレージパスを取得し、モデルを動的にロードします。

  • 大規模プロセッサ向けのランタイムイメージの使用。 カスタムプロセッサに多くの依存関係があり、パッケージが大きくなる場合は、代わりにランタイムイメージを使用してデプロイします。デプロイ方法の比較については、「ランタイムイメージデプロイの原則」をご参照ください。

前提条件

プロセッサ開発を開始する前に、トレーニング済みのモデルファイルを準備してください。

説明

モデルファイルとプロセッサを別々に管理すると、更新や置き換えが容易になります。これらを個別に OSS にアップロードし、サービスデプロイ時にマウントしてください。

ステップ 1:開発環境の構築

Pyenv などの Python パッケージマネージャーを使用して、開発環境をセットアップします。EAS が提供する EASCMD クライアントを使用すると、Python SDK の初期化プロセスが簡素化されます。ツールをダウンロードした後、1 つのコマンドを実行するだけで SDK 環境を初期化し、必要なファイルテンプレートを生成できます。このツールは Linux システムでのみ利用可能です。例:

# EASCMD をインストールして初期化します。この例では、Linux 用の EASCMD ツールのインストール方法を示します。
$ wget https://eas-data.oss-cn-shanghai.aliyuncs.com/tools/eascmd/v2/eascmd64
# ダウンロードが完了したら、アクセス権限を変更し、Alibaba Cloud の AccessKey 情報を設定できます。
$ chmod +x eascmd64
$ ./eascmd64 config -i <access_id> -k <access_key>

# 環境を初期化します。
$ ./eascmd64 pysdk init ./pysdk_demo

プロンプトで Python のバージョン (デフォルトは 3.6) を入力すると、システムは ./pysdk_demo ディレクトリに Python 環境用の ENV ディレクトリ、app.py コードテンプレート、および app.json デプロイ設定を自動的に作成します。

ステップ 2:予測ロジックの作成

ENV ディレクトリと同じディレクトリに、メインの予測サービスファイル app.py を作成します。コード例:

説明

EASCMD クライアントを使用して Python 環境を初期化すると、システムは app.py ファイルを自動生成します。ユースケースに応じて、このテンプレートを必要に応じて変更してください。

# -*- coding: utf-8 -*-
import allspark


class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor は一例です
        このようにメッセージを送信して予測できます
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
    """
    def initialize(self):
        """ モジュールのロード、サービスの開始時に一度だけ実行されます
             この関数でサービスの初期化とモデルのロードを行います。
        """
        self.module = {'w0': 100, 'w1': 2}
        # model_dir = self.get_model_path().decode()
        # load_model 関数はご自身で実装する必要があります。model.pt モデルファイルをロードするには、torch.load(model_dir + "/model.pt") のように実装できます。
        # self.model = load_model(model_dir)

    def pre_process(self, data):
        """ データ形式の前処理
        """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ 処理後の後処理
        """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """ リクエストデータを処理します
        """
        x, y = self.pre_process(data)
        w0 = self.module['w0']
        w1 = self.module['w1']
        y1 = w1 * x + w0
        if y1 >= y:
            return self.post_process("True"), 200
        else:
            return self.post_process("False"), 400


if __name__ == '__main__':
    # allspark.default_properties().put('rpc.keepalive', '10000')
    # サービスのコンピューティングタイムアウトを 10 秒に設定します。デフォルトは 5 秒です。
    # パラメーター worker_threads は処理の同時実行数を示します
    runner = MyProcessor(worker_threads=10)
    runner.run()

上記のコードは、Python SDK の簡単な実装例です。EAS が提供する BaseProcessor ベースクラスを継承し、initialize() メソッドと process() メソッドを実装しています。

メソッド

説明

使用方法

initialize()

サービス開始時にプロセッサを初期化します。このメソッドを使用して、モデルのロードなどの一度きりのセットアップタスクを実行します。

次のコードを initialize() メソッドに追加して、モデルファイルの取得をプロセッサの実装から分離します:

model_dir = self.get_model_path().decode()
self.model = load_model(model_dir)
  • get_model_path() メソッドは、モデルのストレージディレクトリを bytes オブジェクトとして返します。これは、アップロードしたモデルファイルがサービスインスタンス内に存在するディレクトリです。

  • モデルファイルをロードするために、カスタムの load_model() メソッドを実装します。たとえば、PyTorch を使用して `model.pt` ファイルをロードする場合: torch.load(model_dir + "/model.pt")

get_model_path()

モデルのストレージディレクトリを bytes オブジェクトとして返します。

サービスデプロイの JSON 設定で `model_path` パラメーターを指定してモデルファイルをアップロードする場合、get_model_path() を呼び出して、サービスインスタンス内のモデルファイルのストレージディレクトリを取得してロードします。

process(data)

各受信リクエストを処理します。リクエストボディは、処理のために process() メソッドにパラメーターとして渡されます。メソッドの戻り値はクライアントに返送されます。

data パラメーターはリクエストボディを `BYTES` 型として受け取ります。このメソッドは、response_data (BYTES) と status_code (INT) の 2 つの値を返します。リクエストが成功した場合、status_code0 または 200 である必要があります。

_init_(worker_threads=5, worker_processes=1,endpoint=None)

プロセッサのコンストラクターです。

  • worker_threads:ワーカースレッドの数。デフォルト値は 5 です。

  • worker_processes:プロセスの数。デフォルト値は 1 です。worker_processes が 1 の場合、サービスはシングルプロセス・マルチスレッドモードで実行されます。worker_processes が 1 より大きい場合、worker_threads はデータの読み取りのみを行い、リクエストは複数のプロセスによって同時に処理されます。各プロセスは initialize() メソッドを実行します。

  • endpoint:サービスがリッスンするエンドポイント。このパラメーターを使用して、サービスがリッスンするアドレスとポートを指定します。例: endpoint='0.0.0.0:8079'

    説明

    ポート 8080 または 9090 は EAS のシステムリスニングポートであるため、使用しないでください。

run()

サービスを開始します。

パラメーターはありません。

ステップ 3:サービスのローカルテスト

  1. ターミナルで `app.py` ファイルを含むディレクトリに移動し、次のコマンドを実行して Python アプリケーションを開始します:

    ./ENV/bin/python app.py

    次の出力は、アプリケーションが正常に起動したことを示します:

    [INFO] waiting for service initialization to complete...
    [INFO] service initialization complete
    [INFO] create service
    [INFO] rpc binds to predefined port 8080
    [INFO] install builtin handler call to /api/builtin/call
    [INFO] install builtin handler eastool to /api/builtin/eastool
    [INFO] install builtin handler monitor to /api/builtin/monitor
    [INFO] install builtin handler ping to /api/builtin/ping
    [INFO] install builtin handler prop to /api/builtin/prop
    [INFO] install builtin handler realtime_metrics to /api/builtin/realtime_metrics
    [INFO] install builtin handler tell to /api/builtin/tell
    [INFO] install builtin handler term to /api/builtin/term
    [INFO] Service start successfully
  2. 新しいターミナルを開き、次のコマンドを実行してサービスをテストします:

    ステップ 2 のサンプルコードに基づいて、2 つのデータ値をサービスに送信します。応答は実装されたロジックと一致します:

    curl http://127.0.0.1:8080/test  -d '10 20'

ステップ 4:コードファイルと Python 環境のパッケージング

EASCMD はワンクリックのパッケージングコマンドを提供します。プロセッサ開発に EASCMD を使用しなかった場合でも、手動でパッケージングできます。以下に両方の方法を説明します。

  • EASCMD パッケージングコマンドの使用 (Linux のみ):

    $ ./eascmd64 pysdk pack ./pysdk_demo

    コマンドを実行すると、次の出力が表示されます:

    [PYSDK] Creating package: /home/xi****.lwp/code/test/pysdk_demo.tar.gz
  • 手動パッケージング

    パッケージング要件

    詳細

    パッケージ形式の要件

    パッケージは .zip または .tar.gz 圧縮形式である必要があります。

    パッケージ内容の要件

    • パッケージのルートディレクトリは /ENV ディレクトリである必要があり、app.py ファイルを含んでいる必要があります。

    • パッケージ例: .tar.gz パッケージ例

ステップ 5:プロセッサパッケージとコードファイルのアップロード

パッケージング後、プロセッサパッケージ (.zip または .tar.gz) とモデルファイルの両方を OSS にアップロードします。これらのファイルはサービスデプロイ時にマウントできます。アップロード手順については、「ossutil コマンドリファレンス」をご参照ください。

ステップ 6:サービスのデプロイとテスト

コンソールまたは EASCMD クライアントのいずれかを使用してモデルサービスをデプロイします。

  1. サービスのデプロイ

    コンソール経由でのサービスのデプロイ

    1. PAI コンソールにログインします。ページ上部でリージョンを選択します。次に、目的のワークスペースを選択し、[Elastic Algorithm Service (EAS)] をクリックします。

    2. [サービスをデプロイ] をクリックします。[カスタムモデルのデプロイ] セクションで、[カスタムデプロイ] をクリックします。

    3. [カスタムデプロイ] ページで、次の主要なパラメーターを設定します。パラメーター設定の詳細については、「カスタムデプロイ」をご参照ください。

      パラメーター

      説明

      [デプロイ方法]

      [プロセッサベースのデプロイ] を選択します。

      [モデル設定]

      [設定タイプ][Object Storage Service (OSS)] に設定し、モデルファイルが配置されている OSS パスを選択します。

      プロセッサータイプ

      [カスタムプロセッサ] を選択します。

      プロセッサ言語

      [python] を選択します。

      プロセッサ パッケージ

      [設定タイプ][Object Storage Service (OSS)] に設定し、前のステップでパッケージ化したファイルの OSS パスを選択します。

      [プロセッサのメインファイル]

      ./app.py に設定します。

    4. (オプション) [サービス設定] セクションで、data_image パラメーターを追加します。パラメーターをステップ 4 のパッケージング時に設定したイメージパスに設定します。

      説明

      ステップ 4 でランタイムイメージを使用して環境をアップロードした場合にのみ、data_image パラメーターを設定してください。

    5. [デプロイ] をクリックします。

    EASCMD を使用したサービスのデプロイ

    以下の手順では、Linux オペレーティングシステムを例として、サービスをデプロイする方法について説明します:

    1. EASCMD クライアントをダウンロードし、身分認証を実行します。詳細については、「クライアントのダウンロードと認証」をご参照ください。

    2. クライアントファイルが配置されているディレクトリで、JSON 形式のファイルを作成し、app.json という名前を付けます。次のコードは、EASCMD または手動でパッケージ化された環境のファイル内容の例です:

      {
        "name": "pysdk_demo",
        "processor_entry": "./app.py",
        "processor_type": "python",
        "processor_path": "oss://examplebucket/exampledirectory/pysdk_demo.tar.gz",
        "model_path": "oss://examplebucket/exampledirectory/model",
        "cloud": {
              "computing": {
                  "instance_type": "ecs.c7.large"
              }
        },
        "metadata": {
          "instance": 1,
          }
      }
    3. ターミナルを開きます。JSON ファイルが配置されているディレクトリで、次のコマンドを実行してサービスをデプロイします。

      $ ./eascmd64 create app.json

      次の出力は、サービスが正常にデプロイされたことを示します。

      [RequestId]: 1202D427-8187-4BCB-8D32-D7096E95B5CA
      +-------------------+-------------------------------------------------------------------+
      | Intranet Endpoint | http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo |
      |             Token | ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****          |
      +-------------------+-------------------------------------------------------------------+
      [OK] Waiting task server to be ready
      [OK] Fetching processor from [oss://eas-model-beijing/195557026392****/pysdk_demo.tar.gz]
      [OK] Building image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
      [OK] Pushing image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
      [OK] Waiting [Total: 1, Pending: 1, Running: 0]
      [OK] Service is running
  2. サービスのテスト

    1. PAI コンソールにログインします。ページ上部でリージョンを選択します。次に、目的のワークスペースを選択し、[Elastic Algorithm Service (EAS)] をクリックします。

    2. 対象サービスの [サービスメソッド] 列で、[呼び出し情報] をクリックして、呼び出し用のパブリックエンドポイントと [トークン] を取得します。

    3. ターミナルで、前のステップで取得した情報に基づいて呼び出しを行います。

      $ curl <service_url> -H 'Authorization: <token>' -d '10 20'

      場所:

      • <service_url>:前のステップで取得したパブリックエンドポイントに置き換えます。例: http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo

      • <token>:前のステップで取得したトークンに置き換えます。例: ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****

      • -d の後の内容は、サービス呼び出しの入力パラメーターです。

参考文献