All Products
Search
Document Center

DataWorks:For-each node

Last Updated:Mar 26, 2026

Use the for-each node to run the same processing logic on every item in a list — such as partition names, filenames, or business unit identifiers. The node reads a result set from an upstream node at runtime, iterates over each item, and executes its internal loop body once per item. Because the list is determined at runtime by the upstream node's query result, you maintain one set of processing logic regardless of how many items exist. This eliminates the need to create a separate task for each item and keeps your pipeline logic in one place.

Use cases

The for-each node fits any pattern where the same logic runs repeatedly on different data items and the list of items is determined at runtime:

  • Multi-partition processing: Run the same aggregation query on each daily partition without hardcoding partition names.

  • Multi-tenant pipelines: Apply identical transformation logic to data from multiple business units (for example, e-commerce, finance, and logistics) using a single node.

  • Dynamic file processing: Iterate over a list of filenames or table names returned by an upstream query, then process each one in turn.

Prerequisites

Before you begin, make sure you have:

  • DataWorks Standard Edition or higher

  • The Development or Workspace Manager role in the DataWorks workspace (add members to a workspace)

How it works

The for-each node acts as a container that wraps a customizable sub-workflow called the loop body.

image
  1. Data input: The for-each node depends on an upstream assignment node (or another value-assigning node, such as an EMR Hive node). It retrieves a result set in array format by binding to the loopDataArray parameter.

  2. Loop execution: After the node starts, it iterates through each element of the result set. For each element, it runs the loop body once from start to end.

    The start and end nodes are not editable. They only mark the beginning and end of the loop body.
  3. Data passing: Nodes inside the loop body access the current element's value through built-in variables such as ${dag.foreach.current}.

Built-in variables

Important

Variables in the ${...} format use a template syntax specific to DataWorks. DataWorks parses and statically replaces these parameters at runtime.

Use the following built-in variables inside the loop body to access loop state and data:

Built-in variableScopeDescription
${dag.loopDataArray}Global (entire result set)The complete result set from the upstream assignment node.
${dag.foreach.current}Current iterationThe data item being processed in the current loop.
${dag.offset}Current iterationThe zero-based index of the current loop (starts at 0).
${dag.loopTimes}Current iterationThe iteration number of the current loop (starts at 1).

These variables map directly to a standard for loop in Java:

for (int i = 0; i < data.length; i++) {
    print(data[i]);
}
Built-in variableEquivalent
${dag.loopDataArray}data
${dag.foreach.current}data[i]
${dag.offset}i
${dag.loopTimes}i + 1

Minimal example: If the upstream assignment node outputs a,b,c, the for-each node runs three iterations. In the second iteration, ${dag.foreach.current} is b, ${dag.offset} is 1, and ${dag.loopTimes} is 2.

Two-dimensional array variables

When the upstream output is a two-dimensional array (for example, a SQL query result), use these additional variables for column-level access:

VariableDescription
${dag.foreach.current}The current data row as a string, with elements separated by commas (,).
${dag.foreach.current[n]}The nth element from the current data row.
${dag.loopDataArray[i][j]}The value from the ith row and jth column of the entire result set.
The for-each node does not currently support nested loops. ${dag.loopDataArray[i][j]} is for value retrieval only.

Choose serial or parallel execution

The for-each node supports two execution modes. Choose based on whether your iterations depend on each other:

SerialParallel
How it runsOne iteration at a time, in orderMultiple iterations run concurrently
When to useIterations have dependencies, or you need a predictable output orderIterations are independent and you want faster throughput
ConcurrencyN/ADefault 5, maximum 20
Failure impactN/AA failed batch does not affect other batches; scheduling continues until all batches are complete

Configure the execution mode under Scheduling settings > Scheduling strategy > Execution mode on the node editor panel.

Limitations

LimitationWorkaround
Maximum iterations: 128 by default, up to 1,024.Increase the Maximum number of cycles setting on the Scheduling settings panel before deploying.
Cannot run directly in DataStudio: The node requires a full scheduling environment to resolve its dependencies.Deploy to Operation Center and test using data backfill or smoke testing.
Cannot run individually: Smoke testing, data backfill, and manual runs on the for-each node alone will fail or be skipped.Always start the test from the upstream assignment node to ensure loopDataArray receives its input.
Branch nodes in the loop body must converge: If you use a branch node inside the loop body, all branches must connect to a single merge node before the end node.Add a merge node to consolidate all branch paths before exiting the loop body.
Rerun behavior differs by trigger: An automatic rerun upon failure resumes from the point of failure. A manual rerun restarts the entire for-each node from the beginning.To avoid re-processing completed iterations, prefer automatic rerun where possible.
Nested loops are not supported.

Configure a for-each node

This procedure walks through creating a complete for-each workflow. The example uses an assignment node as the upstream and a Shell node in the loop body to print loop output.

Step 1: Configure the upstream assignment node

Create an assignment node that produces the result set the for-each node will iterate over.

  1. In the workflow, create an assignment node (for example, assign) and place it upstream of the for-each node.

  2. Double-click the assignment node and select a language. For example, use Python 2 to output a four-element array:

    print "10,20,30,40"

    The assignment node splits the last line of output by commas to form an array. This outputs [10, 20, 30, 40] to the downstream node.

  3. The assignment node automatically generates an output parameter named outputs to represent its result set.

  4. Save the assignment node.

Step 2: Bind the result set to the for-each node

  1. Double-click the for-each node to open its internal canvas.

  2. On the Scheduling settings panel, locate loopDataArray under Scheduling parameters and click Bind.

    image

  3. In the dialog box, set Value Source to the outputs parameter of the upstream assignment node (assign). This also establishes the dependency automatically.

Step 3: Build the loop body

  1. In the for-each loop body canvas, click Create Inner Node and select a Shell node.

    In a production scenario, use any supported node type.
  2. Double-click the Shell node and use built-in variables to access loop state:

    #!/bin/bash
    # Current iteration number (starts at 1)
    echo "Current loop number is: ${dag.loopTimes}"
    
    # Current data item
    echo "Current item is: ${dag.foreach.current}"
  3. (Optional) On the Scheduling settings panel, configure Scheduling strategy:

    • Maximum number of cycles: Defaults to 128. Increase this value if your dataset has more than 128 items (maximum 1024).

    • Execution mode: Select Serial or Parallel. For this example, select Serial. If you select Parallel, the node runs up to 5 iterations concurrently by default (maximum 20). image

  4. Save the Shell node.

Step 4: Deploy, run, and verify

  1. Return to the main workflow canvas and click Deploy on the toolbar to deploy the entire workflow to Operation Center.

  2. Go to Operation Center > Task O&M > Periodic Task and start smoke testing on the assignment node (not the for-each node).

    Important

    Start smoke testing from the assignment node. The for-each node depends on the output of the assignment node — running it in isolation means loopDataArray has no input to iterate over.

  3. After the test instance runs successfully, find the for-each node instance in the list. Right-click it and select View Inner Nodes.

    image

  4. In the inner node view, open the runtime log of any Shell node instance to verify the output for that iteration.

    image

Handle different data formats

Scenario 1: One-dimensional array (Shell or Python output)

  • Assignment node output: 2025-11-01,2025-11-02,2025-11-03

  • Number of iterations: 3

  • In the second iteration:

    • ${dag.foreach.current} = 2025-11-02

    • ${dag.loopTimes} = 2

Scenario 2: Two-dimensional array (SQL output)

  • Assignment node output (MaxCompute SQL):

    +-----+----------+
    | id  | city     |
    +-----+----------+
    | 101 | beijing  |
    | 102 | shanghai |
    +-----+----------+
  • Number of iterations: 2

  • In the second iteration:

    • ${dag.foreach.current} = 102,shanghai

    • ${dag.loopTimes} = 2

    • ${dag.foreach.current[0]} = 102

    • ${dag.foreach.current[1]} = shanghai

Batch process data from partitioned tables for multiple lines of business

This example shows how to use an assignment node and a for-each node to run the same aggregation logic across multiple lines of business.

image

Background

Assume you are a data developer at a large internet company. You process daily user behavioral data from three lines of business: e-commerce, finance, and logistics. More lines may be added in the future. Each day, you run the same aggregation — calculating page views (PV) per user — on each line's behavioral logs and write results into a shared aggregate table.

Source tables (DWD layer), partitioned by day (dt):

  • dwd_user_behavior_ecom_d

  • dwd_user_behavior_finance_d

  • dwd_user_behavior_logistics_d

  • dwd_user_behavior_${line-of-business}_d (for future lines)

Destination table (DWS layer), partitioned by biz_line and dt:

  • dws_user_summary_d

Using a for-each node, you maintain one set of processing logic. At runtime, the assignment node determines the list of lines of business, and the for-each node iterates over it automatically.

Data preparation

  1. Associate a MaxCompute computing resource with the workspace.

  2. Go to Data Studio and create a MaxCompute SQL node.

  3. Create the source tables: add the following code, select it, and run it.

    -- E-commerce user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    )
    COMMENT 'Details of e-commerce user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- Verify that the e-commerce user behavior table is created.
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- Finance user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    )
    COMMENT 'Details of finance user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- Verify that the finance user behavior table is created.
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- Logistics user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    )
    COMMENT 'Details of logistics user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    -- Verify that the logistics user behavior table is created.
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. Create the destination table: add the following code, select it, and run it.

    Important

    If the workspace uses standard mode, deploy this node to the production environment and perform a data backfill.

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT 'User ID',
        pv          BIGINT COMMENT 'Daily popularity'
    )
    COMMENT 'Daily user popularity aggregate table'
    PARTITIONED BY (
        dt       STRING COMMENT 'Date partition in yyyymmdd format',
        biz_line STRING COMMENT 'Line-of-business partition, such as ecom, finance, logistics'
    );

Workflow implementation

  1. Create a workflow. In the Scheduling pane, set the scheduling parameter bizdate to the previous day: $[yyyymmdd-1].

    image

  2. Create an assignment node named get_biz_list in the workflow. Write the following MaxCompute SQL to output the list of lines of business:

    -- Output all lines of business to process
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. Configure the for-each node:

    • On the workflow canvas, create a for-each node downstream of get_biz_list.

    • On the Scheduling tab, under Scheduling Parameters > Script Parameters, bind loopDataArray to the outputs parameter of get_biz_list. image

    • In the for-each loop body, click Create Internal Node and create a MaxCompute SQL node with the following processing logic:

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
      Note: This script runs once per iteration. ${dag.foreach.current} is replaced at runtime with the current line-of-business name: ecom, finance, or logistics.
  4. Add a verification node: on the workflow canvas, create a MaxCompute SQL node downstream of the for-each node with the following code:

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

Deploy and run

Deploy the workflow to the production environment. In Operation Center, go to Auto Triggered Node O&M > Auto Triggered Nodes, find the workflow, run a test, and select 20251010 as the data timestamp.

After the run completes, open the runtime log in the test instance. The expected output of the final node is:

user_idpvdtbiz_line
user001220251010ecom
user002120251010ecom
user003320251010finance
user004120251010finance
user001120251010logistics
user005120251010logistics

Why this approach scales: To add a new line of business, add one UNION ALL SELECT line in get_biz_list. The for-each node picks it up automatically — no changes to the processing logic required.

FAQ

Why can't I run the for-each node directly in DataStudio?

The node depends on a full scheduling environment to resolve its node context and dependencies. Deploy to Operation Center and test using data backfill or smoke testing.

Why does smoke testing the for-each node alone fail or do nothing?

The for-each node reads its iteration list from loopDataArray, which must be bound to the outputs parameter of an upstream assignment node. Without that input, the node has nothing to iterate over. Always start the test from the upstream assignment node.

Why did my loop run only once?

The upstream assignment node output is being parsed as a single element rather than an array. Check that items are separated by commas (,). For example, item1,item2,item3 produces three iterations, but item1 item2 item3 produces only one.