複雑なデータワークフローでは、ノード間で動的な情報を渡す必要があることがよくあります。一般的な方法は中間テーブルを使用することですが、このアプローチは少量のデータを渡すには非常に非効率的であり、不要な I/O と複雑さを増します。代入ノードは軽量なソリューションを提供します。短いスクリプト (MaxCompute SQL、Python 2、または Shell) を実行し、その出力を直接下流ノードにパラメーターとして渡します。これにより、上流タスクの結果に基づいてタスクが動的に構成される柔軟なパイプラインを構築できます。
注意事項
エディション:DataWorks Standard Edition 以上。
権限:DataWorks ワークスペースで [開発] または [ワークスペース管理者] のロールが必要です。詳細については、「ワークスペースへのメンバーの追加」をご参照ください。
仕組み
代入ノードのコア機能はパラメーター渡し、つまり代入ノードから下流ノードへのデータ転送です。
代入ノードは、最後の出力またはクエリ結果を、
outputsという名前のシステム生成のノード出力パラメーターに自動的に割り当てることでデータを生成します。下流ノードがデータを消費します。ノード入力パラメーター (例:
param) を追加して、outputsを使用するように構成します。
パラメーターのフォーマット
outputs パラメーターの値とフォーマットは、使用されるスクリプト言語によって異なります。
言語 | 値 | フォーマット |
MaxCompute SQL | 最後の | 結果セットは2次元配列として渡されます。 |
Python 2 | 最後の | 出力は単一の文字列として扱われ、カンマ ( 例:出力が 重要 出力内容に含まれるカンマはエスケープしてください。たとえば、出力が |
Shell | 最後の |
操作手順
代入ノードの結果は、任意の下流ノードに渡すことができます。次の例では、Shell ノードを使用してこのワークフローを説明します。
代入ノードの構成
ワークフローで、代入ノードを作成して編集します。必要に応じて MaxCompute SQL、Python 2、または Shell を選択し、コードを記述します。

Shell ノードの構成

Shell ノードを作成します。
右側の [スケジューリング] パネルで、[入出力パラメーター] タブを選択します。
[入力パラメーター] セクションで、[パラメーターの作成] をクリックします。
入力パラメーターに
paramなどのパラメーター名を割り当て、値をoutputsパラメーターに設定します。説明この構成の後、DataWorks は Shell ノードと代入ノードの間に自動的に依存関係を作成します。
パラメーターを構成した後、Shell スクリプト内で
${param}の形式を使用して渡された値を参照できます。
実行と検証
ワークフローキャンバスの上部ツールバーで [デプロイ] をクリックし、[完全デプロイ] を選択します。
に移動します。
対象のワークフローで スモークテスト を実行し、結果が期待どおりであることを確認します。
制限事項
パラメーターは、直接の下流ノードにのみ渡すことができます。
サイズ制限:
outputsは 2 MB を超えることはできません。超えた場合、代入ノードは失敗します。構文の制限:
代入ノードのコードにコメントを含めないでください。コメントは出力の解析を妨げ、ノードの失敗や不正な値の生成を引き起こす可能性があります。
MaxCompute SQL モードでは
WITH句はサポートされていません。
言語別の例
outputs のデータ形式と参照方法は言語によって異なります。
例1:MaxCompute SQL クエリ結果の受け渡し
SQL クエリの結果は、2次元配列として Shell ノードに渡されます。
代入ノード
SQL コードが2行2列を返すと仮定します。
SELECT 'beijing', '1001' UNION ALL SELECT 'hangzhou', '1002';Shell ノード
代入ノードの
outputsを参照するparamという名前の入力パラメーターを追加します。次のスクリプトを使用してデータを読み取ります。
echo "Entire result set: ${region}" echo "First row: ${region[0]}" echo "Second field of the first row: ${region[0][1]}"DataWorks は静的な置き換えを実行します。出力:
Full result set: beijing,1001 hangzhou,1002 First row: beijing,1001 Second field of the first row: 1001
例2:Python 2 出力の受け渡し
Python 2 の print 文の出力は、カンマ (,) で分割され、1次元配列として渡されます。
代入ノード
Python 2 のコードは次のとおりです。
print 'Electronics, Clothing, Books';Shell ノード
代入ノードの
outputsを参照するparamという名前の入力パラメーターを追加します。次のスクリプトを使用してデータを読み取ります。次のスクリプトを使用してデータを読み取ります。
# Output the full array echo "Full result set: ${types}" # Output a specific element by index echo "Second element: ${types[1]}"DataWorks は静的な置き換えを実行します。出力:
Full result set: Electronics,Clothing,Books Second element: Clothing
Shell ノードの処理ロジックは Python 2 ノードと類似しているため、ここでは説明を省略します。
シナリオ:複数の業務ラインのパーティションテーブルからのデータの一括処理
この例では、代入ノードと for-each ノードを使用して、複数の業務ラインからのユーザー行動データを一括処理する方法を示します。このアプローチにより、複数のプロダクトラインに対して単一のロジックセットを使用することで、データ処理を自動化できます。
背景情報
ある大手インターネット企業のデータ開発者であると仮定します。E コマース (ecom)、金融 (finance)、物流 (logistics) という3つのコア業務ラインのデータ処理を担当しています。将来的にはさらに多くの業務ラインが追加される可能性があります。毎日、これら3つの業務ラインのユーザー行動ログに対して同じ集約ロジックを実行する必要があります。このロジックは、各ユーザーの1日あたりの人気度 (PV) を計算し、その結果を統一された集計テーブルに保存します。
上流ソーステーブル (DWD レイヤー):
dwd_user_behavior_ecom_d:E コマースユーザー行動テーブル。dwd_user_behavior_finance_d:金融ユーザー行動テーブル。dwd_user_behavior_logistics_d:物流ユーザー行動テーブル。dwd_user_behavior_${line-of-business}_d:他の潜在的な業務ラインのユーザー行動テーブル。これらのテーブルは同じスキーマを持ち、日 (
dt) でパーティション化されています。
下流ターゲットテーブル (DWS レイヤー):
dws_user_summary_d:ユーザー集計テーブル。このテーブルは、業務ライン (
biz_line) と日 (dt) の両方でパーティション化されています。すべての業務ラインの集約結果を保存するために使用されます。
業務ラインごとに個別のタスクを作成すると、メンテナンスコストが高くなり、エラーが発生しやすくなります。for-each ノードを使用すると、1セットの処理ロジックを維持するだけで済みます。システムは自動的にすべての業務ラインを走査して計算を完了します。
データ準備
まず、サンプルテーブルを作成し、テストデータを挿入します。この例では、データタイムスタンプ 20251010 を使用します。
DataStudio に移動してデータ開発を行い、MaxCompute SQL ノードを作成します。
ソーステーブル (DWD レイヤー) の作成:次のコードを MaxCompute SQL ノードに追加し、それを選択して実行します。
-- E-commerce user behavior table CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d ( user_id STRING COMMENT 'User ID', action_type STRING COMMENT 'Behavior type', event_time BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence' ) COMMENT 'Details of E-commerce user behavioral logs' PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format'); INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES ('user001', 'click', 1760004060000), -- 2025-10-10 10:01:00.000 ('user002', 'browse', 1760004150000), -- 2025-10-10 10:02:30.000 ('user001', 'add_to_cart', 1760004300000); -- 2025-10-10 10:05:00.000 -- Verify that the E-commerce user behavior table is created. SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010'; -- Finance user behavior table CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d ( user_id STRING COMMENT 'User ID', action_type STRING COMMENT 'Behavior type', event_time BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence' ) COMMENT 'Details of finance user behavioral logs' PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format'); INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES ('user003', 'open_app', 1760020200000), -- 2025-10-10 14:30:00.000 ('user003', 'transfer', 1760020215000), -- 2025-10-10 14:30:15.000 ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000 ('user004', 'open_app', 1760020300000); -- 2025-10-10 14:31:40.000 -- Verify that the finance user behavior table is created. SELECT * FROM dwd_user_behavior_finance_d where dt='20251010'; -- Logistics user behavior table CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d ( user_id STRING COMMENT 'User ID', action_type STRING COMMENT 'Behavior type', event_time BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence' ) COMMENT 'Details of logistics user behavioral logs' PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format'); INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES ('user001', 'check_status', 1760032800000), -- 2025-10-10 18:00:00.000 ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000 -- Verify that the logistics user behavior table is created. SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';ターゲットテーブル (DWS レイヤー) の作成:次のコードを MaxCompute SQL ノードに追加し、それを選択して実行します。
CREATE TABLE IF NOT EXISTS dws_user_summary_d ( user_id STRING COMMENT 'User ID', pv BIGINT COMMENT 'Daily popularity', ) COMMENT 'Daily user popularity aggregate table' PARTITIONED BY ( dt STRING COMMENT 'Date partition in yyyymmdd format', biz_line STRING COMMENT 'Line-of-business partition, such as ecom, finance, logistics' );重要ワークスペースが標準環境を使用している場合は、このノードを本番環境に公開し、データバックフィルを実行する必要があります。
ワークフローの実装
ワークフローを作成します。右側の [スケジューリングパラメーター] ペインで、スケジューリングパラメーター bizdate を前日
$[yyyymmdd-1]に設定します。
ワークフローで、get_biz_list という名前の代入ノードを作成します。MaxCompute SQL に次のコードを記述します。このノードは、処理する業務ラインのリストを出力します。
-- Output all lines of business to be processed SELECT 'ecom' AS biz_line UNION ALL SELECT 'finance' AS biz_line UNION ALL SELECT 'logistics' AS biz_line;for-each ノードの構成
ワークフローキャンバスに戻り、get_biz_list 代入ノードの下流に for-each ノードを作成します。
for-each ノードの設定ページに移動します。右側の [スケジュール] タブの で、loopDataArray パラメーターの値を get_biz_list ノードの outputs に設定します。

for-each ノードのループ本体で、[内部ノードの作成] をクリックします。MaxCompute SQL ノードを作成し、ループ本体の処理ロジックを記述します。
説明このスクリプトは for-each ノードによって駆動され、業務ラインごとに1回実行されます。
組み込み変数 ${dag.foreach.current} は、実行時に現在の業務ライン名に動的に置き換えられます。期待される反復値は 'ecom'、'finance'、'logistics' です。
SET odps.sql.allow.dynamic.partition=true; INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line) SELECT user_id, COUNT(*) AS pv, '${dag.foreach.current}' AS biz_line FROM dwd_user_behavior_${dag.foreach.current}_d WHERE dt = '${bizdate}' GROUP BY user_id;
検証ノードの追加
ワークフローキャンバスに戻ります。for-each ノードで、[下流ノードの作成] をクリックします。MaxCompute SQL ノードを作成し、次のコードを追加します。
SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;
公開と実行
ワークフローを本番環境に公開します。オペレーションセンターで、 に移動し、対象のワークフローを見つけてスモークテストを実行し、データタイムスタンプとして '20251010' を選択します。
実行が完了したら、テストインスタンスで実行ログを表示します。最終ノードの期待される出力は次のとおりです。
user_id | pv | dt | biz_line |
user001 | 2 | 20251010 | ecom |
user002 | 1 | 20251010 | ecom |
user003 | 3 | 20251010 | finance |
user004 | 1 | 20251010 | finance |
user001 | 1 | 20251010 | logistics |
user005 | 1 | 20251010 | logistics |
このソリューションの利点
高い拡張性:新しい業務ラインが追加された場合、代入ノードに1行の SQL コードを追加するだけで済みます。処理ロジックを変更する必要はありません。
容易なメンテナンス:すべての業務ラインが同じ処理ロジックを共有します。1か所の変更がすべての業務ラインに反映されます。
よくある質問
Q:MaxCompute SQL で、「find no select sql in sql assignment!」というエラーが表示されるのはなぜですか?
A:このエラーは、MaxCompute SQL コードに
SELECT文がないために発生します。SELECT文を追加する必要があります。WITH 構文は現在サポートされておらず、これを使用した場合もこのエラーが発生します。Q:Shell または Python で、「OutPut Result is null, cannot handle!」というエラーが表示されるのはなぜですか?
A:このエラーは、出力がないために発生します。コードに
printやechoなどの print 文が含まれているか確認してください。Q:Shell または Python で、カンマを含む出力要素をどのように処理しますか?
A:カンマ
,を\,に変換してエスケープする必要があります。次の Python コードが例です。categories = ["Electronics", "Clothing, Shoes & Accessories"] # Escape commas contained in each element # Replace ',' with '\,' escaped_categories = [cat.replace(",", "\,") for cat in categories] # Join the escaped elements with a comma output_string = ",".join(escaped_categories) print output_string # The final string output to the downstream node is: # Electronics,Clothing\, Shoes & AccessoriesQ:下流ノードは複数の上流代入ノードから結果を受け取ることができますか?
A:はい、できます。異なるノードからの結果を異なるパラメーターに割り当てることができます。

Q:代入ノードは他の言語をサポートしていますか?
A:代入ノードは現在、MaxCompute SQL、Python 2、Shell のみをサポートしています。ただし、EMR Hive、Hologres SQL、EMR Spark SQL、AnalyticDB for PostgreSQL、ClickHouse SQL、MySQL などの一部のノードには、同じ機能を提供する組み込みのパラメーター代入機能があります。

関連ドキュメント
下流ノードでのループ処理の実行方法の詳細については、「for-each ノード」および「do-while ノード」をご参照ください。
レベルをまたいだパラメーターの受け渡し方法の詳細については、「パラメーターノード」をご参照ください。
パラメーター渡しの構成の詳細については、「ノードコンテキストパラメーター」をご参照ください。