for-each ノードは、ファイル名やパーティションなどのリスト内の各項目に対して同じサブタスクを実行するために使用します。上流ノード (通常は代入ノード) からの結果セットを反復処理し、各要素に対して内部ループを実行します。これにより、反復タスクが自動化され、項目ごとに手動でタスクを作成する必要がなくなります。
利用シーン
for-each ノードは、パラメータ化された実行を可能にします。これを使用して、異なる業務部門、プロダクトライン、または設定項目に同一の処理ロジックを適用できます。例えば、ロジックは同じでも対象が異なる複数のプロダクトラインの日次レポートを生成できます。
プログラミングにおける for ループのように、for-each ノードはリスト (テーブル名、パーティション名、ファイル名など) を反復処理し、リスト内のすべての項目に対して事前に設定されたサブワークフローを実行します。これにより、ワークフローの自動化と柔軟性が大幅に向上します。
注意事項
エディション:DataWorks Standard Edition 以上。
権限:DataWorks ワークスペースで 開発 または ワークスペース管理者 のロールが必要です。詳細については、「ワークスペースへのメンバーの追加」をご参照ください。
仕組み
for-each ノードは、カスタマイズ可能なサブワークフロー (ループ本文) をカプセル化します。次のように動作します。
データ入力:このノードは、上流の代入ノード (または EMR Hive のような互換性のあるノード) に依存します。
loopDataArrayパラメーターを上流の結果セット配列にバインドします。ループ実行: ノードは結果セットを順次反復します。各要素に対して、(
StartからEndまでの) ループ本文全体を実行します。説明Start ノードと End ノードは編集できません。ループ本文の開始と終了を示すためだけに使用されます。
データ受け渡し:各反復処理では、現在の要素の値が組み込み変数を介してループ本文内のノードに渡されます。内部のビジネスノードは、
${dag.foreach.current}を使用して現在処理中のデータ項目にアクセスします。
組み込み変数
${...} 形式の変数は、DataWorks 固有のテンプレート構文です。DataWorks はパラメーターを解析し、静的な置き換えを実行します。
for-each ループ本文内のノードで以下の組み込み変数を使用して、ループの状態とデータにアクセスします。
組み込み変数 | 説明 | for ループとの類似点 |
| 上流の代入ノードからの完全な結果セットを返します。 | 次のコードを例として説明します。
|
| 現在の反復処理で処理される項目を返します。 | |
| 現在のループのオフセット (0から始まる) を返します。 | |
| 現在のループ回数 (1から始まる) を返します。 |
上流の出力が 2次元配列 (SQL クエリ結果など) の場合、以下のメソッドを使用して特定の値を取得できます。
その他の変数 | 説明 |
| 現在のデータ行 (1次元配列) の文字列を、カンマ |
| 現在のデータ行 (1次元配列) から |
| 結果セット全体の for-each ノードはネストされたループをサポートしていません。これは値の取得方法を示すためのものです。 |
全般的な注意事項
実行メカニズム:シリアルモードとパラレルモードをサポートします。独立した反復処理にはパラレルモードを使用してください。
ループ上限:デフォルトは 128 回で、最大 1024 回まで調整可能です。
デバッグ:DataStudio で for-each ノードを直接実行することはできません。タスクをデプロイし、オペレーションセンターのスモークテストを使用してください。
実行制約:for-each ノードは、スモークテスト、データバックフィル、手動実行などの個別実行をサポートしていません。
フロー制御:ループ内で分岐ノードを使用する場合、すべての分岐を
Endノードの前にマージノードに集約する必要があります。
操作手順
上流の代入ノードと内部の Shell ノードを使用して for-each タスクを設定するには:
上流データの準備
代入ノードを作成して結果セットを出力します。
ワークフローで、代入ノード (例:
assign) を作成し、for-each ノードの上流に配置します。代入ノードをダブルクリックし、Python 2 などの言語を選択します。
Python 2を使用して、4つの要素を含む配列を出力します。代入ノードは [10,20,30,40] を下流のノードに出力します。これは、代入ノードが最終行の出力をカンマをデリミタとして自動的に配列に分割するためです。
print "10,20,30,40"代入ノードは、その結果セットを表す
outputsという名前の出力パラメーターを自動的に生成します。代入ノードを保存します。
データを消費するように for-each ノードを設定
for-each ノードが上流のデータを受信し、そのループ本文内で使用するように設定します。
for-each ノードをダブルクリックして、内部のオーケストレーションキャンバスに入ります。
右側の [スケジューリング] パネルで、[スケジューリングパラメーター] の下にある
loopDataArrayパラメーターを見つけ、[関連付け] をクリックします。
上流の代入ノード (
assign) のoutputsを値ソースとして選択します。これにより、依存関係が自動的に確立されます。for-each ループ本文で、[内部ノードを作成] をクリックし、
Shellを選択します。実際のシナリオでは、任意のノードタイプを設定できます。
新しい Shell ノードをダブルクリックし、コード内で組み込み変数を使用してループ情報を取得し、出力します。
#!/bin/bash # ${dag.loopTimes} を使用して現在のループ回数を取得します echo "現在のループ番号は: ${dag.loopTimes}" # ${dag.foreach.current} を使用して現在反復処理中のデータを取得します echo "現在の項目は: ${dag.foreach.current}"(オプション) 右側の [スケジューリング] パネルで、[スケジューリングポリシー] の下にある関連プロパティを設定します。
[最大ループ数]:デフォルトは 128 で、最大 1024 まで調整可能です。このパラメーターは、ループ本文の最大反復回数を決定します。上流のデータ量が多い場合は、このパラメーターをすべての反復をカバーするように調整してください。
実行ポリシー: ここでは [シリアル] を選択します。
シリアル:ループ回数に基づいて順次実行します。
パラレル:for-each ノード内の内部ループの同時実行を許可し、タスクの効率を向上させます。パラレルモードで特定のバッチが失敗しても、他のバッチの実行には影響しません。スケジューラはすべてのバッチを完了します。デフォルトの同時実行数は 5 で、サポートされる最大値は 20 です。

Shell ノードを保存します。
デプロイ、実行、検証
ワークフローをオペレーションセンターに送信して実行し、for-each ノードの結果を検証します。
メインのワークフローキャンバスに戻り、ツールバーの [デプロイ] ボタンをクリックしてワークフロー全体を公開します。
に移動し、対象のワークフローでスモークテストを実行します。
重要for-each ノードを個別にスモークテストしないでください。 for-each ノードは上流の代入ノードの出力に依存するため、データリネージの完全性を確保するために、テストは代入ノードから開始する必要があります。
テストインスタンスが正常に実行された後、インスタンスリストで for-each ノードのインスタンスを見つけ、クリックして開き、右クリックして [内部ノードを表示] を選択します。

内部ノードビューで、各ループで生成された Shell ノードインスタンスを確認します。任意のインスタンスの実行ログを開き、その特定の反復の出力結果を表示して、期待どおりであるかを確認します。

利用シーン:異なるデータフォーマットの処理
シナリオ 1:1次元配列 (Shell/Python の出力) の処理
代入ノードの出力:2025-11-01,2025-11-02,2025-11-03
反復回数:3 回。
2回目のループ中:
${dag.foreach.current}の値は2025-11-02です。${dag.loopTimes}の値は2です。
シナリオ 2:2次元配列 (SQL の出力) の処理
代入ノード (MaxCompute SQL) の出力:
+-----+----------+ | id | city | +-----+----------+ | 101 | beijing | | 102 | shanghai | +-----+----------+反復回数:2 回。
2回目のループ中:
${dag.foreach.current}の値は102,shanghaiです。${dag.loopTimes}の値は2です。${dag.foreach.current[0]}の値は102です。${dag.foreach.current[1]}の値はshanghaiです。
シナリオ:複数の LOB のパーティションテーブルからデータをバッチ処理
この例では、代入ノードと for-each ノードを使用して、複数の LOB (Line-of-Business) のユーザー行動データをバッチ処理する方法を示します。このアプローチでは、複数のプロダクトラインに対して単一のロジックセットを使用できるため、データ処理が自動化されます。
背景情報
ある大手インターネット企業のデータ開発者であると仮定します。あなたは、E コマース (ecom)、金融 (finance)、物流 (logistics) という3つのコア LOB のデータ処理を担当しています。将来的にはさらに多くの LOB が追加される可能性があります。毎日、これら3つの LOB のユーザー行動ログに対して同じ集約ロジックを実行する必要があります。このロジックは、各ユーザーの1日あたりの人気度 (PV) を計算し、その結果を統一された集計テーブルに保存します。
上流ソーステーブル (DWD レイヤー):
dwd_user_behavior_ecom_d:E コマースユーザー行動テーブル。dwd_user_behavior_finance_d:金融ユーザー行動テーブル。dwd_user_behavior_logistics_d:物流ユーザー行動テーブル。dwd_user_behavior_${line-of-business}_d:他の潜在的な LOB のユーザー行動テーブル。これらのテーブルは同じスキーマを持ち、日 (
dt) でパーティション分割されています。
下流ターゲットテーブル (DWS レイヤー):
dws_user_summary_d:ユーザー集計テーブル。このテーブルは、LOB (
biz_line) と日 (dt) の両方でパーティション分割されています。すべての LOB の集約結果を保存するために使用されます。
LOB ごとに個別のタスクを作成すると、メンテナンスコストが高くなり、エラーが発生しやすくなります。for-each ノードを使用すれば、1セットの処理ロジックを維持するだけで済みます。システムは自動的にすべての LOB を走査して計算を完了します。
データ準備
まず、サンプルテーブルを作成し、テストデータを挿入します。この例では、データタイムスタンプ 20251010 を使用します。
DataStudio に移動してデータ開発を行い、MaxCompute SQL ノードを作成します。
ソーステーブル (DWD レイヤー) を作成します:MaxCompute SQL ノードに次のコードを追加し、それを選択して実行します。
-- E コマースユーザー行動テーブル CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d ( user_id STRING COMMENT 'ユーザー ID', action_type STRING COMMENT '行動タイプ', event_time BIGINT COMMENT 'イベント発生のミリ秒レベルの UNIX タイムスタンプ' ) COMMENT 'E コマースユーザー行動ログの詳細' PARTITIONED BY (dt STRING COMMENT '日付パーティション (yyyymmdd 形式)'); INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES ('user001', 'click', 1760004060000), -- 2025-10-10 10:01:00.000 ('user002', 'browse', 1760004150000), -- 2025-10-10 10:02:30.000 ('user001', 'add_to_cart', 1760004300000); -- 2025-10-10 10:05:00.000 -- E コマースユーザー行動テーブルが作成されたことを確認します。 SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010'; -- 金融ユーザー行動テーブル CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d ( user_id STRING COMMENT 'ユーザー ID', action_type STRING COMMENT '行動タイプ', event_time BIGINT COMMENT 'イベント発生のミリ秒レベルの UNIX タイムスタンプ' ) COMMENT '金融ユーザー行動ログの詳細' PARTITIONED BY (dt STRING COMMENT '日付パーティション (yyyymmdd 形式)'); INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES ('user003', 'open_app', 1760020200000), -- 2025-10-10 14:30:00.000 ('user003', 'transfer', 1760020215000), -- 2025-10-10 14:30:15.000 ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000 ('user004', 'open_app', 1760020300000); -- 2025-10-10 14:31:40.000 -- 金融ユーザー行動テーブルが作成されたことを確認します。 SELECT * FROM dwd_user_behavior_finance_d where dt='20251010'; -- 物流ユーザー行動テーブル CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d ( user_id STRING COMMENT 'ユーザー ID', action_type STRING COMMENT '行動タイプ', event_time BIGINT COMMENT 'イベント発生のミリ秒レベルの UNIX タイムスタンプ' ) COMMENT '物流ユーザー行動ログの詳細' PARTITIONED BY (dt STRING COMMENT '日付パーティション (yyyymmdd 形式)'); INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES ('user001', 'check_status', 1760032800000), -- 2025-10-10 18:00:00.000 ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000 -- 物流ユーザー行動テーブルが作成されたことを確認します。 SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';ターゲットテーブル (DWS レイヤー) を作成します:MaxCompute SQL ノードに次のコードを追加し、それを選択して実行します。
CREATE TABLE IF NOT EXISTS dws_user_summary_d ( user_id STRING COMMENT 'ユーザー ID', pv BIGINT COMMENT '1日あたりの人気度', ) COMMENT '1日あたりのユーザー人気度集計テーブル' PARTITIONED BY ( dt STRING COMMENT '日付パーティション (yyyymmdd 形式)', biz_line STRING COMMENT 'LOB パーティション (ecom, finance, logistics など)' );重要ワークスペースが標準環境を使用している場合、このノードを本番環境に公開し、データバックフィルを実行する必要があります。
ワークフローの実装
ワークフローを作成します。右側の [スケジューリングパラメーター] ペインで、スケジューリングパラメーター bizdate を前日
$[yyyymmdd-1]に設定します。
ワークフローで、get_biz_list という名前の代入ノードを作成します。MaxCompute SQL に次のコードを記述します。このノードは、処理する LOB のリストを出力します。
-- 処理するすべての LOB を出力します SELECT 'ecom' AS biz_line UNION ALL SELECT 'finance' AS biz_line UNION ALL SELECT 'logistics' AS biz_line;for-each ノードの設定
ワークフローキャンバスに戻り、get_biz_list 代入ノードの下流に for-each ノードを作成します。
for-each ノードの設定ページに移動します。右側の [スケジュール] タブの で、loopDataArray パラメーターの値を get_biz_list ノードの outputs に設定します。

for-each ノードのループ本文で、[内部ノードを作成] をクリックします。MaxCompute SQL ノードを作成し、ループ本文の処理ロジックを記述します。
説明このスクリプトは for-each ノードによって駆動され、LOB ごとに1回実行されます。
組み込み変数 ${dag.foreach.current} は、実行時に現在の LOB 名に動的に置き換えられます。期待される反復値は 'ecom'、'finance'、'logistics' です。
SET odps.sql.allow.dynamic.partition=true; INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line) SELECT user_id, COUNT(*) AS pv, '${dag.foreach.current}' AS biz_line FROM dwd_user_behavior_${dag.foreach.current}_d WHERE dt = '${bizdate}' GROUP BY user_id;
検証ノードの追加
ワークフローキャンバスに戻ります。for-each ノードで、[下流ノードを作成] をクリックします。MaxCompute SQL ノードを作成し、次のコードを追加します。
SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;
発行と実行
ワークフローを本番環境に発行します。オペレーションセンターで、 に移動し、対象のワークフローを見つけてスモークテストを実行し、データタイムスタンプとして '20251010' を選択します。
実行が完了したら、テストインスタンスで実行ログを表示します。最終ノードの期待される出力は次のとおりです。
user_id | pv | dt | biz_line |
user001 | 2 | 20251010 | ecom |
user002 | 1 | 20251010 | ecom |
user003 | 3 | 20251010 | finance |
user004 | 1 | 20251010 | finance |
user001 | 1 | 20251010 | logistics |
user005 | 1 | 20251010 | logistics |
このソリューションの利点
高い拡張性:新しい LOB が追加された場合、代入ノードに SQL コードを1行追加するだけで済みます。処理ロジックを変更する必要はありません。
容易なメンテナンス:すべての LOB が同じ処理ロジックを共有します。1か所の変更がすべてに反映されます。
よくある質問
Q:なぜ DataStudio で for-each ノードを直接実行してテストできないのですか?
A:これは設計上の制約です。ノードはノードのコンテキストと依存関係を解決するために完全なスケジューリング環境を必要とするため、DataStudio での直接デバッグはサポートされていません。タスクをオペレーションセンターに発行し、データバックフィルまたは定期スケジューリングを通じてテストする必要があります。
Q:for-each ノードを個別にスモークテストすると失敗したり、何も実行されなかったりするのはなぜですか?
A:for-each ノードのループデータは、その
loopDataArray入力パラメーターから派生します。このパラメーターは、上流の代入ノードのoutputsパラメーターにバインドする必要があります。for-each ノードを個別に実行すると、入力結果セットを取得できないため、実行がスキップされるか失敗します。Q:ループが1回しか実行されないのはなぜですか?
A:これは通常、上流の代入ノードの出力結果が単一の要素として解析されたためです。出力を確認してください。
正しい区切り文字のない単一の文字列ではありませんか?
複数の項目を反復処理する場合は、それらが標準のカンマ (
,) で区切られていることを確認してください。
例えば、'item1,item2,item3'は3回ループしますが、'item1 item2 item3'は1回しかループしません。