與按照預定時間表(如每天淩晨1點)啟動並執行周期工作流程不同,觸發式工作流程是一種按需執行、事件驅動的資料處理模式。它的運行由外部訊號(如檔案上傳、訊息到達、API調用或手動點擊)即時觸發,為資料處理提供極高的即時性和靈活性。
|
特性 |
周期工作流程 |
觸發式工作流程 |
|
驅動方式 |
固定時間(Crontab運算式) |
外部訊號(事件、API、手動) |
|
執行模式 |
計劃性、可預測 |
響應式、按需執行 |
|
適用情境 |
T+1批量數倉、定時報表 |
檔案到達即處理、與業務系統整合、手動資料修複 |
|
核心優勢 |
穩定性、周期性保障 |
即時性、靈活性 |
支援的觸發方式
觸發式工作流程支援以下三種觸發方式,您可以根據業務情境靈活選擇:
|
觸發方式 |
觸發方 |
核心情境 |
關鍵點 |
|
事件觸發 |
外來事件源(如OSS、Kafka) |
事件驅動ETL:檔案落地即處理、訊息驅動Realtime Compute。 |
需要先建立觸發器並與工作流程關聯。僅在生產環境生效。 |
|
手動觸發 |
使用者(開發人員/營運人員) |
臨時任務:一次性資料處理或分析。 |
開發環境、生產環境均可手動運行。是手動商務程序的推薦替代方案。 |
|
API觸發 |
外部系統(通過OpenAPI) |
系統整合:被業務系統(如CRM、ERP)回調,觸發資料加工。 |
需要調用OpenAPI,並具備相應許可權。 |
快速入門:建立手動觸發工作流程
本章節將引導您建立一個最簡單的觸發式工作流程,並通過手動方式運行它,快速體驗完整流程。
步驟一:建立觸發式工作流程
進入DataWorks工作空間列表頁,在頂部切換至目標地區,找到目標工作空間,單擊操作列的,進入Data Studio。
-
在左側導覽列單擊
,然後在專案目錄右側單擊,進入建立工作流程頁面。 -
在彈出的對話方塊中,在建立工作流程頁面,選擇調度類型為觸發式調度。輸入工作流程名稱,單擊確認建立。
步驟二:編排工作流程並開發節點
-
點擊工具列上方的+ 添加節點,開啟節點列表。從左側節點類型列表中,拖拽一個 Shell 節點到畫布中,輸入名稱完成建立。
-
雙擊Shell節點,進入代碼編輯頁面,輸入以下代碼:
echo "Hello, Trigger Workflow! Current time is ${bizdate}" -
單擊工具列的儲存按鈕。
步驟三:調試運行(開發環境)
-
返回工作流程畫布,單擊頂部工具列的
表徵圖。 -
在彈出的對話方塊中,填寫工作流程的本次運行值(例如今天20260310,
bizdate應該被替換為20260309)。 -
稍後,在下方的作業記錄中,您可以看到節點的運行狀態和
echo命令的輸出結果。
步驟四:發布並運行(生產環境)
-
在工作流程畫布上方,單擊發布
按鈕,根據提示完成發布流程。 -
發布成功後,進入 營運中心 > 手動任務營運 > 手動任務 > 觸發式工作流程。
-
找到您剛發布的工作流程,單擊操作列的運行。
-
在彈出的視窗中再次單擊運行,即可在生產環境觸發一次該工作流程的執行個體。您可以在手動執行個體頁面查看本次啟動並執行詳情。
至此,您已經成功掌握最基礎的觸發式工作流程使用方法。接下來,我們將深入探索更強大的事件觸發能力。
進階案例:建立事件觸發式工作流程
情境一:OSS新檔案到達,自動觸發資料處理
目標:當有新的CSV檔案上傳到OSS指定目錄時,自動觸發一個工作流程,列印出該檔案的路徑。
步驟一:建立OSS觸發器
-
進入 營運中心 > 調度設定 > 觸發器管理。
-
單擊 建立觸發器,並按如下配置:
說明詳細參數介紹,請參見OSS觸發器。
-
觸發器名稱:自訂,如
oss_new_file_trigger。 -
適用工作空間:選擇作用的目標工作空間,即工作流程所在的工作空間。
-
觸發事件類型:選擇
Object Storage Service。 -
觸發事件:選擇
oss:ObjectCreated:PutObject(或其他上傳事件)。 -
Bucket名稱:選擇您的OSS Bucket。
-
檔案名稱:指定監聽的檔案路徑和格式,支援萬用字元。例如,監聽
input/目錄下所有.csv檔案,可填寫input/*.csv。 -
角色配置:首次使用需進行一鍵授權,選擇授權產生的名稱為
DataWorks-EventBridge-OSS-MNS-Role-*************角色。*************表示隨機產生一串的13位id數字,用於作唯一性區別。
-
-
單擊確認,完成觸發器建立。
步驟二:建立並關聯工作流程
-
按照快速入門:建立手動觸發工作流程的步驟,建立一個名為
process_oss_file_workflow的觸發式工作流程。 -
在工作流程畫布的右側面板,選擇 調度配置 > 調度策略。
-
在觸發器下拉框中,選擇剛剛建立的
oss_new_file_trigger。
步驟三:開發節點,解析事件參數
-
點擊工具列上方的+ 添加節點,開啟節點列表。從左側節點類型列表中,拖拽一個 Shell 節點到畫布中,輸入名稱完成建立。
-
雙擊節點,編寫代碼以擷取並列印觸發事件中的檔案路徑。
# 當觸發器觸發工作流程時,事件資訊會通過內建變數 workflow.triggerMessage 傳入 # 我們可以通過 ${workflow.triggerMessage.data.oss.object.key} 擷取到上傳檔案的完整路徑 echo "========= Start Processing OSS File =========" message="${workflow.triggerMessage}" echo "Raw Value: ${message}" # 從事件訊息中提取檔案名稱 FILE_PATH="${workflow.triggerMessage.data.oss.object.key}" echo "A new file has arrived: ${FILE_PATH}" # 此處可以寫具體處理邏輯 echo "========= Finish Processing OSS File ========="說明${workflow.triggerMessage}:擷取完整的JSON格式事件訊息體。OSS具體的訊息格式可在EventBridge的事件匯流排 >
DATAWORKS_TRIGGER_FOR_BUCKET_<OSS_Bucket_Name>> 事項追蹤> 事項詳情中擷取。
步驟四:調試與發布
-
調試:
-
返回工作流程畫布,單擊運行
按鈕。 -
在觸發器訊息體輸入框中,粘貼一段類比的OSS事件JSON。您可以從觸發器配置頁的“訊息格式樣本”複製並修改
key的值。以下為簡單樣本。{ "data": { "oss":{ "object": { "key": "input/test_file_20260310.csv" } } } } -
單擊運行,檢查日誌中是否成功列印出
input/test_file_20260310.csv。
-
-
發布:調試通過後,單擊發布按鈕,將工作流程發布到生產環境。事件觸發只有在生產環境才會生效。
步驟五:生產驗證
-
通過OSS控制台或用戶端,向您在觸發器中配置的Bucket和路徑(例如
input/目錄)上傳一個CSV檔案。 -
進入DataWorks 營運中心 > 手動任務營運 > 手動任務 > 觸發式工作流程,出現發布成功的
process_oss_file_workflow。
-
等待片刻,進入DataWorks 營運中心 > 手動任務營運 > 觸發式工作流程執行個體,一個新的工作流程執行個體被自動觸發運行。單擊查看其日誌,確認檔案路徑被正確處理。
最佳實務:等冪性設計
受網路波動等因素影響,OSS 事件可能被重複投遞。為避免資料重複處理,建議在商務邏輯中實現等冪性。常見方案為處理檔案前,先檢查一個記錄表(如MaxCompute表),以檔案的ETag或唯一路徑為標識,若已處理過則跳過。
情境二:Kafka訊息到達,驅動Realtime Compute
目標:監聽Kafka中使用者的行為日誌,當有新訊息時,觸發工作流程進行解析,並根據內容執行不同邏輯。
步驟一:建立Kafka觸發器
-
進入 營運中心 > 調度設定 > 觸發器管理,單擊建立觸發器。
-
配置如下:
-
觸發器名稱:
kafka_user_action_trigger。 -
觸發事件類型:選擇 雲訊息佇列Kafka版。
-
Kafka執行個體、Topic:選擇您要監聽的執行個體和Topic。
-
ConsumerGroupId:建議選擇快速建立,系統會自動產生一個消費組ID,避免與其他應用衝突。
-
Key(可選):可指定訊息的Key,只有Key完全符合的訊息才會觸發工作流程。
-
-
單擊確認。
步驟二:建立並關聯工作流程
-
按照快速入門:建立手動觸發工作流程的步驟,建立一個名為
handle_user_action_workflow的觸發式工作流程。 -
在工作流程畫布的右側面板,選擇調度配置 > 調度策略。
-
在 觸發器下拉框中,選擇剛建立的
kafka_user_action_trigger。
-
(重要) 考慮到訊息可能高頻到達,建議配置內部任務最大並行執行個體數,如
100,防止瞬間大量訊息拖垮調度資源。
步驟三:開發節點,解析嵌套JSON
假設Kafka訊息的value欄位是一個JSON字串,格式如下: {"user_id": "1001", "action_type": "login", "timestamp": 1688888888}。
-
點擊工具列上方的+ 添加節點,開啟節點列表。從左側節點類型列表中,拖拽一個Python節點到畫布中。
-
編寫代碼解析訊息。由於
value本身是字串,我們需要在代碼中進行二次JSON解析。import json # 1. 使用內建變數擷取 Kafka 訊息的 value 欄位,它是一個JSON字串 message_value_str = '${workflow.triggerMessage.value}' print(f'Received raw message value string: ${message_value_str}') try: # 2. 在Python代碼中將這個字串解析成JSON對象(字典) message_data = json.loads(message_value_str) user_id = message_data.get("user_id") action_type = message_data.get("action_type") print(f"Successfully parsed message. User ID: ${user_id}, Action: ${action_type}") # 3. 接下來可以根據 action_type 執行不同的商務邏輯 if action_type == 'login': # o.run_sql(f"INSERT OVERWRITE TABLE user_login_record PARTITION(ds='{bizdate}') VALUES ('{user_id}');") print("Processing login action...") elif action_type == 'purchase': print("Processing purchase action...") else: print("Unknown action type.") except json.JSONDecodeError as e: print(f"Error decoding JSON: {e}") # 異常處理邏輯,例如可以將錯誤訊息寫入一個專門的日誌表 raise e # 拋出異常,使節點運行失敗,便於排查
步驟四:調試與發布
-
調試:
-
返回工作流程畫布,單擊運行
按鈕。 -
在 觸發器訊息體 中,粘貼類比的Kafka事件。注意
value欄位是一個轉義後的JSON字串。{ "topic": "user-behavior-topic", "key": "some-key", "value": "{\"user_id\": \"1001\", \"action_type\": \"login\", \"timestamp\": 1688888888}" } -
運行並檢查日誌,確認Python節點能正確解析出
user_id和action_type。
-
-
發布:調試通過後,將工作流程發布到生產環境。
步驟五:生產驗證
-
向您配置的Kafka Topic發送一條符合格式的訊息。

-
進入DataWorks 營運中心 > 手動任務營運 > 手動任務 > 觸發式工作流程,出現發布成功的
handle_user_action_workflow。
-
在 營運中心 > 手動任務營運 > 手動執行個體 > 觸發式工作流程執行個體中,觀察是否有新的工作流程執行個體被觸發,並檢查其作業記錄。

最佳實務:並發與順序
-
並發控制:務必設定合理的最大並行執行個體數以應對訊息洪峰。
-
順序保證:DataWorks調度本身不保證訊息的嚴格順序處理。如果需要保證同一個使用者(或分區)的訊息按順序執行,您需要在業務代碼中自行實現分布式鎖(如基於Redis/MaxCompute),或者將處理邏輯下沉到保證分區有序消費的計算引擎(如Flink)。
核心設計與配置
工作流程編排
觸發式工作流程編排核心流程和周期工作流程相似,可參考節點/工作流程編排。
調度參數
在工作流程畫布右側的調度配置面板中,在此為工作流程設定全域參數,其內部所有節點均可引用。
-
引用方式:在節點代碼中,通過
${workflow.參數名}格式引用工作流程參數。 -
參數優先順序:DataWorks中的參數存在層級覆蓋關係,生效優先順序為:節點參數 > 工作流程參數。
更多參數說明,請參見參數設計與流轉。
調度策略
當多個工作流程或任務同時被觸發,導致系統資源出現瓶頸時,您可以通過優先順序和加權策略實現智能化的資源調度,確保最重要的任務優先執行。
-
保障核心業務:為核心業務的工作流程設定一個更高的優先順序,使其總能優先於其他非核心工作流程運行。
-
縮短關鍵流程耗時:在同一個工作流程執行個體內部,通過優先順序加權策略,您可以影響節點的執行順序。例如,使用向下加權策略,可以讓處於關鍵路徑上、擁有更多上遊依賴的節點獲得更高的動態權重,從而優先執行,有效縮短整個工作流程的運行時間長度。
配置項
功能說明
優先順序
定義工作流程執行個體在調度隊列中的絕對優先順序別。可選層級為1、3、5、7、8(數字越大,優先順序越高)。高優先順序的任務/工作流程總會優先於低優先順序的任務/工作流程擷取調度資源。
優先順序加權策略
定義同一優先順序下,工作流程內部各節點(Task)權重的動態計算方式。權重越高的節點將優先獲得執行機會。
-
不加權:所有節點的權重均為固定基準值。
-
向下加權:節點的權重會動態調整,其上遊依賴的節點越多,權重越高。此策略有助於DAG(有向非循環圖)中關鍵路徑上的節點優先執行。權重計算方式為:
權重初始值 + 其上遊所有節點的優先順序之和。
內部任務最大並行執行個體數
控制此工作流程在同一時間可啟動並執行最大執行個體數量,用於並發控制和資源保護。當運行中的執行個體數達到上限時,後續被觸發的新執行個體將進入等待狀態。支援設定不限制或自訂一個最大值(上限100,000)。
說明設定上限時若超過資源群組最大可承受能力,則實際的並發瓶頸將由資源群組的物理上限決定。
-
DataWorks的優先順序系統遵循層級覆蓋規則:運行時指定 > 節點級配置 > 工作流程級配置。
-
工作流程級配置 (基準):在工作流程的調度策略中配置,作為所有節點的預設設定。
-
節點級配置 (局部):在工作流程內部單個節點的調度配置 > 調度策略中為特定節點單獨設定更高優先順序,會覆蓋工作流程層級的設定。
-
運行時指定 (臨時):在營運中心手動觸發運行時,通過運行時重設優先順序開關指定的配置。該配置優先順序最高,僅對當次運行生效,不修改任何永久配置。
營運與管理
使用限制與注意事項
-
生效環境:事件觸發機制僅在工作流程發布到生產環境(營運中心)後生效。
-
節點數量:單個工作流程最多支援400個節點,建議控制在100個以內以簡化維護。
-
並發上限:最大並行執行個體數上限為100,000,但實際並發能力受您購買的調度資源群組規格限制。
-
節點級調度:在節點層級配置調度時,僅支援配置優先順序,不支援配置優先順序加權策略。
