すべてのプロダクト
Search
ドキュメントセンター

DataWorks:for-eachノード

最終更新日:Feb 27, 2026

データ処理ワークフローでは、for-eachノードを使用して、ファイル名やパーティションのリストなど、リスト内の各項目に対して同じサブタスクを実行します。このノードは、アップストリームノード (通常は代入ノード) からの結果セットを自動的に反復処理し、各要素に対して内部ループボディを実行します。これにより、各項目に対して手動でタスクを作成する必要がなくなり、より動的で自動化されたワークフローが可能になります。

ユースケース

for-eachノードは、異なる業務部門、製品ライン、または設定項目からのデータに対して同じ分析または処理ロジックを実行するパラメーター化された実行に最適です。たとえば、会社に複数の製品ラインがあり、それぞれについて個別の日次レポートを生成する必要がある場合、処理ロジックは同じですが、ターゲットデータは異なります。

プログラミング言語のforループと同様に、for-eachノードは、項目 (テーブル名、パーティション名、ファイル名など) のリストを自動的に反復処理し、各項目に対して事前定義されたサブワークフローを実行します。これにより、ワークフローの自動化と柔軟性が大幅に向上します。

注意事項

  • エディション: DataWorks Standard Edition 以上。

  • 権限: DataWorks ワークスペースで、[開発者] または [ワークスペース管理者] ロールを持っている必要があります。詳細については、「ワークスペースにメンバーを追加する」をご参照ください。

仕組み

for-eachノードは、ループボディとして知られるカスタマイズ可能なサブワークフローをカプセル化するコンテナーとして機能します。以下のように動作します。

  1. データ入力: for-eachノードは、アップストリームの代入ノードまたは別の値割り当てノード (たとえば、EMR Hiveノード) に依存します。loopDataArrayパラメーターにバインドすることで、配列形式の結果セットを取得します。

  2. ループ実行: ノードが開始すると、結果セットの各要素を反復処理します。各要素に対して、内部ループボディを1回完全に実行します (startからendまで)。

    説明

    startノードとendノードは編集できません。これらはループボディの開始と終了を示すだけです。

  3. データ受け渡し: 各反復で、ループボディ内のノードは、${dag.foreach.current}などの組み込み変数を介して現在の要素の値にアクセスできます。

組み込み変数

重要

${...}形式の変数は、DataWorksに固有のテンプレート構文を使用します。DataWorksはこれらのパラメーターを直接解析し、静的に置き換えます。

for-eachループボディ内で、以下の組み込み変数を使用してループステータスとデータにアクセスします。

組み込み変数

説明

forループとの類似点

${dag.loopDataArray}

アップストリームの代入ノードから完全な結果セットを取得します。

以下のforループを検討してください。

for(int i=0;i<data.length;i++) {
   print(data[i]);
}
  • ${dag.loopDataArray}dataに相当します。

  • ${dag.foreach.current}data[i]に相当します。

  • ${dag.offset}iに相当します。

  • ${dag.loopTimes}i+1に相当します。

${dag.foreach.current}

現在のループで処理されているデータ項目を取得します。

${dag.offset}

現在のループのオフセットを取得します。0から始まります。

${dag.loopTimes}

現在のループの反復回数を取得します。1から始まります。

アップストリームの出力が2次元配列 (SQLクエリ結果など) の場合、以下の構文を使用して正確な値を取得することもできます。

その他の変数

説明

${dag.foreach.current}

現在のデータ行 (1次元配列) を、要素をカンマ (,) で区切った文字列として取得します。

${dag.foreach.current[n]}

現在のデータ行 (1次元配列) からn番目の要素を取得します。

${dag.loopDataArray[i][j]}

結果セット全体のi行目とj列目の値を取得します。

for-eachノードは現在、ネストされたループをサポートしていません。この構文は値の取得のみを目的としています。

制限事項

  • 実行モード: ループはシーケンシャル実行並列実行の両方をサポートしています。反復が互いに独立している場合は、並列実行を選択してください。

  • ループ制限: デフォルトでは、最大反復回数は128です。これは最大1024まで増やすことができます。

  • デバッグ制限: DataStudioでfor-eachノードを直接実行することはできません。タスクをオペレーションセンターにデプロイし、データバックフィルを実行してテストする必要があります。

  • 実行制限: for-eachノードを個別に実行することはできません。これには、スモークテストデータバックフィル、手動実行が含まれます。

  • ループボディ内の制御フロー: ループボディ内でブランチノードを使用する場合、すべてのブランチが単一のマージノードに収束してからendノードに接続するようにしてください。これにより、ループボディの論理的整合性が保証されます。

  • 再実行の制限: ノードがスケジューリングのためにデプロイされた後、障害発生時の自動再実行障害発生箇所から再開します。ただし、手動での再実行は、for-eachノード全体を最初から再起動します。

操作手順

この手順では、完全なfor-eachタスクの設定について説明します。アップストリームノードとして代入ノードを使用し、ループボディ内でShellノードを使用して結果を出力します。

  1. アップストリームデータの準備 (代入ノードの設定)

    代入ノード」を作成し、その出力を構成して、下流の for-each ノードが反復処理できるようにする 結果セット を提供します。

    1. ワークフローで、代入ノード (例: assign) を作成し、for-eachノードのアップストリームに配置します。

    2. 代入ノードをダブルクリックし、言語を選択します。たとえば、Python 2を使用して4つの要素を持つ配列を出力します。

      代入ノードは、[10,20,30,40] をダウンストリームノードに出力します。出力の最終行をカンマで自動的に分割して配列を作成します。
      print "10,20,30,40"
    3. 代入ノードは、その結果セットを表すためにoutputsという名前の出力パラメーターを自動的に生成します。

    4. 代入ノードを保存します。

  2. for-eachノードのデータ消費設定

    for-eachノードがアップストリームデータを受信し、そのループボディ内で使用するように設定します。

    1. for-eachノードをダブルクリックして、内部キャンバスに入ります。

    2. 右側の[スケジューリング設定]パネルで、[スケジューリングパラメーター]の下にあるloopDataArrayパラメーターを見つけて、[バインド]をクリックします。

      image

    3. 表示されるダイアログボックスで、[値のソース]をアップストリームの代入ノード (assign) のoutputsパラメーターに設定します。このアクションにより、依存関係が自動的に確立されます。

    4. for-eachループボディで、[内部ノードを作成]をクリックし、Shellノードを選択します。

      実際のシナリオでは、任意の種類のノードを設定できます。
    5. 新しいShellノードをダブルクリックし、コード内で組み込み変数を使用してループ情報にアクセスして出力します。

      #!/bin/bash
      # Use ${dag.loopTimes} to get the current loop number
      echo "Current loop number is: ${dag.loopTimes}"
      
      # Use ${dag.foreach.current} to get the current data item
      echo "Current item is: ${dag.foreach.current}"
    6. (オプション) 右側のスケジューリング設定パネルで、スケジューリング戦略の下にあるプロパティを設定します。

      • 最大サイクル数: デフォルトは128で、最大1024まで増やすことができます。

        重要

        このパラメーターは、ループボディが実行できる最大回数を決定します。大量のデータを処理している場合は、この値がすべての項目をカバーするのに十分高いことを確認してください。

      • [実行モード]: オプションを選択します。この例では、[シリアル] を選択します。

        • シーケンシャル: 反復を順次実行します。

        • 並列: 反復を同時に実行してタスク効率を向上させます。並列実行用に設定されている場合、1つのバッチの失敗は他のバッチに影響せず、すべてのバッチが完了するまでスケジューリングは継続します。デフォルトの同時実行数は5で、最大20です。

          image

    7. Shellノードを保存します。

  3. デプロイ、実行、検証

    ワークフローをオペレーションセンターに送信して実行し、for-eachノードの結果を検証します。

    1. メインワークフローキャンバスに戻り、ツールバーの[デプロイ]ボタンをクリックしてワークフロー全体をデプロイします。

    2. オペレーションセンター > タスクO&M > 定期タスクに移動し、ターゲットワークフローでスモークテストを実行します。

      重要

      for-eachノード単独でスモークテストを実行しないでください。for-eachノードはアップストリームの代入ノードの出力に依存するため、ノードが必要な入力データを受信することを確認するために、代入ノードからテストを開始する必要があります。

    3. テストインスタンスが正常に実行された後、リストでfor-eachノードインスタンスを見つけます。次に、それを開き、右クリックして[内部ノードを表示]を選択します。

      image

    4. 内部ノードビューで、各ループによって生成されたShellノードインスタンスを検査します。任意のインスタンスのランタイムログを開いて、その反復の出力を表示し、出力が正しいことを検証します。

      image

ユースケース: 異なるデータ形式の処理

シナリオ 1: 1 次元配列の処理 (Shell/Python 出力)

  • 代入ノード出力: 2025-11-01,2025-11-02,2025-11-03

  • 反復回数: 3

  • 2回目の反復で:

    • ${dag.foreach.current}の値は2025-11-02です。

    • ${dag.loopTimes}の値は2です。

シナリオ 2: 2 次元配列の処理 (SQL 出力)

  • 代入ノード (MaxCompute SQL) 出力:

    +-----+----------+
    | id  | city     |
    +-----+----------+
    | 101 | beijing  |
    | 102 | shanghai |
    +-----+----------+
  • 反復回数: 2

  • 2回目の反復で:

    • ${dag.foreach.current}の値は102,shanghaiです。

    • ${dag.loopTimes}の値は2です。

    • ${dag.foreach.current[0]}の値は102です。

    • ${dag.foreach.current[1]}の値はshanghaiです。

ユースケース: 複数の事業部門のパーティションテーブルからのデータの一括処理

この例では、代入ノードfor-eachノードを使用して、複数の事業部門のユーザー行動データを一括処理する方法を示します。

背景情報

あなたは大手インターネット企業のデータ開発者であると仮定します。あなたは、E コマース、金融、物流という3つの主要事業部門のデータ処理を担当しています。今後、さらに多くの事業部門が追加される可能性があります。これら3つの事業部門のユーザー行動ログに対して、毎日同じ集約ロジックを実行する必要があります。このロジックは、各ユーザーのPVを計算し、結果を統合集計テーブルに保存します。

  • アップストリームソーステーブル (DWD レイヤー):

    • dwd_user_behavior_ecom_d: E コマースユーザー行動テーブル。

    • dwd_user_behavior_finance_d: 金融ユーザー行動テーブル。

    • dwd_user_behavior_logistics_d: 物流ユーザー行動テーブル。

    • dwd_user_behavior_${line-of-business}_d: その他の潜在的な事業部門のユーザー行動テーブル。

    • これらのテーブルは同じスキーマを持ち、日 (dt) ごとにパーティション化されています。

  • ダウンストリーム宛先テーブル (DWS レイヤー):

    • dws_user_summary_d: ユーザー集計テーブル。

    • このテーブルは、事業部門 (biz_line) と日 (dt) の両方でパーティション化されています。すべての事業部門の集計結果を保存するために使用されます。

各事業部門に対して個別のタスクを作成することは、維持コストが高く、エラーが発生しやすいです。for-eachノードを使用すると、1つの処理ロジックセットを維持するだけで済みます。システムはすべての事業部門を自動的に走査して計算を完了します。

データ準備

まず、サンプルテーブルを作成し、テストデータを挿入します。この例では、データタイムスタンプ20251010を使用します。

  1. ワークスペースにMaxComputeコンピューティングリソースを関連付けます。

  2. Data Studioに移動し、MaxCompute SQLノードを作成します。

  3. ソーステーブルの作成: 以下のコードをMaxCompute SQLノードに追加し、選択して実行します。

    -- E-commerce user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    ) 
    COMMENT 'Details of e-commerce user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- Verify that the e-commerce user behavior table is created.
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- Finance user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    ) 
    COMMENT 'Details of finance user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- Verify that the finance user behavior table is created.
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- Logistics user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    ) 
    COMMENT 'Details of logistics user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- Verify that the logistics user behavior table is created.
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. 宛先テーブルの作成: 以下のコードをMaxCompute SQLノードに追加し、選択して実行します。

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT 'User ID',
        pv          BIGINT COMMENT 'Daily popularity'
    ) 
    COMMENT 'Daily user popularity aggregate table'
    PARTITIONED BY (
        dt           STRING COMMENT 'Date partition in yyyymmdd format',
        biz_line     STRING COMMENT 'Line-of-business partition, such as ecom, finance, logistics'
    );
    重要

    ワークスペースが標準モードを使用している場合、このノードを本番環境にデプロイし、データバックフィルを実行する必要があります。

ワークフローの実装

  1. ワークフローを作成します。右側の [スケジューリング] ペインで、スケジューリングパラメーター bizdate を前日の $[yyyymmdd-1] に設定します。

    image

  2. ワークフローで、get_biz_listという名前の代入ノードを作成します。MaxCompute SQLで以下のコードを記述します。このノードは、処理する事業部門のリストを出力します。

    -- Output all lines of business to be processed
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. for-eachノードの設定

    • ワークフローキャンバスに戻り、代入ノードのダウンストリームにfor-eachノードを作成します。

    • for-each ノードの設定ページに移動します。右側の [スケジューリング] タブで、[スケジューリングパラメーター] > [スクリプトパラメーター] の下で、loopDataArray パラメーターの値を get_biz_list ノードの出力に設定します。

      image

    • for-each ノードのループ本文で、[内部ノードの作成] をクリックします。 MaxCompute SQL ノードを作成し、ループ本文の処理ロジックを記述します。

      説明
      • このスクリプトはfor-eachノードによって駆動され、各事業部門に対して1回実行されます。

      • 組み込み変数${dag.foreach.current}は、実行時に現在の事業部門名に動的に置き換えられます。期待される反復値はecomfinancelogisticsです。

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. 検証ノードの追加

    ワークフローキャンバスに戻ります。for-each ノードで、[子孫ノードの作成] をクリックします。MaxCompute SQL ノードを作成し、次のコードを追加します。

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

デプロイと実行

ワークフローを本番環境にデプロイします。オペレーションセンターで、自動トリガーノードO&M > 自動トリガーノードに移動し、ターゲットワークフローを見つけてテストを実行し、データタイムスタンプとして'20251010'を選択します。

実行完了後、テストインスタンスで実行ログを表示します。最終ノードの期待される出力は以下のとおりです。

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

利点

  • 高い拡張性: 新しい事業部門が追加された場合、代入ノードにSQLコードを1行追加するだけで済みます。処理ロジックを変更する必要はありません。

  • 容易なメンテナンス: すべての事業部門が同じ処理ロジックを共有します。1箇所の変更がすべてに適用されます。

よくある質問

  • Q: DataStudioでfor-eachノードを直接テスト実行できないのはなぜですか?

    A: これは設計上の制限です。ノードは、そのノードコンテキストと依存関係を解決するために完全なスケジューリング環境を必要とします。そのため、DataStudioで直接デバッグすることはできません。タスクをオペレーションセンターにデプロイし、データバックフィルまたはスケジュール実行を使用してテストする必要があります。

  • Q: for-eachノード単独でのスモークテストが失敗したり、何も実行されないのはなぜですか?

    A: for-each ノードのループデータは、その loopDataArray 入力パラメーターから取得されますが、このパラメーターは上流の代入ノードoutputs パラメーターにバインドする必要があります。for-each ノードを単独で実行すると、入力結果セットを取得できないため、失敗するか、スキップされます。

  • Q: ループが1回しか実行されなかったのはなぜですか?

    A: これは通常、システムがアップストリームの代入ノードからの出力を単一の要素として解析した場合に発生します。出力を確認してください。

    • 1. デリミタなしの単一の文字列ですか?

    • 2. 複数の項目を反復処理することを期待する場合は、それらがカンマ (,) で区切られていることを確認してください。たとえば、'item1,item2,item3'は3回ループしますが、'item1 item2 item3'は1回しかループしません。