當需要將上遊節點的查詢或輸出結果傳遞給下遊節點時,可通過賦值節點實現。賦值節點(即上遊節點)支援 MaxCompute SQL、Python 2和Shell三種語言,會自動將最後一條查詢或輸出結果賦值給節點的輸出參數(outputs),下遊節點可以通過引用該參數來擷取賦值節點的輸出結果。
適用範圍
版本限制:僅支援DataWorks標準版及以上版本。
許可權限制:RAM帳號已被添加至對應工作空間中,並具有開發或空間管理員角色許可權。詳情請參見為工作空間增加空間成員。
核心概念:參數的傳遞與引用
賦值節點的核心是參數傳遞,負責將上遊節點產生的資料傳遞給下遊節點。
上遊賦值節點:負責產生資料。它會將最後一條輸出或查詢結果自動賦值給一個名為
outputs的節點輸出參數。下遊業務節點:負責接收和使用資料。通過在節點配置中添加一個節點輸入參數(例如,
param),並讓它引用上遊節點的outputs參數,就可以在代碼中使用這些資料。
傳遞參數格式說明
賦值節點的傳遞參數格式說明如下:
賦值語言 | 傳遞參數範圍 | 傳遞參數格式 |
MaxCompute SQL | 擷取最後一行 | 將輸出結果作為一個二維數組傳遞至下遊。 |
Python 2 | 擷取最後一行 | 將輸出結果轉化成字串後基於逗號 例如,賦值節點最後一行輸出為 重要 若輸出內容本身包含逗號,需進行轉義處理。例如輸出 |
Shell | 擷取最後一行 |
操作步驟
下面以將賦值節點的結果傳遞給Shell節點為例,介紹通用的操作流程。 實際情境中,支援任意節點作為下遊節點。
配置上遊賦值節點
在目標工作流程中,建立並編輯一個賦值節點,按需選擇MaxCompute SQL、Python 2和Shell,並編寫代碼以產生需要傳遞給下遊的結果。

配置下遊Shell節點
建立Shell節點,在Shell節點的編輯頁面,引用上遊結果:

在右側調度配置中,選擇節點上下文參數頁簽。
在節點輸入參數地區,單擊添加參數。
在彈出的對話方塊中,選擇上遊節點的輸出參數為上一步賦值節點的
outputs,並為當前節點的輸入參數自訂一個參數名稱(例如:param)。說明配置完成後,下遊節點會自動與上遊賦值節點建立依賴關係。
完成參數配置後,即可在下遊Shell節點的代碼中通過
${param}的格式來使用上遊傳遞過來的值。
運行驗證
返回工作流程,單擊工具列上方的發布,選擇全量發布。
前往營運中心模組的,對目標工作流程進行煙霧測試 (Smoke Test)。
在測試執行個體中,查看最終運行結果是否符合預期。
注意事項
傳遞層級:賦值節點參數只能傳遞給直接下遊的一層子節點,不支援跨層級的節點傳遞。
傳遞大小限制:傳遞值最大為2MB。如果指派陳述式的輸出結果超過該限制,賦值節點會運行失敗。
文法限制:
賦值節點代碼中不支援添加註釋,否則可能導致運行結果異常。
MaxCompute SQL模式下暫不支援WITH文法。
使用樣本:分語言詳解
不同語言的賦值節點,其輸出結果(outputs)的資料格式和下遊節點的引用方式略有不同。下面以Shell為下遊節點,分別舉例說明。
樣本一:傳遞 MaxCompute SQL 查詢結果
SQL的查詢結果會作為一個二維數組傳遞給下遊。
上遊節點(賦值節點-SQL)配置
假設SQL代碼如下,查詢返回兩行兩列的資料:
SELECT 'beijing', '1001' UNION ALL SELECT 'hangzhou', '1002';下遊節點(Shell節點)配置和輸出
在Shell節點中添加名為
region的輸入參數,並引用上遊SQL節點的outputs。編寫如下代碼讀取資料:
echo "整個結果集: ${region}" echo "第一行: ${region[0]}" echo "第一行第二個欄位: ${region[0][1]}"DataWorks將直接解析參數,並做靜態替換。運行輸出如下:
整個結果集: beijing,1001 hangzhou,1002 第一行: beijing,1001 第一行第二個欄位: 1001
樣本二:傳遞 Python 2 輸出結果
Python 2的print語句輸出結果會基於逗號,分割,並作為一個一維數組傳遞給下遊。
上遊節點(賦值節點-Python 2)配置
Python 2代碼如下:
print 'Electronics, Clothing, Books';下遊節點(Shell節點)配置和輸出
在Shell節點中添加名為
types的輸入參數,並引用上遊賦值節點的outputs。編寫如下代碼讀取資料:
# 直接輸出整個一維數組 echo "整個結果集: ${types}" # 按索引輸出數組中的元素 echo "第二個元素: ${types[1]}"DataWorks將直接解析參數,並做靜態替換。運行輸出如下:
整個結果集: Electronics,Clothing,Books 第二個元素: Clothing
Shell節點處理邏輯和Python 2類似,不再重複說明。
情境實踐:批量處理多業務線分區表資料
本樣本展示如何使用賦值節點和for-each-節點批量處理多個業務線的使用者行為資料,實現一套處理邏輯服務多條產品線的自動化資料處理。
業務背景
假設您是一家綜合性互連網公司的資料開發工程師,負責處理三個核心業務線的資料:電商(ecom)、金融(finance)和物流(logistics),且後續存在增加業務線的可能。您需要每天對這三個業務線的使用者行為日誌執行相同的彙總邏輯,計算每個使用者的日活躍度(PV),並將結果存入統一的匯總表。
上遊源表(DWD層):
dwd_user_behavior_ecom_d:電商使用者行為表。dwd_user_behavior_finance_d:金融使用者行為表。dwd_user_behavior_logistics_d:物流使用者行為表。dwd_user_behavior_${業務線}_d:後續更多可能的業務線使用者行為表。這些表結構相同,都按天分區(
dt)。
下遊目標表(DWS):
dws_user_summary_d:使用者匯總表。該表按業務線(
biz_line)和天(dt)雙重分區,用於統一儲存所有業務線的彙總結果。
若為每個業務線建立獨立的任務,維護成本高且容易出錯。使用 for-each-節點後,只需維護一份處理邏輯,系統會自動遍曆所有業務線完成計算。
資料準備
首先,建立樣本表並插入測試資料(以業務日期 20251010 為例):
為工作空間綁定MaxCompute計算資源。
進入Data Studio資料開發,建立MaxCompute SQL節點。
建立源表(DWD層):在MaxCompute SQL節點添加如下代碼,並選中運行。
-- 電商使用者行為表 CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d ( user_id STRING COMMENT '使用者ID', action_type STRING COMMENT '行為類型', event_time BIGINT COMMENT '事件發生的毫秒級Unix時間戳記' ) COMMENT '電商使用者行為日誌明細表' PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES ('user001', 'click', 1760004060000), -- 2025-10-10 10:01:00.000 ('user002', 'browse', 1760004150000), -- 2025-10-10 10:02:30.000 ('user001', 'add_to_cart', 1760004300000); -- 2025-10-10 10:05:00.000 -- 驗證電商使用者行為表建立成功 SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010'; -- 金融使用者行為表 CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d ( user_id STRING COMMENT '使用者ID', action_type STRING COMMENT '行為類型', event_time BIGINT COMMENT '事件發生的毫秒級Unix時間戳記' ) COMMENT '金融使用者行為日誌明細表' PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES ('user003', 'open_app', 1760020200000), -- 2025-10-10 14:30:00.000 ('user003', 'transfer', 1760020215000), -- 2025-10-10 14:30:15.000 ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000 ('user004', 'open_app', 1760020300000); -- 2025-10-10 14:31:40.000 -- 驗證金融使用者行為表建立成功 SELECT * FROM dwd_user_behavior_finance_d where dt='20251010'; -- 物流使用者行為表 CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d ( user_id STRING COMMENT '使用者ID', action_type STRING COMMENT '行為類型', event_time BIGINT COMMENT '事件發生的毫秒級Unix時間戳記' ) COMMENT '物流使用者行為日誌明細表' PARTITIONED BY (dt STRING COMMENT '日期分區,格式 yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES ('user001', 'check_status', 1760032800000), -- 2025-10-10 18:00:00.000 ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000 -- 驗證物流使用者行為表建立成功 SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';建立目標表(DWS層):在MaxCompute SQL節點上添加如下代碼,並選中運行。
CREATE TABLE IF NOT EXISTS dws_user_summary_d ( user_id STRING COMMENT '使用者ID', pv BIGINT COMMENT '日活躍度', ) COMMENT '使用者日活躍度匯總表' PARTITIONED BY ( dt STRING COMMENT '日期分區,格式 yyyymmdd', biz_line STRING COMMENT '業務線分區,如 ecom, finance, logistics' );重要若工作空間使用標準環境,需要將此節點發布至生產環境,並執行補資料。
工作流程實現
建立一個工作流程,並在右側調度參數設定調度參數bizdate為上一天
$[yyyymmdd-1]。
在工作流程中,建立名為get_biz_list的賦值節點,並使用MaxCompute SQL語言編寫如下代碼。該節點輸出需要處理的業務線列表:
-- 輸出所有需要處理的業務線 SELECT 'ecom' AS biz_line UNION ALL SELECT 'finance' AS biz_line UNION ALL SELECT 'logistics' AS biz_line;配置 for-each-節點
回到工作流程介面,給賦值節點get_biz_list建立一個下遊的for-each節點。
進入for-each節點設定介面,在右側調度配置的,將 loopDataArray 參數綁定到get_biz_list節點的outputs。

在for-each節點迴圈體內單擊建立內部節點,建立 MaxCompute SQL節點,編寫迴圈體內的處理邏輯。
說明該指令碼由 for-each-節點驅動,會針對每個業務線執行一次。
內建變數 ${dag.foreach.current} 在每次運行時,會動態地替換為當前的業務線名稱。預期的迭代值為:'ecom', 'finance', 'logistics'。
SET odps.sql.allow.dynamic.partition=true; INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line) SELECT user_id, COUNT(*) AS pv, '${dag.foreach.current}' AS biz_line FROM dwd_user_behavior_${dag.foreach.current}_d WHERE dt = '${bizdate}' GROUP BY user_id;
添加驗證節點
回到工作流程,在for-each節點單擊建立下遊,建立一個MaxCompute SQL節點並添加如下代碼。
SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;
發布和運行結果
將工作流程發布至生產環境,前往營運中心的,找到目標工作流程並執行煙霧測試 (Smoke Test),選擇業務日期為'20251010'。
完成運行後,在測試執行個體中查看作業記錄。最終節點預期輸出為:
user_id | pv | dt | biz_line |
user001 | 2 | 20251010 | ecom |
user002 | 1 | 20251010 | ecom |
user003 | 3 | 20251010 | finance |
user004 | 1 | 20251010 | finance |
user001 | 1 | 20251010 | logistics |
user005 | 1 | 20251010 | logistics |
方案優勢
高擴充性:新增業務線時,只需在賦值節點添加一行 SQL,無需修改處理邏輯。
易維護:所有業務線共用同一套處理邏輯,修改一處即可生效於全部。
常見問題
Q:在MaxCompute SQL語言下,報錯“find no select sql in sql assignment!”。
A:MaxCompute SQL缺少
SELECT語句,請添加SELECT語句。暫不支援WITH語句,若使用WITH語句,也會報此錯誤。Q:在Shell、Python語言下,報錯“OutPut Result is null, cannot handle!”。
A:缺少輸出,請檢查代碼中是否存在列印語句(
print、echo)。Q:在Shell、Python語言下,輸出元素本身帶有逗號,如何處理?
A :需要對逗號
,進行轉義,處理成\,。以Python為例,處理代碼如下。categories = ["Electronics", "Clothing, Shoes & Accessories"] # 對每個元素中包含的逗號進行轉義 # 將 ',' 替換為 '\,' escaped_categories = [cat.replace(",", "\,") for cat in categories] # 使用逗號串連轉義後的元素 output_string = ",".join(escaped_categories) print output_string # 最終輸出到下遊的字串是: # Electronics,Clothing\, Shoes & AccessoriesQ:下遊節點支援上遊配置多個賦值節點來接收多個結果嗎?
A :支援,只需給不同節點結果賦值不同的參數即可。

Q:賦值節點支援其他語言類型嗎?
A:賦值節點目前僅支援MaxCompute SQL、Python 2和Shell三種語言,部分節點(如EMR Hive、Hologres SQL、EMR Spark SQL、AnalyticDB for PostgreSQL、ClickHouse SQL和MySQL節點等)自身支援賦值參數功能,可實現與賦值節點相同的效果。

相關文檔
若下遊需迴圈遍曆處理,請參見for-each節點、do-while節點配合使用。
若需要跨層級傳遞參數,請參見參數節點配合使用。
參數的傳遞配置,更多請參見節點上下文參數。