全部產品
Search
文件中心

DataWorks:賦值節點

更新時間:Nov 19, 2025

當需要將上遊節點的查詢或輸出結果傳遞給下遊節點時,可通過賦值節點實現。賦值節點(即上遊節點)支援 MaxCompute SQLPython 2Shell三種語言,會自動將最後一條查詢或輸出結果賦值給節點的輸出參數(outputs),下遊節點可以通過引用該參數來擷取賦值節點的輸出結果。

適用範圍

  • 版本限制:僅支援DataWorks標準版及以上版本。

  • 許可權限制:RAM帳號已被添加至對應工作空間中,並具有開發空間管理員角色許可權。詳情請參見為工作空間增加空間成員

核心概念:參數的傳遞與引用

賦值節點的核心是參數傳遞,負責將上遊節點產生的資料傳遞給下遊節點。

  • 上遊賦值節點:負責產生資料。它會將最後一條輸出或查詢結果自動賦值給一個名為outputs節點輸出參數

  • 下遊業務節點:負責接收和使用資料。通過在節點配置中添加一個節點輸入參數(例如,param),並讓它引用上遊節點的outputs參數,就可以在代碼中使用這些資料。

傳遞參數格式說明

賦值節點的傳遞參數格式說明如下:

賦值語言

傳遞參數範圍

傳遞參數格式

MaxCompute SQL

擷取最後一行 SELECT 語句的輸出。

將輸出結果作為一個二維數組傳遞至下遊。

Python 2

擷取最後一行 print 語句的輸出。

將輸出結果轉化成字串後基於逗號,分割為一維數組

例如,賦值節點最後一行輸出為'Electronics,Clothing,Books',傳遞給下遊的格式為['Electronics','Clothing','Books']
重要

若輸出內容本身包含逗號,需進行轉義處理。例如輸出 'Electronics,Clothing\, Shoes & Accessories',下遊將正確解析為 ['Electronics', 'Clothing, Shoes & Accessories']

Shell

擷取最後一行 echo 語句的輸出。

操作步驟

下面以將賦值節點的結果傳遞給Shell節點為例,介紹通用的操作流程。 實際情境中,支援任意節點作為下遊節點。

  1. 配置上遊賦值節點

    在目標工作流程中,建立並編輯一個賦值節點,按需選擇MaxCompute SQLPython 2Shell,並編寫代碼以產生需要傳遞給下遊的結果。

    image

  2. 配置下遊Shell節點

    建立Shell節點,在Shell節點的編輯頁面,引用上遊結果:

    image

    1. 在右側調度配置中,選擇節點上下文參數頁簽。

    2. 節點輸入參數地區,單擊添加參數

    3. 在彈出的對話方塊中,選擇上遊節點的輸出參數為上一步賦值節點的outputs,並為當前節點的輸入參數自訂一個參數名稱(例如:param)。

      說明

      配置完成後,下遊節點會自動與上遊賦值節點建立依賴關係。

    4. 完成參數配置後,即可在下遊Shell節點的代碼中通過${param}的格式來使用上遊傳遞過來的值。

  3. 運行驗證

    1. 返回工作流程,單擊工具列上方的發布,選擇全量發布。

    2. 前往營運中心模組的任務營運 > 周期任務營運 > 周期任務,對目標工作流程進行煙霧測試 (Smoke Test)

    3. 在測試執行個體中,查看最終運行結果是否符合預期。

注意事項

  • 傳遞層級:賦值節點參數只能傳遞給直接下遊的一層子節點,不支援跨層級的節點傳遞。

  • 傳遞大小限制:傳遞值最大為2MB。如果指派陳述式的輸出結果超過該限制,賦值節點會運行失敗。

  • 文法限制

    • 賦值節點代碼中不支援添加註釋,否則可能導致運行結果異常。

    • MaxCompute SQL模式下暫不支援WITH文法

使用樣本:分語言詳解

不同語言的賦值節點,其輸出結果(outputs)的資料格式和下遊節點的引用方式略有不同。下面以Shell為下遊節點,分別舉例說明。

樣本一:傳遞 MaxCompute SQL 查詢結果

SQL的查詢結果會作為一個二維數組傳遞給下遊。

  • 上遊節點(賦值節點-SQL)配置

    假設SQL代碼如下,查詢返回兩行兩列的資料:

    SELECT 'beijing', '1001'
    UNION ALL 
    SELECT 'hangzhou', '1002';
  • 下遊節點(Shell節點)配置和輸出

    在Shell節點中添加名為region的輸入參數,並引用上遊SQL節點的outputs

    編寫如下代碼讀取資料:

    echo "整個結果集: ${region}"
    echo "第一行: ${region[0]}"
    echo "第一行第二個欄位: ${region[0][1]}"

    DataWorks將直接解析參數,並做靜態替換。運行輸出如下:

    整個結果集: beijing,1001
    hangzhou,1002
    第一行: beijing,1001
    第一行第二個欄位: 1001

樣本二:傳遞 Python 2 輸出結果

Python 2的print語句輸出結果會基於逗號,分割,並作為一個一維數組傳遞給下遊。

  • 上遊節點(賦值節點-Python 2)配置

    Python 2代碼如下:

    print 'Electronics, Clothing, Books';
  • 下遊節點(Shell節點)配置和輸出

    在Shell節點中添加名為types的輸入參數,並引用上遊賦值節點的outputs

    編寫如下代碼讀取資料:

    # 直接輸出整個一維數組
    echo "整個結果集: ${types}"
    
    # 按索引輸出數組中的元素
    echo "第二個元素: ${types[1]}"

    DataWorks將直接解析參數,並做靜態替換。運行輸出如下:

    整個結果集: Electronics,Clothing,Books
    第二個元素: Clothing
說明

Shell節點處理邏輯和Python 2類似,不再重複說明。

情境實踐:批量處理多業務線分區表資料

本樣本展示如何使用賦值節點for-each-節點批量處理多個業務線的使用者行為資料,實現一套處理邏輯服務多條產品線的自動化資料處理。

業務背景

假設您是一家綜合性互連網公司的資料開發工程師,負責處理三個核心業務線的資料:電商(ecom)、金融(finance)和物流(logistics),且後續存在增加業務線的可能。您需要每天對這三個業務線的使用者行為日誌執行相同的彙總邏輯,計算每個使用者的日活躍度(PV),並將結果存入統一的匯總表。

  • 上遊源表(DWD層):

    • dwd_user_behavior_ecom_d:電商使用者行為表。

    • dwd_user_behavior_finance_d :金融使用者行為表。

    • dwd_user_behavior_logistics_d :物流使用者行為表。

    • dwd_user_behavior_${業務線}_d :後續更多可能的業務線使用者行為表。

    • 這些表結構相同,都按天分區(dt)。

  • 下遊目標表(DWS):

    • dws_user_summary_d :使用者匯總表。

    • 該表按業務線(biz_line)和天(dt)雙重分區,用於統一儲存所有業務線的彙總結果。

若為每個業務線建立獨立的任務,維護成本高且容易出錯。使用 for-each-節點後,只需維護一份處理邏輯,系統會自動遍曆所有業務線完成計算。

資料準備

首先,建立樣本表並插入測試資料(以業務日期 20251010 為例):

  1. 為工作空間綁定MaxCompute計算資源

  2. 進入Data Studio資料開發,建立MaxCompute SQL節點。

  3. 建立源表(DWD層):在MaxCompute SQL節點添加如下代碼,並選中運行。

    -- 電商使用者行為表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT '使用者ID',
        action_type STRING COMMENT '行為類型',
        event_time  BIGINT COMMENT '事件發生的毫秒級Unix時間戳記'
    ) 
    COMMENT '電商使用者行為日誌明細表'
    PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- 驗證電商使用者行為表建立成功
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- 金融使用者行為表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT '使用者ID',
        action_type STRING COMMENT '行為類型',
        event_time  BIGINT COMMENT '事件發生的毫秒級Unix時間戳記'
    ) 
    COMMENT '金融使用者行為日誌明細表'
    PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- 驗證金融使用者行為表建立成功
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- 物流使用者行為表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT '使用者ID',
        action_type STRING COMMENT '行為類型',
        event_time  BIGINT COMMENT '事件發生的毫秒級Unix時間戳記'
    ) 
    COMMENT '物流使用者行為日誌明細表'
    PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- 驗證物流使用者行為表建立成功
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. 建立目標表(DWS層):在MaxCompute SQL節點上添加如下代碼,並選中運行。

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT '使用者ID',
        pv          BIGINT COMMENT '日活躍度',
    ) 
    COMMENT '使用者日活躍度匯總表'
    PARTITIONED BY (
        dt           STRING COMMENT '日期分區,格式 yyyymmdd',
        biz_line     STRING COMMENT '業務線分區,如 ecom, finance, logistics'
    );
    重要

    若工作空間使用標準環境,需要將此節點發布至生產環境,並執行補資料。

工作流程實現

  1. 建立一個工作流程,並在右側調度參數設定調度參數bizdate為上一天$[yyyymmdd-1]

    image

  2. 在工作流程中,建立名為get_biz_list的賦值節點,並使用MaxCompute SQL語言編寫如下代碼。該節點輸出需要處理的業務線列表:

    -- 輸出所有需要處理的業務線
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. 配置 for-each-節點

    • 回到工作流程介面,給賦值節點get_biz_list建立一個下遊的for-each節點。

    • 進入for-each節點設定介面,在右側調度配置調度參數 > 指令碼參數,將 loopDataArray 參數綁定到get_biz_list節點的outputs

      image

    • 在for-each節點迴圈體內單擊建立內部節點,建立 MaxCompute SQL節點,編寫迴圈體內的處理邏輯。

      說明
      • 該指令碼由 for-each-節點驅動,會針對每個業務線執行一次。

      • 內建變數 ${dag.foreach.current} 在每次運行時,會動態地替換為當前的業務線名稱。預期的迭代值為:'ecom', 'finance', 'logistics'。

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. 添加驗證節點

    回到工作流程,在for-each節點單擊建立下遊,建立一個MaxCompute SQL節點並添加如下代碼。

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

發布和運行結果

將工作流程發布至生產環境,前往營運中心的周期任務營運 > 周期任務,找到目標工作流程並執行煙霧測試 (Smoke Test),選擇業務日期為'20251010'

完成運行後,在測試執行個體中查看作業記錄。最終節點預期輸出為:

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

方案優勢

  • 高擴充性:新增業務線時,只需在賦值節點添加一行 SQL,無需修改處理邏輯。

  • 易維護:所有業務線共用同一套處理邏輯,修改一處即可生效於全部。

常見問題

  • Q:在MaxCompute SQL語言下,報錯“find no select sql in sql assignment!”。

    A:MaxCompute SQL缺少SELECT語句,請添加SELECT語句。暫不支援WITH語句,若使用WITH語句,也會報此錯誤。

  • Q:在Shell、Python語言下,報錯“OutPut Result is null, cannot handle!”。

    A:缺少輸出,請檢查代碼中是否存在列印語句(printecho)。

  • Q:在Shell、Python語言下,輸出元素本身帶有逗號,如何處理?

    A :需要對逗號,進行轉義,處理成\,。以Python為例,處理代碼如下。

    categories = ["Electronics", "Clothing, Shoes & Accessories"]
    
    # 對每個元素中包含的逗號進行轉義
    # 將 ',' 替換為 '\,'
    escaped_categories = [cat.replace(",", "\,") for cat in categories]
    
    # 使用逗號串連轉義後的元素
    output_string = ",".join(escaped_categories)
    print output_string
    # 最終輸出到下遊的字串是:
    # Electronics,Clothing\, Shoes & Accessories
  • Q:下遊節點支援上遊配置多個賦值節點來接收多個結果嗎?

    A :支援,只需給不同節點結果賦值不同的參數即可。

    image

  • Q:賦值節點支援其他語言類型嗎?

    A:賦值節點目前僅支援MaxCompute SQLPython 2Shell三種語言,部分節點(如EMR HiveHologres SQLEMR Spark SQLAnalyticDB for PostgreSQLClickHouse SQLMySQL節點等)自身支援賦值參數功能,可實現與賦值節點相同的效果。

    image

相關文檔