MaxCompute PyFG ジョブは、オフラインバッチ処理で複雑な特徴量を生成し、リスト、マップ、浮動小数点数、整数など、ODPS 2.0 の複雑なデータの型をサポートします。生成された特徴量のビン分割(離散化)を行うかどうかは、設定ファイルおよびコマンドラインパラメーターで指定します。
方法 1:汎用リソースグループイメージの使用
DataWorks コンソールで、スケジューリング構成 - リソースプロパティ セクションに移動します。汎用リソースグループを選択し、最新の dataworks_pairec_task_pod イメージを指定します。
dataworks_pairec_task_pod イメージのリリースは pyfg の更新に遅れる場合があるため、イメージに含まれる pyfg パッケージが最新版でない可能性があります。バージョンを確認するには、「特徴量の構成」で生成されるスクリプトをご参照ください。あるいは、方法 3 を使用してリソースグループイメージをカスタマイズし、最新版の pyfg を利用してください。方法 2:依存パッケージのインストール(旧バージョンの DataWorks 向け)
DataWorks コンソールにログインし、専用スケジューリングリソースグループを作成した後、O&M アシスタント を使用して pyfg パッケージをインストールします。
専用スケジューリングリソースグループに pyfg パッケージをインストールするには、DataWorks > 管理センター > リソースグループリスト > <a href="https://dataworks.console.aliyun.com/resource/runcommand" id="963b1e2ebd2ts">O&Mアシスタント</a> に移動し、次のコマンドを実行します。
/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg102-1.0.2-cp37-cp37m-linux_x86_64.whl方法 3:リソースグループイメージのカスタマイズ(新バージョンの DataWorks 向け)
詳細については、「カスタムイメージ」をご参照ください。
リソースファイルのアップロード
JSON 形式の FG 構成ファイルを MaxCompute プロジェクトにアップロードします。
一部の FG オペレーターでは、追加のリソースファイルが必要となるため、これらを手動で MaxCompute プロジェクトにアップロードする必要があります。
特徴量オペレーター | 説明 | リソースファイルパラメーター |
テキスト正規化 | ストップワードファイル | |
テキスト形態素解析特徴量 | 語彙構成ファイル | |
テキスト関連性特徴量 | 語頻度構成ファイル | |
カスタムオペレーター | オペレーター構成ファイル |
出力テーブルの作成
DataWorks で PyOdps3 ノードを作成し、以下のスクリプトを実行します。このスクリプトは、fg.json ファイルの内容に基づいて出力テーブルおよびその他の必要なリソースを作成します。
from pyfg102 import run_on_odps
fg_task = run_on_odps.FgTask(
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
force_delete_output_table=True,
force_update_resource=True)
fg_task.create_output_table(o)スクリプトを実行する前に、スケジューリング構成で以下のパラメーターを設定します:input_table、output_table、fg_json_file、および partition_value。
fg_task.run(o) メソッドは、出力テーブルが存在しない場合に自動的に作成しますが、事前にテーブルを作成することを推奨します。この方法により、データバックフィル時の同時実行による競合エラーを防止できます。
FG オフラインタスクの実行
DataWorks で PyOdps3 ノードを作成し、以下のスクリプトを実行します。出力テーブルが存在しない場合、スクリプトは fg.json ファイルに基づいて自動的にテーブルを作成します。
from pyfg102 import run_on_odps
fg_task = run_on_odps.FgTask(
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
batch_size=128,
force_delete_output_table=False,
force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
fg_task.run(o)
スクリプトを実行する前に、スケジューリング構成で以下のパラメーターを設定します:input_table、output_table、fg_json_file、および partition_value。
また、PyODPS をローカルマシンにインストール済みの場合、pyfg をローカルマシンにインストールしてタスクを送信することも可能です。
パラメーター
パラメーター | デフォルト値 | 説明 |
input_table | なし | 入力テーブル。 |
output_table | なし | 出力テーブル。タスクは、テーブルが存在しない場合に自動的に作成します。 |
fg_json_file | なし | JSON 形式の FG 構成ファイル。 |
partition_value | なし | 処理対象の入力テーブルパーティション。結果は出力テーブルの対応するパーティションに書き込まれます。 |
schema | なし | MaxCompute スキーマを指定します。詳細については、「スキーマ操作」をご参照ください。 |
batch_size | 128 | 1 バッチあたりの処理レコード数。 |
memory | 1024 | タスクノードに割り当てるメモリ量(MB 単位)。 |
force_delete_output_table | False |
|
force_update_resource | False |
|
output_merged_str | False |
|
debug | False |
|
sql_setting | なし |
|
fg_setting | なし |
|
これらの設定は、コード例内のパラメーター値を変更することでカスタマイズできます。
仕組み
pyfg パッケージがインストール済みの専用リソースグループ内のゲートウェイマシンから、SQL タスクを MaxCompute に送信できます。あるいは、PyODPS がインストール済みのローカルマシンに pyfg をインストールし、そこからタスクを送信することも可能です。
SQL タスク内のカスタムユーザー定義関数(UDF)では、FG 共有ライブラリ、構成ファイル(例:fg.json)、辞書、カスタムオペレーターのライブラリ、UDF コードファイル(.py)など、複数のリソースが必要です。これらのリソースはすべて MaxCompute クラスターにアップロードする必要があり、MaxCompute 分散ファイルシステム内に格納されます。タスク実行時、各ワーカーは分散ファイルシステムからこれらのリソースをダウンロードし、メモリに読み込みます。
FG 共有ライブラリや UDF コードファイルなど、複数のタスクで共有されるリソースがあります。force_update_resource=True の場合、新しいリソースをアップロードする前に既存のリソースが削除されます。このため、他の実行中のタスクに影響を与える時間間隔が発生します。