すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:PyAlink Script

最終更新日:Mar 14, 2026

PyAlink スクリプトは、コードを通じて Alink アルゴリズムを呼び出し、分類、回帰、レコメンデーションタスクを実行します。このコンポーネントは、他の Designer コンポーネントと統合して、ビジネスワークフローを構築および検証します。

概要

PyAlink スクリプトは、スタンドアロンモード、または他の Designer コンポーネントと組み合わせたモードの 2 つのモードで動作します。このコンポーネントは、数百の Alink アルゴリズムをサポートし、コードを通じてさまざまなデータ型を読み書きします。詳細は、「データの読み書きタイプ」をご参照ください。PyAlink スクリプトによって生成された PipelineModel モデルは、Elastic Algorithm Service (EAS) サービスとしてデプロイできます。詳細は、「モデルを EAS サービスとしてデプロイ」をご参照ください。

用語

PyAlink スクリプトを使用する前に、これらの用語を理解しておいてください。

機能モジュール

説明

オペレーター

Alink の各アルゴリズム機能はオペレーターです。オペレーターはバッチタイプとストリーミングタイプに分かれています。例えば、ロジスティック回帰には以下が含まれます。

  • LogisticRegressionTrainBatchOp:ロジスティック回帰モデルをトレーニングします。

  • LogisticRegressionPredictBatchOp:バッチ予測を実行します。

  • LogisticRegressionPredictStreamhOp:ストリーム予測を実行します。

オペレーターは Link または LinkFrom を使用して接続します。

# データを定義します。
data = CsvSourceBatchOp()
# ロジスティック回帰モデルをトレーニングします。
lrTrain = LogisticRegressionTrainBatchOp()
# ロジスティック回帰モデルを使用して予測します。
LrPredict = LogisticRegressionPredictBatchOp()
# モデルをトレーニングします。
data.link(lrTrain)
# 予測を実行します。
LrPredict.linkFrom(lrTrain, data)

各オペレーターには設定可能なパラメーターがあります。ロジスティック回帰のパラメーターには以下が含まれます。

  • labelCol:ターゲット列名 (必須)。型:String。

  • featureCols:特徴量列名。型:String[]。デフォルト:NULL (すべての列)。

パラメーターは `set` の後にパラメーター名を続けて設定します。

lr = LogisticRegressionTrainBatchOp()\
            .setFeatureCols(colnames)\
            .setLabelCol("label")

データのインポート (ソース) とエクスポート (シンク) は、特殊なオペレータータイプです。これらは Link または LinkFrom を使用してアルゴリズムコンポーネントに接続します。

image

Alink には、一般的なストリーミングおよびバッチデータソースが含まれています。例:

df_data = pd.DataFrame([
    [2, 1, 1],
    [3, 2, 1],
    [4, 3, 2],
    [2, 4, 1],
    [2, 2, 1],
    [4, 3, 2],
    [1, 2, 1],
    [5, 3, 2]
])
input = BatchOperator.fromDataframe(df_data, schemaStr='f0 int, f1 int, label int')
# データをロードします
dataTest = input
colnames = ["f0","f1"]
lr = LogisticRegressionTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model = input.link(lr)
predictor = LogisticRegressionPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, dataTest).print()

パイプライン

パイプラインは、データ処理、特徴量エンジニアリング、モデルトレーニングを組み合わせて、トレーニング、予測、オンラインサービスを提供します。

quantileDiscretizer = QuantileDiscretizer()\
            .setNumBuckets(2)\
            .setSelectedCols("sepal_length")

binarizer = Binarizer()\
            .setSelectedCol("petal_width")\
            .setOutputCol("bina")\
            .setReservedCols("sepal_length", "petal_width", "petal_length", "category")\
            .setThreshold(1.);

lda = Lda()\
            .setPredictionCol("lda_pred")\
            .setPredictionDetailCol("lda_pred_detail")\
            .setSelectedCol("category")\
            .setTopicNum(2)\
            .setRandomSeed(0)

pipeline = Pipeline()\
    .add(binarizer)\
    .add(binarizer)\
    .add(lda)

pipeline.fit(data1)
pipeline.transform(data2)

ベクター

Alink は 2 種類のカスタムベクターデータ形式をサポートしています。

  • SparseVector

    形式:$4$1:0.1 2:0.2。ドル記号 ($) の間の数字はベクターの長さを表します。ドル記号の後の値は、列のインデックスと対応する値を示します。

  • DenseVector

    形式:0.1 0.2 0.3。スペースで区切られた値。

説明

ベクター型の列では、パラメーター名 `vectorColName` を使用します。

サポートされるコンポーネント

PyAlink スクリプトは、データ処理、特徴量エンジニアリング、モデルトレーニングのための数百の Alink コンポーネントをサポートしています。

説明

PyAlink スクリプトは、パイプラインコンポーネントとバッチコンポーネントのみをサポートします。ストリームコンポーネントはサポートされていません。

スタンドアロンでの使用

この例では、Designer 上の PyAlink スクリプトを使用して、ItemCf モデルで MovieLens データセットをスコアリングします。

  1. Designer を開き、空のパイプラインを作成します。詳細は、「操作手順」をご参照ください。

  2. ワークフローリストでパイプラインを見つけ、[ワークフローに入る] をクリックします。

  3. コンポーネントリストから [PyAlink スクリプト] をキャンバスにドラッグして、PyAlink スクリプト-1 を作成します。

    image

  4. PyAlink スクリプト-1 を選択します。[パラメーター設定] タブと [実行チューニング] タブでパラメーターを設定します。

    • [パラメーター設定] タブで、コードを入力します。

      from pyalink.alink import *
      
      def main(sources, sinks, parameter):
          PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
          RATING_FILE = "ratings.csv"
          PREDICT_FILE = "predict.csv"
          RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
      
          ratingsData = CsvSourceBatchOp() \
                  .setFilePath(PATH + RATING_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          predictData = CsvSourceBatchOp() \
                  .setFilePath(PATH + PREDICT_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          itemCFModel = ItemCfTrainBatchOp() \
                  .setUserCol("user_id").setItemCol("item_id") \
                  .setRateCol("rating").linkFrom(ratingsData);
      
          itemCF = ItemCfRateRecommender() \
                  .setModelData(itemCFModel) \
                  .setItemCol("item_id") \
                  .setUserCol("user_id") \
                  .setReservedCols(["user_id", "item_id"]) \
                  .setRecommCol("prediction_score")
      
          result = itemCF.transform(predictData)
      
          result.link(sinks[0])
          BatchOperator.execute()

      PyAlink スクリプトには 4 つの出力ポートがあります。コード result.link(sinks[0]) は、出力データを最初のポートに書き込みます。ダウンストリームコンポーネントは、このポートに接続してデータを読み取ります。詳細は、「データの読み書きタイプ」をご参照ください。

    • [実行チューニング] タブで、実行モードとノードスペックを設定します。

      パラメーター

      説明

      ジョブ実行モードの選択

      実行モード:

      • DLC (単一ノード複数同時実行):テストおよび検証中の小規模データセット向け。

      • MaxCompute (分散):大規模データセットまたは本番タスク向け。

      • フルマネージド Flink (分散):ワークスペースにアタッチされた Flink クラスターリソースを分散実行に使用します。

      ワーカー数

      [ジョブ実行モードの選択][MaxCompute (分散)] または [フルマネージド Flink (分散)] の場合に必須です。ワーカー数を指定します。デフォルト:Empty (システムがタスクデータに基づいて割り当てます)。

      ワーカーあたりのメモリ (MB)

      [ジョブ実行モードの選択][MaxCompute (分散)] または [フルマネージド Flink (分散)] の場合に必須です。ワーカーのメモリを MB 単位で指定します (正の整数)。デフォルト:8192。

      ノードあたりの CPU コア数

      [ジョブ実行モードの選択][MaxCompute (分散)] または [フルマネージド Flink (分散)] の場合に必須です。ワーカーあたりの CPU コア数を指定します (正の整数)。デフォルト:Empty。

      スクリプトのノードスペックを選択

      DLC ノードのリソースタイプ。デフォルト:2 vCPU + 8 GB Mem-ecs.g6.large。

  5. [保存] をクリックし、実行ボタン image をクリックしてスクリプトを実行します。

  6. タスクが完了したら、PyAlink スクリプト-1 を右クリックします。[データの表示] > [出力 0] を選択して結果を表示します。

    列名

    説明

    user_id

    ユーザー ID。

    item_id

    映画 ID。

    prediction_score

    ユーザーの映画に対するプリファレンススコア。映画のレコメンデーションに使用されます。

Designer コンポーネントとの組み合わせ

PyAlink スクリプトの入力ポートと出力ポートは、他の Designer コンポーネントと互換性があり、シームレスに統合できます。组合使用

読み取りと書き込みのデータの型

  • データの読み取り

    • アップストリームコンポーネントの入力ポートから MaxCompute テーブルを読み取ります。

      train_data = sources[0]
      test_data = sources[1]

      `sources[0]` は最初の入力ポートの MaxCompute テーブルを表し、`sources[1]` は 2 番目の入力ポートの MaxCompute テーブルを表します。最大 4 つの入力ポートをサポートします。

    • Alink ソースコンポーネント (CsvSourceBatchOp、AkSourceBatchOp) を使用して、ネットワークファイルシステムから読み取ります。サポートされているファイルタイプ:

      • HTTP ネットワーク共有ファイル:

        ratingsData = CsvSourceBatchOp() \
                    .setFilePath(PATH + RATING_FILE) \
                    .setFieldDelimiter("\t") \
                    .setSchemaStr(RATING_SCHEMA_STRING)
      • OSS ネットワークファイル。図のように読み取りパスを設定します。image

        model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
  • データの書き込み

    • MaxCompute テーブルに書き込み、出力ポートを介してダウンストリームコンポーネントに渡します。

      result0.link(sinks[0])
      result1.link(sinks[1])
      BatchOperator.execute()

      `result0.link(sinks[0])` は、データを最初の出力ポートに書き込みます。最大 4 つの出力ポートをサポートします。

    • OSS ネットワークファイルに書き込みます。図のように書き込みパスを設定します。image

      result.link(AkSinkBatchOp() \
                  .setFilePath("oss://xxxxxxxx/model_20220323.ak") \
                  .setOverwriteSink(True))
      BatchOperator.execute()

モデルを EAS サービスとしてデプロイ

  1. デプロイするモデルの生成

    モデルが PipelineModel である場合にのみ、モデルを EAS サービスとしてデプロイできます。このコードを使用して PipelineModel ファイルを生成します。詳細は、「スタンドアロンでの使用」をご参照ください。

    from pyalink.alink import *
    
    def main(sources, sinks, parameter):
        PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
        RATING_FILE = "ratings.csv"
        PREDICT_FILE = "predict.csv"
        RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
    
        ratingsData = CsvSourceBatchOp() \
                .setFilePath(PATH + RATING_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        predictData = CsvSourceBatchOp() \
                .setFilePath(PATH + PREDICT_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        itemCFModel = ItemCfTrainBatchOp() \
                .setUserCol("user_id").setItemCol("item_id") \
                .setRateCol("rating").linkFrom(ratingsData);
    
        itemCF = ItemCfRateRecommender() \
                .setModelData(itemCFModel) \
                .setItemCol("item_id") \
                .setUserCol("user_id") \
                .setReservedCols(["user_id", "item_id"]) \
                .setRecommCol("prediction_score")
    
        model = PipelineModel(itemCF)
        model.save().link(AkSinkBatchOp() \
                .setFilePath("oss://<your_bucket_name>/model.ak") \
                .setOverwriteSink(True))
        BatchOperator.execute()

    <your_bucket_name> は、ご利用の OSS バケット名です。

    重要

    PATH のデータセットパスに対する読み取り権限を確認してください。権限がない場合、コンポーネントの実行に失敗します。

  2. EAS 設定ファイルの生成

    このスクリプトを実行して、出力を `config.json` に書き込みます。

    # EAS の設定ファイル
    import json
    
    # EAS のモデル設定を生成します。
    model_config = {}
    # EAS が受け入れるデータのスキーマ。
    model_config['inputDataSchema'] = "id long, movieid long" 
    model_config['modelVersion'] = "v0.2"
    
    eas_config = {
        "name": "recomm_demo",
        "model_path": "http://xxxxxxxx/model.ak",
        "processor": "alink_outer_processor",
        "metadata": {
            "instance": 1,
            "memory": 2048,
            "region":"cn-beijing"
        },
        "model_config": model_config
    }
    print(json.dumps(eas_config, indent=4))

    `config.json` の主要なパラメーター:

    • name:モデルサービス名。

    • model_path:PipelineModel ファイルを格納する OSS パス。実際のパスに置き換えてください。

    その他の `config.json` パラメーターについては、「コマンドリファレンス」をご参照ください。

  3. モデルを EAS サービスとしてデプロイ

    eascmd クライアントにログインして、モデルサービスをデプロイします。詳細は、「クライアントのダウンロードと認証」をご参照ください。Windows 64 ビットシステムの場合は、次のコマンドを使用します。

    eascmdwin64.exe create config.json