データ処理ワークフローでは、for-each ノードを使用して、ファイル名やパーティションのリストなど、リスト内の各項目に対して同じサブタスクを実行できます。このノードは、通常は代入ノードである上流ノードからの結果セットを自動的に反復処理し、各要素に対して内部のループ本体を繰り返します。このアプローチにより、個別のタスクを作成する反復的な手作業が不要になり、ワークフローが自動化および効率化されます。
利用シーン
日々のデータ開発において、for-each ノードは、異なる業務部門、プロダクトライン、または設定項目に同じ分析や処理ロジックを適用する必要がある場合に、パラメーター化された実行を可能にします。例えば、会社に複数のプロダクトラインがあり、それぞれに個別のデイリーレポートを生成する必要がある場合、処理ロジックは同じで、対象データのみが異なります。
プログラミング言語の for ループと同様に、for-each ノードはテーブル名、パーティション名、ファイル名などのリストを自動的に反復処理します。リスト内の各項目に対して事前定義されたサブワークフローを実行し、ワークフローの自動化と柔軟性を大幅に向上させます。
前提条件
-
エディション要件:この機能は、DataWorks Standard Edition 以降でのみ利用可能です。
-
権限要件:ご利用の RAM アカウントを対応するワークスペースに追加し、開発またはワークスペース管理者ロールを付与する必要があります。詳細については、「ワークスペースにメンバーを追加」をご参照ください。
仕組み
for-each ノードは、ループ本体として知られるカスタマイズ可能なサブワークフローをカプセル化するコンテナとして機能します。その仕組みは次のとおりです:
-
データ入力: for-each ノードは、上流の 代入ノードまたはその他の割り当て可能なノード (EMR Hive ノードなど) に依存します。
loopDataArrayパラメーターにバインドすることで、配列形式の 結果セット を取得します。 -
ループ実行: ノードが開始されると、結果セット内の各要素を順に反復処理します。各要素について、
StartノードからEndノードまで、内部のループ本文を 1 回、完全に実行します。説明Start ノードと End ノードは編集できません。これらはループ本体の開始と終了を示すだけです。
-
データ渡し: 各反復において、現在の要素の値は、組み込み変数を介してループ本文内のノードに渡されます。内部のビジネスノードは
${dag.foreach.current}を使用して、処理中のデータ項目にアクセスします。
組み込みパラメーター
${...} 形式の変数は、DataWorks に固有のテンプレート構文です。DataWorks は、実行前にこれらのパラメーターを直接解析し、値に置き換えます。
for-each ループ本体内のノードは、以下の組み込み変数を使用してループのステータスとデータにアクセスできます:
|
組み込みパラメーター |
説明 |
for ループのアナロジー |
|
|
上流の代入ノードから渡される完全な結果セット。 |
次の for ループコードを考えてみましょう:
|
|
|
現在の反復処理で処理中のデータ項目。 |
|
|
|
現在のループのオフセット (0 から始まるインデックス)。 |
|
|
|
現在のループ回数 (1 から始まるインデックス)。 |
上流の出力が SQL クエリの結果のような2 次元配列である場合、次の構文を使用して特定の値にアクセスすることもできます:
|
その他のパラメーター |
説明 |
|
|
現在のデータ行 (1 次元配列) の要素をカンマ |
|
|
現在のデータ行から |
|
|
結果セット全体の for-each ノードは現在、ネストされたループをサポートしていません。この例は値の取得を示すためのものです。 |
制限事項
-
実行メカニズム:ループはシリアル実行と並列実行の両方をサポートします。反復処理が互いに独立している場合は、並列実行を選択できます。
-
ループ制限:デフォルトの最大ループ数は 128 で、最大 1024 まで調整可能です。
-
デバッグの制約:for-each ノードを Data Studio で直接実行することはできません。タスクをデプロイし、オペレーションセンターでスモークテスト機能を使用してテストする必要があります。
-
実行の制約:for-each ノードを単独で実行することはできません。これには、スモークテスト、バックフィル、手動実行が含まれます。
-
ループ本文内でのフロー制御: for-each ループ本文内でブランチノードを使用する場合、
Endノードに接続する前に、すべてのブランチを最終的に 1 つのマージノードに収束させる必要があります。これにより、ループ本文の論理的な整合性が保証されます。 -
再実行の制約:ノードがデプロイされた後、失敗時の自動再実行は失敗点から再開されます。ただし、手動での再実行は、for-each ノード全体が完全に再実行されます。
操作手順
この手順では、代入ノードを上流ノードとして使用し、ループ本体内の Shell ノードで結果を出力します。このセクションでは、完全な for-each タスクを設定する方法を説明します:
-
上流データの準備 (代入ノードの設定)
代入ノードを作成して設定し、下流の for-each ノードに反復可能な結果セットを提供します。
-
ワークフローで、代入ノード (例:
assign) を作成し、for-each ノードの上流に配置します。 -
代入ノードをダブルクリックし、Python 2 環境を選択します。例えば、
Python 2を使用して 4 つの要素を持つ配列を出力します:ノードは、最後の出力行をカンマで自動的に配列に分割し、[10,20,30,40] を下流ノードに出力します。
print "10,20,30,40" -
代入ノードは、その結果セットを表す
outputsという名前の出力パラメーターを自動的に生成します。 -
代入ノードを保存します。
-
-
データを消費するための for-each ノードの設定
for-each ノードを設定して上流データを受信し、そのループ本体内で使用します。
-
for-each ノードをダブルクリックして、その内部キャンバスを開きます。
-
右側の[スケジューリング]パネルで、[スケジューリングパラメーター]の下にある
loopDataArrayパラメーターを見つけ、[バインド]をクリックします。assign ノードの outputs パラメーターを選択してバインドを作成します。バインドが完了すると、loopDataArray パラメーターの値にバインドされた状態が反映されます。
-
表示されるダイアログボックスで、[値ソース] を上流の 代入ノード (
assign) に設定し、そのoutputsパラメーターを選択します。この操作により、2 つのノード間に依存関係が自動的に作成されます。 -
for-each ループ本体で、[内部ノードの作成] をクリックし、
Shellノードを作成します。実際のシナリオでは、任意のタイプのノードを設定できます。
-
新しい Shell ノードをダブルクリックし、コード内で組み込み変数を使用してループに関する情報を取得し、出力します:
#!/bin/bash # ${dag.loopTimes} を使用して現在のループ回数を取得 echo "Current loop number is: ${dag.loopTimes}" # ${dag.foreach.current} を使用して現在の反復処理のデータ項目を取得 echo "Current item is: ${dag.foreach.current}" -
(オプション) 右側の Scheduling Settings パネルで、Scheduling Policy の下のプロパティを設定します。
-
Maximum Number of Loops:デフォルトは 128 で、最大は 1024 です。
重要このパラメーターは、ループ本体の最大反復回数を決定します。上流のデータ項目数が多い場合は、すべての項目が処理されるようにこの値を増やしてください。
-
Execute Policy:この例では Serial を選択します。
-
Serial:反復処理を順次実行します。
-
Parallel:ループの反復処理を同時に実行して、タスクの効率を向上させます。並列モードでは、1 つの反復処理が失敗しても、他の反復処理には影響しません。スケジューラは、すべての反復処理を完了させようとします。デフォルトの同時実行数は 5 で、最大は 20 です。
-
-
-
Shell ノードを保存します。
-
-
デプロイ、実行、検証
ワークフローをオペレーションセンターにデプロイして実行し、for-each ノードの結果を検証します。
-
メインのワークフローキャンバスに戻り、ツールバーの [デプロイ] ボタンをクリックして、ワークフロー全体を公開します。
-
に移動し、対象のワークフローでスモークテストを実行します。
重要for-each ノードを個別にスモークテストしないでください。for-each ノードは上流の代入ノードの出力に依存するため、データリネージが完全であることを確認するために、テストを代入ノードから開始する必要があります。
-
テストインスタンスが正常に実行された後、リストから for-each ノードインスタンスを見つけて開き、右クリックして [内部ノードの表示] を選択します。
-
内部ノードビューで、各ループによって生成された Shell ノードインスタンスを確認します。任意のインスタンスの実行ログを開いて、その反復処理の出力を表示し、出力が正しいことを確認します。
左側のパネルには、4 つすべてのループ反復が完了したことが示されています。4 回目の反復処理の実行ログには、
Current loop number is: 4とCurrent item is: 40が出力され、Shell コマンドはコード 0 で終了し、正常に実行されたことを示しています。
-
ユースケース:異なるデータ形式の処理
シナリオ 1:1 次元配列の処理
-
代入ノードの出力:2025-11-01,2025-11-02,2025-11-03
-
反復回数:3
-
2 回目の反復処理中:
-
${dag.foreach.current}の値は2025-11-02です。 -
${dag.loopTimes}の値は2です。
-
シナリオ 2:2 次元配列の処理
-
代入ノード (MaxCompute SQL) の出力:
+-----+----------+ | id | city | +-----+----------+ | 101 | beijing | | 102 | shanghai | +-----+----------+ -
反復回数:2
-
2 回目の反復処理中:
-
${dag.foreach.current}の値は102,shanghaiです。 -
${dag.loopTimes}の値は2です。 -
${dag.foreach.current[0]}の値は102です。 -
${dag.foreach.current[1]}の値はshanghaiです。
-
ユースケース:複数の業務ラインに対するバッチ処理
この例では、代入ノードと for-each ノードを使用して、複数の業務ラインのユーザー行動データをバッチ処理する方法を示します。これにより、単一のロジックセットが複数のプロダクトラインに対応する自動データ処理が可能になります。
背景情報
ある大企業でデータエンジニアとして、E コマース (ecom)、金融 (finance)、物流 (logistics) という 3 つの中核業務ラインのデータ処理を担当しており、将来的にはさらに追加される可能性があると仮定します。これらの業務ラインのユーザー行動ログに毎日同じ集約ロジックを適用して、各ユーザーの1 日あたりのページビュー (PV) を計算し、その結果を統一された集計テーブルに保存する必要があります。
-
上流ソーステーブル (DWD レイヤー):
-
dwd_user_behavior_ecom_d:E コマースユーザー行動テーブル。 -
dwd_user_behavior_finance_d:金融ユーザー行動テーブル。 -
dwd_user_behavior_logistics_d:物流ユーザー行動テーブル。 -
dwd_user_behavior_${line_of_business}_d:その他の潜在的な業務ラインのユーザー行動テーブル。 -
これらのテーブルは同じスキーマを持ち、日 (
dt) でパーティション化されています。
-
-
下流送信先テーブル (DWS レイヤー):
-
dws_user_summary_d:ユーザー集計テーブル。 -
このテーブルは、業務ライン (
biz_line) と日 (dt) でパーティション化されており、すべての業務ラインからの集約結果を保存します。
-
for-each ノードを使用すると、1 つの処理ロジックセットを維持するだけで、システムが自動的にすべての業務ラインを反復処理して計算を完了します。
データ準備
まず、サンプルテーブルを作成し、テストデータを挿入します。この例では、ビジネス日付 20251010 を使用します。
-
ワークスペースに MaxCompute 計算リソースを関連付けます。
-
Data Studio に移動し、MaxCompute SQL ノードを作成します。
-
ソーステーブル (DWD レイヤー) を作成します。MaxCompute SQL ノードで、次のコードを追加し、選択して実行します。
-- E コマースユーザー行動テーブル CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d ( user_id STRING COMMENT 'ユーザー ID', action_type STRING COMMENT '行動タイプ', event_time BIGINT COMMENT 'イベント時間 (ミリ秒レベルの UNIX タイムスタンプ)' ) COMMENT 'E コマースユーザー行動ログ詳細テーブル' PARTITIONED BY (dt STRING COMMENT '日付パーティション (yyyymmdd 形式)'); INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES ('user001', 'click', 1760004060000), -- 2025-10-10 10:01:00.000 ('user002', 'browse', 1760004150000), -- 2025-10-10 10:02:30.000 ('user001', 'add_to_cart', 1760004300000); -- 2025-10-10 10:05:00.000 -- E コマースユーザー行動テーブルが作成されたことを確認します。 SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010'; -- 金融ユーザー行動テーブル CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d ( user_id STRING COMMENT 'ユーザー ID', action_type STRING COMMENT '行動タイプ', event_time BIGINT COMMENT 'イベント時間 (ミリ秒レベルの UNIX タイムスタンプ)' ) COMMENT '金融ユーザー行動ログ詳細テーブル' PARTITIONED BY (dt STRING COMMENT '日付パーティション (yyyymmdd 形式)'); INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES ('user003', 'open_app', 1760020200000), -- 2025-10-10 14:30:00.000 ('user003', 'transfer', 1760020215000), -- 2025-10-10 14:30:15.000 ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000 ('user004', 'open_app', 1760020300000); -- 2025-10-10 14:31:40.000 -- 金融ユーザー行動テーブルが作成されたことを確認します。 SELECT * FROM dwd_user_behavior_finance_d where dt='20251010'; -- 物流ユーザー行動テーブル CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d ( user_id STRING COMMENT 'ユーザー ID', action_type STRING COMMENT '行動タイプ', event_time BIGINT COMMENT 'イベント時間 (ミリ秒レベルの UNIX タイムスタンプ)' ) COMMENT '物流ユーザー行動ログ詳細テーブル' PARTITIONED BY (dt STRING COMMENT '日付パーティション (yyyymmdd 形式)'); INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES ('user001', 'check_status', 1760032800000), -- 2025-10-10 18:00:00.000 ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000 -- 物流ユーザー行動テーブルが作成されたことを確認します。 SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010'; -
送信先テーブル (DWS レイヤー) を作成します。MaxCompute SQL ノードで、次のコードを追加し、選択して実行します。
CREATE TABLE IF NOT EXISTS dws_user_summary_d ( user_id STRING COMMENT 'ユーザー ID', pv BIGINT COMMENT '日次ページビュー' ) COMMENT 'ユーザー日次ページビュー集計テーブル' PARTITIONED BY ( dt STRING COMMENT '日付パーティション (yyyymmdd 形式)', biz_line STRING COMMENT '業務ラインパーティション (ecom, finance, logistics など)' );重要ワークスペースが標準開発環境を使用している場合、このノードを本番環境にデプロイし、データバックフィルを実行する必要があります。
ワークフローの実装
-
ワークフローを作成します。右側のスケジューリングパラメーターパネルで、スケジューリングパラメーター bizdate を前日に設定します:
$[yyyymmdd-1]。 -
ワークフローで、get_biz_list という名前の代入ノードを作成します。MaxCompute SQL 言語を使用して次のコードを記述します。このノードは、処理対象の業務ラインのリストを出力します。
-- 処理対象のすべての業務ラインを出力します。 SELECT 'ecom' AS biz_line UNION ALL SELECT 'finance' AS biz_line UNION ALL SELECT 'logistics' AS biz_line; -
for-each ノードの設定
-
ワークフローキャンバスに戻り、get_biz_list 代入ノードの下流に for-each ノードを作成します。
-
for-each ノードの設定パネルに移動します。右側のスケジューリングパネルの タブで、loopDataArray パラメーターを get_biz_list ノードの outputs にバインドします。
-
for-each ノードのループ本体で、[内部ノードの作成] をクリックして MaxCompute SQL ノードを作成し、ループ本体の処理ロジックを記述します。
説明-
このスクリプトは for-each ノードによって駆動され、各業務ラインに対して一度実行されます。
-
ランタイム時に、システムは組み込み変数 ${dag.foreach.current} を現在の業務ライン名に動的に置き換えます。期待される反復値は 'ecom'、'finance'、'logistics' です。
SET odps.sql.allow.dynamic.partition=true; INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line) SELECT user_id, COUNT(*) AS pv, '${dag.foreach.current}' AS biz_line FROM dwd_user_behavior_${dag.foreach.current}_d WHERE dt = '${bizdate}' GROUP BY user_id; -
-
-
検証ノードの追加
ワークフローキャンバスに戻ります。for-each ノードで、[下流の作成] をクリックして MaxCompute SQL ノードを作成し、次のコードを追加します。
SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;
デプロイと結果
ワークフローを本番環境にデプロイします。 に移動し、ワークフローを見つけてスモークテストを実行します。ビジネス日付として '20251010' を選択します。
実行が完了すると、最終ノードは次の出力を生成します:
|
user_id |
pv |
dt |
biz_line |
|
user001 |
2 |
20251010 |
ecom |
|
user002 |
1 |
20251010 |
ecom |
|
user003 |
3 |
20251010 |
finance |
|
user004 |
1 |
20251010 |
finance |
|
user001 |
1 |
20251010 |
logistics |
|
user005 |
1 |
20251010 |
logistics |
メリット
-
高いスケーラビリティ:新しい業務ラインを追加するには、処理ロジックを変更することなく、代入ノードに 1 行の SQL を追加するだけです。
-
容易なメンテナンス:すべての業務ラインが同じ処理ロジックを共有します。1 か所での変更がすべてに適用されます。
よくある質問
-
Q:なぜ Data Studio で for-each ノードを直接実行してテストできないのですか?
A:これは仕様によるものです。ノードはノードコンテキストとその依存関係を解決するために完全なスケジューリング環境を必要とするため、Data Studio での直接実行はサポートされていません。タスクをオペレーションセンターにデプロイし、バックフィルを使用するか、スケジュールされた実行をトリガーしてテストする必要があります。
-
Q:個別の for-each ノードのスモークテストが失敗するか、何も実行されないのはなぜですか?
A: for-each ノードのループデータは、その
loopDataArray入力パラメーターから取得されます。このパラメーターには、上流の 代入ノード のoutputsパラメーターへの バインド が必要です。 for-each ノードを単独で実行した場合、入力 結果セット を受け取ることができないため、失敗するか、スキップされます。 -
Q:ループが 1 回しか実行されないのはなぜですか?
A:これは通常、上流の代入ノードからの出力が単一の要素として解析されるために発生します。出力を確認してください:
-
1. デリミタのない単一の文字列ではありませんか?
-
2. 複数の項目を反復処理する場合は、それらがカンマ (
,) で区切られていることを確認してください。例えば、'item1,item2,item3'は 3 回のループになりますが、'item1 item2 item3'は 1 回しかループしません。
-