すべてのプロダクト
Search
ドキュメントセンター

Artificial Intelligence Recommendation:MaxCompute タスクにおける FG の使用

最終更新日:Mar 28, 2026

MaxCompute PyFG ジョブは、オフラインバッチ処理で複雑な特徴量を生成し、リスト、マップ、浮動小数点数、整数など、ODPS 2.0 の複雑なデータの型をサポートします。生成された特徴量のビン分割(離散化)を行うかどうかは、設定ファイルおよびコマンドラインパラメーターで指定します。

方法 1:汎用リソースグループイメージの使用

DataWorks コンソールで、スケジューリング構成 - リソースプロパティ セクションに移動します。汎用リソースグループを選択し、最新の dataworks_pairec_task_pod イメージを指定します。

説明 dataworks_pairec_task_pod イメージのリリースは pyfg の更新に遅れる場合があるため、イメージに含まれる pyfg パッケージが最新版でない可能性があります。バージョンを確認するには、「特徴量の構成」で生成されるスクリプトをご参照ください。あるいは、方法 3 を使用してリソースグループイメージをカスタマイズし、最新版の pyfg を利用してください。

方法 2:依存パッケージのインストール(旧バージョンの DataWorks 向け)

DataWorks コンソールにログインし、専用スケジューリングリソースグループを作成した後、O&M アシスタント を使用して pyfg パッケージをインストールします。

専用スケジューリングリソースグループに pyfg パッケージをインストールするには、DataWorks > 管理センター > リソースグループリスト > <a href="https://dataworks.console.aliyun.com/resource/runcommand" id="963b1e2ebd2ts">O&Mアシスタント</a> に移動し、次のコマンドを実行します。

/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg102-1.0.2-cp37-cp37m-linux_x86_64.whl

方法 3:リソースグループイメージのカスタマイズ(新バージョンの DataWorks 向け)

詳細については、「カスタムイメージ」をご参照ください。

リソースファイルのアップロード

JSON 形式の FG 構成ファイルを MaxCompute プロジェクトにアップロードします。

一部の FG オペレーターでは、追加のリソースファイルが必要となるため、これらを手動で MaxCompute プロジェクトにアップロードする必要があります。

特徴量オペレーター

説明

リソースファイルパラメーター

text_normalizer

テキスト正規化

ストップワードファイル stop_char_file

tokenize_feature

テキスト形態素解析特徴量

語彙構成ファイル vocab_file

bm25_feature

テキスト関連性特徴量

語頻度構成ファイル term_doc_freq_file

custom_feature

カスタムオペレーター

オペレーター構成ファイル operator_lib_file

出力テーブルの作成

DataWorks で PyOdps3 ノードを作成し、以下のスクリプトを実行します。このスクリプトは、fg.json ファイルの内容に基づいて出力テーブルおよびその他の必要なリソースを作成します。

from pyfg102 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'],
    force_delete_output_table=True,
    force_update_resource=True)
fg_task.create_output_table(o)

スクリプトを実行する前に、スケジューリング構成で以下のパラメーターを設定します:input_tableoutput_tablefg_json_file、および partition_value

fg_task.run(o) メソッドは、出力テーブルが存在しない場合に自動的に作成しますが、事前にテーブルを作成することを推奨します。この方法により、データバックフィル時の同時実行による競合エラーを防止できます。

FG オフラインタスクの実行

DataWorks で PyOdps3 ノードを作成し、以下のスクリプトを実行します。出力テーブルが存在しない場合、スクリプトは fg.json ファイルに基づいて自動的にテーブルを作成します。

from pyfg102 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'],
    batch_size=128,
    force_delete_output_table=False,
    force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
fg_task.run(o)

スクリプトを実行する前に、スケジューリング構成で以下のパラメーターを設定します:input_tableoutput_tablefg_json_file、および partition_value

また、PyODPS をローカルマシンにインストール済みの場合、pyfg をローカルマシンにインストールしてタスクを送信することも可能です。

パラメーター

パラメーター

デフォルト値

説明

input_table

なし

入力テーブル。

output_table

なし

出力テーブル。タスクは、テーブルが存在しない場合に自動的に作成します。

fg_json_file

なし

JSON 形式の FG 構成ファイル。

partition_value

なし

処理対象の入力テーブルパーティション。結果は出力テーブルの対応するパーティションに書き込まれます。

schema

なし

MaxCompute スキーマを指定します。詳細については、「スキーマ操作」をご参照ください。

batch_size

128

1 バッチあたりの処理レコード数。

memory

1024

タスクノードに割り当てるメモリ量(MB 単位)。

force_delete_output_table

False

True の場合、タスク実行前に既存の出力テーブルを削除します。

force_update_resource

False

True の場合、タスク実行前にリソースを更新します。重要:同時実行による競合を回避するため、このパラメーターを True のままにしないでください。

output_merged_str

False

True に設定すると、出力文字列を RTP 形式の単一文字列特徴量にマージします。

debug

False

True の場合、デバッグモードを有効にします。このモードでは、更新されたすべてのリソースの内容がログに出力されます。

sql_setting

なし

fg_task.add_sql_setting メソッドを使用して MaxCompute SQL パラメーターを指定します。このメソッドは複数回呼び出すことができます。パラメーターの一覧については、「フラグパラメーター」をご参照ください。

fg_setting

なし

fg_task.add_fg_setting メソッドの key および value パラメーター。これらのパラメーターは FG の設定項目を指定します。詳細については、「[グローバル構成](summary.md#id9)」をご参照ください。複数の構成項目を追加できます。本機能は v0.4.0 以降で利用可能です。

これらの設定は、コード例内のパラメーター値を変更することでカスタマイズできます。

仕組み

pyfg パッケージがインストール済みの専用リソースグループ内のゲートウェイマシンから、SQL タスクを MaxCompute に送信できます。あるいは、PyODPS がインストール済みのローカルマシンに pyfg をインストールし、そこからタスクを送信することも可能です。

SQL タスク内のカスタムユーザー定義関数(UDF)では、FG 共有ライブラリ、構成ファイル(例:fg.json)、辞書、カスタムオペレーターのライブラリ、UDF コードファイル(.py)など、複数のリソースが必要です。これらのリソースはすべて MaxCompute クラスターにアップロードする必要があり、MaxCompute 分散ファイルシステム内に格納されます。タスク実行時、各ワーカーは分散ファイルシステムからこれらのリソースをダウンロードし、メモリに読み込みます。

FG 共有ライブラリや UDF コードファイルなど、複数のタスクで共有されるリソースがあります。force_update_resource=True の場合、新しいリソースをアップロードする前に既存のリソースが削除されます。このため、他の実行中のタスクに影響を与える時間間隔が発生します。