全部產品
Search
文件中心

DataWorks:for-each節點

更新時間:Nov 26, 2025

在資料處理工作流程中,當需要對一個列表(如檔案名稱列表、分區列表)的每一項執行相同子任務時,可使用 for-each-節點。該節點能自動遍曆上遊節點(通常為賦值節點)輸出的結果集,並為其中的每個元素重複執行內部迴圈體。該節點可避免為每一項手動建立任務的繁瑣工作,實現工作流程的動態化與自動化。

使用情境

在日常資料開發中,當需要對不同的業務單元、產品線或配置項執行相同的分析或處理邏輯時,for-each節點可以實現參數化執行。例如,當公司有多個產品線,需要為每個產品線單獨產生一份日報,此時處理邏輯完全相同,只是處理對象不同。

for-each節點能像程式設計語言中的 for 迴圈一樣,自動遍曆一個列表(如表名、分區名、檔案名稱等),並為列表中的每一項重複執行您預設好的子任務流,從而極大地提升工作流程的自動化和靈活性。

適用範圍

  • 版本限制:僅支援DataWorks標準版及以上版本。

  • 許可權限制:RAM帳號已被添加至對應工作空間中,並具有開發空間管理員角色許可權。詳情請參見為工作空間增加空間成員

工作原理

for-each-節點如同一個容器,其內部封裝一個可以自訂的子工作流程(迴圈體)。其工作機制如下:

  1. 資料輸入:for-each-節點依賴一個上遊賦值節點/其他能賦值的節點(例如,EMR Hive等),通過綁定 loopDataArray 參數,擷取其輸出的數組形式的結果集

  2. 迴圈執行:節點啟動後,會按順序遍曆結果集中的每一個元素。對於每一個元素,它都會完整地執行一次內部迴圈體(從 開始結束)。

    說明

    開始節點和結束節點不可編輯,僅用於標記整個迴圈體的開始和結束。

  3. 資料傳遞:在每次迴圈中,當前被遍曆的元素值會通過內建變數傳遞給迴圈體內的節點。內部的業務節點通過 ${dag.foreach.current} 擷取當前正在處理的資料項目。

內建變數

重要

${...} 形式變數均為DataWorks特有的模板文法,DataWorks將直接解析參數,並做靜態替換。

在 for-each 迴圈體內部的節點中,使用以下內建變數來擷取迴圈狀態和資料:

內建變數

含義

與 for 迴圈類比

${dag.loopDataArray}

擷取上遊賦值節點傳入的完整結果集

以下面的for迴圈代碼為例:

for(int i=0;i<data.length;i++) {
   print(data[i]);
}
  • ${dag.loopDataArray} 相當於data

  • ${dag.foreach.current}相當於data[i]

  • ${dag.offset}相當於i

  • ${dag.loopTimes}相當於 i+1

${dag.foreach.current}

擷取當前迴圈正在處理的資料項目。

${dag.offset}

擷取當前迴圈的位移量,從 0 開始。

${dag.loopTimes}

擷取當前是第幾次迴圈,從 1 開始。

當上遊輸出為二維數組時(如 SQL 查詢結果),還可使用以下方式精確取值:

其他變數

含義

${dag.foreach.current}

擷取當前資料行(一維數組)基於逗號,分隔後的字串。

${dag.foreach.current[n]}

擷取當前資料行(一維數組)中第 n個資料。

${dag.loopDataArray[i][j]}

擷取整個結果集中,第 i 行、第 j 列的資料。

for-each節點暫不支援雙重迴圈,此處僅做取值示範。

注意事項

  • 執行機制:迴圈支援串列執行並存執行,當每次迴圈間相互獨立時可選擇並行

  • 迴圈上限:預設最大迴圈次數為 128 次,可調整的最大迴圈次數為1024次

  • 調試限制:不支援在 Data Studio(資料開發頁面)直接運行 for-each-節點。必須將任務發布後,在營運中心通過煙霧測試 (Smoke Test)功能進行測試。

  • 執行限制:for-each-節點不支援單獨運行,包括煙霧測試 (Smoke Test)、補資料、手動運行。

  • 迴圈體內的流程式控制制:當在 for-each 迴圈體內使用分支節點時,必須確保所有分支最終都彙集到一個歸併節點,然後再串連到 結束 節點,以保證迴圈體的邏輯完整性。

操作步驟

本操作以賦值節點作為上遊節點,迴圈體中配置一個Shell節點列印結果為例,引導您完成一個完整的 for-each 任務配置:

  1. 準備上遊資料(配置賦值節點)

    建立一個賦值節點並配置其輸出,為下遊的 for-each-節點提供一個待遍曆的結果集

    1. 在工作流程中,建立賦值節點(如 assign)並將其置於 for-each-節點的上遊。

    2. 雙擊開啟賦值節點,選擇一種Python 2。例如,使用 Python 2 輸出一個包含四個元素的數組:

      此時賦值節點將會輸出[10,20,30,40]至下遊。因為賦值節點會自動將最後一行輸出基於逗號分割成數組形式作為輸出結果。
      print "10,20,30,40"
    3. 賦值節點將自動產生一個名為 outputs 的輸出參數,代表其結果集。

    4. 儲存賦值節點。

  2. 配置 for-each-節點消費資料

    配置 for-each-節點接收上遊資料,並在其迴圈體內使用這些資料。

    1. 雙擊 for-each-節點,進入其內部編排畫布。

    2. 在右側的調度配置面板,找到調度參數下的 loopDataArray 參數,單擊綁定

      image

    3. 在彈出的對話方塊中,取值來源選擇上遊賦值節點 (assign) 的 outputs 參數進行綁定。此操作會自動建立兩個節點間的依賴關係。

    4. for-each 迴圈體中,單擊建立內部節點,選擇建立一個 Shell 節點。

      實際情境中,可以配置任何節點。
    5. 雙擊開啟建立的 Shell 節點,在代碼中使用內建變數擷取並列印迴圈資訊:

      #!/bin/bash
      # 使用 ${dag.loopTimes} 擷取當前迴圈次數
      echo "Current loop number is: ${dag.loopTimes}"
      
      # 使用 ${dag.foreach.current} 擷取當前遍曆到的資料
      echo "Current item is: ${dag.foreach.current}"
    6. (可選)在右側的調度配置面板,配置調度策略下的相關屬性。

      • 最大迴圈次數:預設為128次,最大可調至1024次

        重要

        該參數決定迴圈體的最大迴圈次數。當上遊資料量較大時,請確保調整此參數以完成所有遍曆。

      • 執行策略:此處選擇串列

        • 串列:按照迴圈次數,順序運行。

        • 並行:可並發運行for-each內部迴圈,提升任務的執行效率。當配置並行後,其中某個運行批次失敗,不會影響其他批次的運行,調度運行完所有批次。預設值為5,最大支援設定為20。

          image

    7. 儲存Shell節點。

  3. 發布、運行與驗證

    將工作流程提交到營運中心執行,並驗證 for-each-節點的運行結果。

    1. 返回主工作流程畫布,單擊工具列發布按鈕,發布整個工作流程。

    2. 前往營運中心模組的任務營運 > 周期任務營運 > 周期任務,對目標工作流程進行煙霧測試 (Smoke Test)

      重要

      請勿單獨對 for-each-節點執行冒煙。由於 for-each-節點依賴上遊賦值節點的輸出,您必須從賦值節點開始測試,以確保資料鏈路的完整性。

    3. 等待測試執行個體運行成功後,在執行個體列表中找到 for-each-節點的執行個體並單擊開啟,右鍵選擇查看內部節點

      image

    4. 在內部節點視圖中,查看每次迴圈產生的 Shell 節點執行個體。開啟任一執行個體的作業記錄,即可查看該次迴圈的輸出結果,驗證其是否符合預期。

      image

用例:處理不同資料格式

情境1:處理一維數組(Shell/Python 輸出)

  • 賦值節點輸出:2025-11-01,2025-11-02,2025-11-03

  • 遍曆次數:3 次。

  • 第2次迴圈時

    • ${dag.foreach.current} 的值為 2025-11-02

    • ${dag.loopTimes} 的值為 2

情境2:處理二維數組(SQL 輸出)

  • 賦值節點 (MaxCompute SQL) 輸出

    +-----+----------+
    | id  | city     |
    +-----+----------+
    | 101 | beijing  |
    | 102 | shanghai |
    +-----+----------+
  • 遍曆次數:2 次。

  • 第2次迴圈時

    • ${dag.foreach.current} 的值為 102,shanghai

    • ${dag.loopTimes} 的值為 2

    • ${dag.foreach.current[0]} 的值為 102

    • ${dag.foreach.current[1]} 的值為 shanghai

情境實踐:批量處理多業務線分區表資料

本樣本展示如何使用賦值節點for-each-節點批量處理多個業務線的使用者行為資料,實現一套處理邏輯服務多條產品線的自動化資料處理。

業務背景

假設您是一家綜合性互連網公司的資料開發工程師,負責處理三個核心業務線的資料:電商(ecom)、金融(finance)和物流(logistics),且後續存在增加業務線的可能。您需要每天對這三個業務線的使用者行為日誌執行相同的彙總邏輯,計算每個使用者的日活躍度(PV),並將結果存入統一的匯總表。

  • 上遊源表(DWD層):

    • dwd_user_behavior_ecom_d:電商使用者行為表。

    • dwd_user_behavior_finance_d :金融使用者行為表。

    • dwd_user_behavior_logistics_d :物流使用者行為表。

    • dwd_user_behavior_${業務線}_d :後續更多可能的業務線使用者行為表。

    • 這些表結構相同,都按天分區(dt)。

  • 下遊目標表(DWS):

    • dws_user_summary_d :使用者匯總表。

    • 該表按業務線(biz_line)和天(dt)雙重分區,用於統一儲存所有業務線的彙總結果。

若為每個業務線建立獨立的任務,維護成本高且容易出錯。使用 for-each-節點後,只需維護一份處理邏輯,系統會自動遍曆所有業務線完成計算。

資料準備

首先,建立樣本表並插入測試資料(以業務日期 20251010 為例):

  1. 為工作空間綁定MaxCompute計算資源

  2. 進入Data Studio資料開發,建立MaxCompute SQL節點。

  3. 建立源表(DWD層):在MaxCompute SQL節點添加如下代碼,並選中運行。

    -- 電商使用者行為表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT '使用者ID',
        action_type STRING COMMENT '行為類型',
        event_time  BIGINT COMMENT '事件發生的毫秒級Unix時間戳記'
    ) 
    COMMENT '電商使用者行為日誌明細表'
    PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- 驗證電商使用者行為表建立成功
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- 金融使用者行為表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT '使用者ID',
        action_type STRING COMMENT '行為類型',
        event_time  BIGINT COMMENT '事件發生的毫秒級Unix時間戳記'
    ) 
    COMMENT '金融使用者行為日誌明細表'
    PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- 驗證金融使用者行為表建立成功
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- 物流使用者行為表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT '使用者ID',
        action_type STRING COMMENT '行為類型',
        event_time  BIGINT COMMENT '事件發生的毫秒級Unix時間戳記'
    ) 
    COMMENT '物流使用者行為日誌明細表'
    PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- 驗證物流使用者行為表建立成功
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. 建立目標表(DWS層):在MaxCompute SQL節點上添加如下代碼,並選中運行。

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT '使用者ID',
        pv          BIGINT COMMENT '日活躍度',
    ) 
    COMMENT '使用者日活躍度匯總表'
    PARTITIONED BY (
        dt           STRING COMMENT '日期分區,格式 yyyymmdd',
        biz_line     STRING COMMENT '業務線分區,如 ecom, finance, logistics'
    );
    重要

    若工作空間使用標準環境,需要將此節點發布至生產環境,並執行補資料。

工作流程實現

  1. 建立一個工作流程,並在右側調度參數設定調度參數bizdate為上一天$[yyyymmdd-1]

    image

  2. 在工作流程中,建立名為get_biz_list的賦值節點,並使用MaxCompute SQL語言編寫如下代碼。該節點輸出需要處理的業務線列表:

    -- 輸出所有需要處理的業務線
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. 配置 for-each-節點

    • 回到工作流程介面,給賦值節點get_biz_list建立一個下遊的for-each節點。

    • 進入for-each節點設定介面,在右側調度配置調度參數 > 指令碼參數,將 loopDataArray 參數綁定到get_biz_list節點的outputs

      image

    • 在for-each節點迴圈體內單擊建立內部節點,建立 MaxCompute SQL節點,編寫迴圈體內的處理邏輯。

      說明
      • 該指令碼由 for-each-節點驅動,會針對每個業務線執行一次。

      • 內建變數 ${dag.foreach.current} 在每次運行時,會動態地替換為當前的業務線名稱。預期的迭代值為:'ecom', 'finance', 'logistics'。

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. 添加驗證節點

    回到工作流程,在for-each節點單擊建立下遊,建立一個MaxCompute SQL節點並添加如下代碼。

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

發布和運行結果

將工作流程發布至生產環境,前往營運中心的周期任務營運 > 周期任務,找到目標工作流程並執行煙霧測試 (Smoke Test),選擇業務日期為'20251010'

完成運行後,在測試執行個體中查看作業記錄。最終節點預期輸出為:

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

方案優勢

  • 高擴充性:新增業務線時,只需在賦值節點添加一行 SQL,無需修改處理邏輯。

  • 易維護:所有業務線共用同一套處理邏輯,修改一處即可生效於全部。

常見問題

  • Q:為什麼無法在 Data Studio 直接運行 for-each-節點進行測試?

    A:此為設計限制。該節點需要在完整的調度環境中解析節點上下文和依賴關係,因此不支援在 Data Studio 中直接運行調試。必須將任務發布營運中心,通過補資料或周期調度的方式進行測試。

  • Q:為什麼單獨對for-each-節點煙霧測試 (Smoke Test)會失敗或不執行任何操作?

    A:for-each-節點的迴圈資料來源於其 loopDataArray 輸入參數,該參數需綁定上遊賦值節點outputs 參數。如果單獨運行 for-each-節點,它將因無法擷取輸入結果集而跳過執行或執行失敗。

  • Q:我的迴圈為什麼只執行了一次? 

    A:這通常是因為上遊賦值節點的輸出結果被解析為了單個元素。請檢查您的輸出:

    • 1. 是否是一個不含分隔字元的單一字串?

    • 2. 如果期望遍曆多項,請確保它們由英文逗號 (,) 分隔。例如,'item1,item2,item3' 會迴圈三次,而 'item1 item2 item3' 只會迴圈一次。