MaxCompute PyFg ジョブは、オフラインバッチで複雑な特徴を生成し、list、map、float、int などの複雑な ODPS 2.0 データ型をサポートします。このジョブは、設定ファイルとコマンドラインパラメーターを使用して、生成された特徴をビニングするかどうかを決定します。
方法1:汎用リソースグループイメージの使用
DataWorks の Scheduling Configuration - Resource Properties セクションで、汎用リソースグループと最新の dataworks_pairec_task_pod イメージを選択します。
注:dataworks_pairec_task_pod イメージは、そのリリースサイクルが pyfg の更新に遅れる可能性があるため、pyfg パッケージの最新バージョンが含まれていない場合があります。特定のバージョンを確認するには、「推奨ソリューションのカスタマイズ - 特徴設定」で生成されたスクリプトをご参照ください。最新の pyfg バージョンを使用するには、方法3 で説明されているようにカスタムリソースグループイメージを作成します。
方法 2: 依存関係パッケージのインストール (DataWorks の旧バージョン向け)
DataWorks コンソールにログインし、専用スケジューリングリソースグループを作成してから、O&M Assistant を使用して pyfg パッケージをインストールします。
DataWorks 専用リソースグループに pyfg パッケージをインストールするには、DataWorks > 管理センター > リソースグループリスト > <a href="https://dataworks.console.aliyun.com/resource/runcommand" id="963b1e2ebd2ts">O&M Assistant</a> に移動します。以下に例を示します:
/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg101-1.0.1-cp37-cp37m-linux_x86_64.whl方法 3: カスタムリソースグループイメージの使用 (DataWorks の新しいバージョン向け)
詳細については、「カスタムイメージ」をご参照ください。
リソースファイルのアップロード
JSON フォーマットの FG 設定ファイルを MaxCompute プロジェクトにアップロードします。
一部の FG オペレーターは追加のリソースファイルを必要とします。これらのファイルは手動で MaxCompute プロジェクトにアップロードする必要があります。
特徴オペレーター | 説明 | リソースファイル設定項目 |
テキスト正規化 | ストップワードファイル | |
テキストトークン化特徴 | 語彙設定ファイル | |
テキスト関連性特徴 | 単語頻度構成ファイル | |
カスタムオペレーター | オペレーター設定ファイル |
出力テーブルの作成
DataWorks で、PyOdps3 ノードを作成し、次のスクリプトを実行します。このスクリプトは、fg.json ファイルの内容に基づいて、出力テーブルとその他の必要なリソースを作成します。
from pyfg101 import run_on_odps
fg_task = run_on_odps.FgTask(
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
force_delete_output_table=True,
force_update_resource=True)
fg_task.create_output_table(o)スクリプトを実行する前に、スケジューリング設定セクションで次のパラメーターを設定します:input_table、output_table、fg_json_file、partition_value。
fg_task.run(o) メソッドは、出力テーブルが存在しない場合に自動的に作成します。ただし、同時データバックフィル中にタスクの失敗を引き起こす可能性のある競合を防ぐために、事前にテーブルを作成することを推奨します。
FG オフラインタスクの実行
DataWorks で PyOdps3 ノードを作成し、次のスクリプトを実行します。このスクリプトは、fg.json ファイルの内容に基づいて出力テーブルを自動的に作成します。
from pyfg101 import run_on_odps
fg_task = run_on_odps.FgTask(
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
batch_size=128,
force_delete_output_table=False,
force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
fg_task.run(o)
スクリプトを実行する前に、スケジューリング設定セクションで次のパラメーターを設定します:input_table、output_table、fg_json_file、partition_value。
ご利用のローカルマシンに PyODPS がインストールされている場合は、pyfg をローカルにインストールしてタスクを送信することもできます。
パラメーター
パラメーター | デフォルト値 | 説明 |
input_table | None | 入力テーブル。 |
output_table | None | 出力テーブル。自動的に作成されます。 |
fg_json_file | None | JSON フォーマットの FG 設定ファイル。 |
partition_value | None | FG の入力として使用する入力テーブルのパーティションを指定します。結果は、出力テーブルの対応するパーティションに保存されます。 |
schema | None | MaxCompute スキーマを指定します。詳細については、「スキーマ操作」をご参照ください。 |
batch_size | 128 | バッチで処理するレコード数。 |
memory | 1024 | タスクノードが使用するメモリ量 (MB)。 |
force_delete_output_table | False | 出力テーブルを削除するかどうかを指定します。True に設定すると、タスクの実行前に出力テーブルが削除されます。 |
force_update_resource | False | リソースを更新するかどうかを指定します。True に設定すると、タスクの実行前にリソースが更新されます。常に True に設定しないでください。同時実行の競合を引き起こす可能性があります。 |
output_merged_str | False | 文字列をマージするかどうかを指定します。True に設定すると、文字列が自動的にマージされ、RTP フォーマットの大きな文字列特徴が出力されます。 |
debug | False | デバッグモードで実行するかどうかを指定します。True に設定すると、更新されたすべてのリソースの内容が出力されます。 |
sql_setting | None |
|
fg_setting | None |
|
例に示すように、コード内のデフォルトのパラメーター値を変更します。
補足情報
pyfg パッケージは、専用リソースグループ内のゲートウェイマシンにインストールされます。このマシンは、MaxCompute に SQL タスクを送信できます。または、pyodps ツールがインストールされている任意のマシンに pyfg パッケージをインストールして、MaxCompute クラスターにタスクを送信することもできます。
SQL タスク内のユーザー定義関数 (UDF) には、FG 共有ライブラリ、fg.json などの設定ファイル、辞書、カスタムオペレーターライブラリ、UDF コードファイル (.py) など、いくつかのリソースが必要です。これらのリソースはすべて MaxCompute クラスターにアップロードする必要があり、そこで分散ファイルシステムに保存されます。タスクが実行されると、各ワーカーはこれらのリソースを分散ファイルシステムからダウンロードし、メモリにロードします。
FG 共有ライブラリや UDF コードファイルなどの一部のリソースは、複数のタスクで共有されます。force_update_resource=True を設定すると、新しいリソースがアップロードされる前に元のリソースが削除されます。このプロセスにより時間差が生じ、実行中の他のタスクに影響を与える可能性があります。