全部產品
Search
文件中心

DataWorks:觸發式工作流程

更新時間:Mar 17, 2026

與按照預定時間表(如每天淩晨1點)啟動並執行周期工作流程不同,觸發式工作流程是一種按需執行、事件驅動的資料處理模式。它的運行由外部訊號(如檔案上傳、訊息到達、API調用或手動點擊)即時觸發,為資料處理提供極高的即時性和靈活性。

特性

周期工作流程

觸發式工作流程

驅動方式

固定時間(Crontab運算式)

外部訊號(事件、API、手動)

執行模式

計劃性、可預測

響應式、按需執行

適用情境

T+1批量數倉、定時報表

檔案到達即處理、與業務系統整合、手動資料修複

核心優勢

穩定性、周期性保障

即時性、靈活性

支援的觸發方式

觸發式工作流程支援以下三種觸發方式,您可以根據業務情境靈活選擇:

觸發方式

觸發方

核心情境

關鍵點

事件觸發

外來事件源(如OSS、Kafka)

事件驅動ETL:檔案落地即處理、訊息驅動Realtime Compute。

需要先建立觸發器並與工作流程關聯。僅在生產環境生效

手動觸發

使用者(開發人員/營運人員)

臨時任務:一次性資料處理或分析。

開發環境、生產環境均可手動運行。是手動商務程序的推薦替代方案。

API觸發

外部系統(通過OpenAPI)

系統整合:被業務系統(如CRM、ERP)回調,觸發資料加工。

需要調用OpenAPI,並具備相應許可權。

快速入門:建立手動觸發工作流程

本章節將引導您建立一個最簡單的觸發式工作流程,並通過手動方式運行它,快速體驗完整流程。

步驟一:建立觸發式工作流程

  1. 進入DataWorks工作空間列表頁,在頂部切換至目標地區,找到目標工作空間,單擊操作列的快速進入 > Data Studio,進入Data Studio。

  2. 在左側導覽列單擊image,然後在專案目錄右側單擊image > 建立工作流程,進入建立工作流程頁面。

  3. 在彈出的對話方塊中,在建立工作流程頁面,選擇調度類型觸發式調度。輸入工作流程名稱,單擊確認建立。

步驟二:編排工作流程並開發節點

  1. 點擊工具列上方的+ 添加節點,開啟節點列表。從左側節點類型列表中,拖拽一個 Shell 節點到畫布中,輸入名稱完成建立。

  2. 雙擊Shell節點,進入代碼編輯頁面,輸入以下代碼:

    echo "Hello, Trigger Workflow! Current time is ${bizdate}"
  3. 單擊工具列的儲存按鈕。

步驟三:調試運行(開發環境)

  1. 返回工作流程畫布,單擊頂部工具列的image表徵圖。

  2. 在彈出的對話方塊中,填寫工作流程的本次運行值(例如今天20260310,bizdate應該被替換為20260309)。

  3. 稍後,在下方的作業記錄中,您可以看到節點的運行狀態和 echo 命令的輸出結果。

步驟四:發布並運行(生產環境)

  1. 在工作流程畫布上方,單擊發布image按鈕,根據提示完成發布流程。

  2. 發布成功後,進入 營運中心 > 手動任務營運 > 手動任務 > 觸發式工作流程

  3. 找到您剛發布的工作流程,單擊操作列的運行

  4. 在彈出的視窗中再次單擊運行,即可在生產環境觸發一次該工作流程的執行個體。您可以在手動執行個體頁面查看本次啟動並執行詳情。

至此,您已經成功掌握最基礎的觸發式工作流程使用方法。接下來,我們將深入探索更強大的事件觸發能力。

進階案例:建立事件觸發式工作流程

情境一:OSS新檔案到達,自動觸發資料處理

目標:當有新的CSV檔案上傳到OSS指定目錄時,自動觸發一個工作流程,列印出該檔案的路徑。

步驟一:建立OSS觸發器

  1. 進入 營運中心 > 調度設定 > 觸發器管理

  2. 單擊 建立觸發器,並按如下配置:

    說明

    詳細參數介紹,請參見OSS觸發器

    • 觸發器名稱:自訂,如 oss_new_file_trigger

    • 適用工作空間:選擇作用的目標工作空間,即工作流程所在的工作空間。

    • 觸發事件類型:選擇 Object Storage Service

    • 觸發事件:選擇 oss:ObjectCreated:PutObject(或其他上傳事件)。

    • Bucket名稱:選擇您的OSS Bucket。

    • 檔案名稱:指定監聽的檔案路徑和格式,支援萬用字元。例如,監聽 input/ 目錄下所有.csv檔案,可填寫 input/*.csv

    • 角色配置:首次使用需進行一鍵授權,選擇授權產生的名稱為DataWorks-EventBridge-OSS-MNS-Role-*************角色。

      *************表示隨機產生一串的13位id數字,用於作唯一性區別。
  3. 單擊確認,完成觸發器建立。

步驟二:建立並關聯工作流程

  1. 按照快速入門:建立手動觸發工作流程的步驟,建立一個名為 process_oss_file_workflow觸發式工作流程

  2. 在工作流程畫布的右側面板,選擇 調度配置 > 調度策略

  3. 觸發器下拉框中,選擇剛剛建立的 oss_new_file_trigger

    image

步驟三:開發節點,解析事件參數

  1. 點擊工具列上方的+ 添加節點,開啟節點列表。從左側節點類型列表中,拖拽一個 Shell 節點到畫布中,輸入名稱完成建立。

  2. 雙擊節點,編寫代碼以擷取並列印觸發事件中的檔案路徑。

    # 當觸發器觸發工作流程時,事件資訊會通過內建變數 workflow.triggerMessage 傳入
    # 我們可以通過 ${workflow.triggerMessage.data.oss.object.key} 擷取到上傳檔案的完整路徑
    
    echo "========= Start Processing OSS File ========="
    message="${workflow.triggerMessage}"
    echo "Raw Value: ${message}"
    
    # 從事件訊息中提取檔案名稱
    FILE_PATH="${workflow.triggerMessage.data.oss.object.key}"
    echo "A new file has arrived: ${FILE_PATH}"
    
    # 此處可以寫具體處理邏輯
    
    echo "========= Finish Processing OSS File ========="
    說明

    ${workflow.triggerMessage}:擷取完整的JSON格式事件訊息體。OSS具體的訊息格式可在EventBridge事件匯流排 > DATAWORKS_TRIGGER_FOR_BUCKET_<OSS_Bucket_Name> > 事項追蹤> 事項詳情中擷取。

    查看樣本OSS訊息格式,即workflow.triggerMessage的格式。

    {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "1***********9",
        "data": {
            "eventVersion": "1.0",
            "responseElements": {
                "requestId": "69B1***********C0A8"
            },
            "eventSource": "acs:oss",
            "eventTime": "2026-03-11T05:40:45.000Z",
            "requestParameters": {
                "sourceIPAddress": "***********"
            },
            "eventName": "ObjectCreated:PostObject",
            "userIdentity": {
                "principalId": "1***********9"
            },
            "region": "cn-hangzhou",
            "oss": {
                "bucket": {
                    "name": "******",
                    "arn": "acs:oss:cn-hangzhou:1***********9:******",
                    "virtualBucket": "",
                    "ownerIdentity": "1***********9"
                },
                "ossSchemaVersion": "1.0",
                "object": {
                    "size": 59537,
                    "objectMeta": {
                        "mimeType": "text/csv"
                    },
                    "deltaSize": 0,
                    "eTag": "63***********D32",
                    "key": "input/***********.csv"
                }
            }
        },
        "subject": "acs:oss:cn-hangzhou:1***********9:dwoss1024/input/******.csv",
        "aliyunoriginalaccountid": "1***********9",
        "source": "acs.oss",
        "type": "oss:ObjectCreated:PostObject",
        "aliyunpublishtime": "2026-03-11T05:40:45.682Z",
        "specversion": "1.0",
        "aliyuneventbusname": "DATAWORKS_TRIGGER_FOR_BUCKET_******",
        "id": "69B1***********0A8",
        "time": "2026-03-11T05:40:45.000Z",
        "aliyunregionid": "cn-hangzhou"
    }

步驟四:調試與發布

  1. 調試

    • 返回工作流程畫布,單擊運行image按鈕。

    • 觸發器訊息體輸入框中,粘貼一段類比的OSS事件JSON。您可以從觸發器配置頁的“訊息格式樣本”複製並修改 key 的值。以下為簡單樣本。

      {
        "data": {
          "oss":{
            "object": {
              "key": "input/test_file_20260310.csv" 
            }
          } 
        }
      }
    • 單擊運行,檢查日誌中是否成功列印出 input/test_file_20260310.csv

  2. 發布:調試通過後,單擊發布按鈕,將工作流程發布到生產環境。事件觸發只有在生產環境才會生效。

步驟五:生產驗證

  1. 通過OSS控制台或用戶端,向您在觸發器中配置的Bucket和路徑(例如 input/ 目錄)上傳一個CSV檔案。

    如何確保事件觸發成功?

    進入https://eventbridge.console.alibabacloud.com/<regionId>/event-bus/DATAWORKS_TRIGGER_FOR_BUCKET_<OssBucketName>/event-tracing,查詢最近觸發事件列表即可,同時可點擊具體事項詳情查看觸發具體訊息(即workflow.triggerMessage)。

    image

  2. 進入DataWorks 營運中心 > 手動任務營運 > 手動任務 > 觸發式工作流程,出現發布成功的process_oss_file_workflow

    image

  3. 等待片刻,進入DataWorks 營運中心 > 手動任務營運 > 觸發式工作流程執行個體,一個新的工作流程執行個體被自動觸發運行。單擊查看其日誌,確認檔案路徑被正確處理。

重要

最佳實務:等冪性設計

受網路波動等因素影響,OSS 事件可能被重複投遞。為避免資料重複處理,建議在商務邏輯中實現等冪性。常見方案為處理檔案前,先檢查一個記錄表(如MaxCompute表),以檔案的ETag或唯一路徑為標識,若已處理過則跳過。

情境二:Kafka訊息到達,驅動Realtime Compute

目標:監聽Kafka中使用者的行為日誌,當有新訊息時,觸發工作流程進行解析,並根據內容執行不同邏輯。

步驟一:建立Kafka觸發器

  1. 進入 營運中心 > 調度設定 > 觸發器管理,單擊建立觸發器

  2. 配置如下:

    • 觸發器名稱kafka_user_action_trigger

    • 觸發事件類型:選擇 雲訊息佇列Kafka版

    • Kafka執行個體Topic:選擇您要監聽的執行個體和Topic。

    • ConsumerGroupId:建議選擇快速建立,系統會自動產生一個消費組ID,避免與其他應用衝突。

    • Key(可選):可指定訊息的Key,只有Key完全符合的訊息才會觸發工作流程。

  3. 單擊確認

步驟二:建立並關聯工作流程

  1. 按照快速入門:建立手動觸發工作流程的步驟,建立一個名為 handle_user_action_workflow觸發式工作流程

  2. 在工作流程畫布的右側面板,選擇調度配置 > 調度策略

  3. 觸發器下拉框中,選擇剛建立的 kafka_user_action_trigger

    image

  4. (重要) 考慮到訊息可能高頻到達,建議配置內部任務最大並行執行個體數,如 100,防止瞬間大量訊息拖垮調度資源。

步驟三:開發節點,解析嵌套JSON

假設Kafka訊息的value欄位是一個JSON字串,格式如下: {"user_id": "1001", "action_type": "login", "timestamp": 1688888888}

  1. 點擊工具列上方的+ 添加節點,開啟節點列表。從左側節點類型列表中,拖拽一個Python節點到畫布中。

  2. 編寫代碼解析訊息。由於value本身是字串,我們需要在代碼中進行二次JSON解析。

    import json
    
    # 1. 使用內建變數擷取 Kafka 訊息的 value 欄位,它是一個JSON字串
    message_value_str = '${workflow.triggerMessage.value}'
    
    print(f'Received raw message value string: ${message_value_str}')
    
    try:
        # 2. 在Python代碼中將這個字串解析成JSON對象(字典)
        message_data = json.loads(message_value_str)
        
        user_id = message_data.get("user_id")
        action_type = message_data.get("action_type")
        print(f"Successfully parsed message. User ID: ${user_id}, Action: ${action_type}")
        
        # 3. 接下來可以根據 action_type 執行不同的商務邏輯
        if action_type == 'login':
            # o.run_sql(f"INSERT OVERWRITE TABLE user_login_record PARTITION(ds='{bizdate}') VALUES ('{user_id}');")
            print("Processing login action...")
        elif action_type == 'purchase':
            print("Processing purchase action...")
        else:
            print("Unknown action type.")
            
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        # 異常處理邏輯,例如可以將錯誤訊息寫入一個專門的日誌表
        raise e # 拋出異常,使節點運行失敗,便於排查

步驟四:調試與發布

  1. 調試

    • 返回工作流程畫布,單擊運行image按鈕。

    • 觸發器訊息體 中,粘貼類比的Kafka事件。注意 value 欄位是一個轉義後的JSON字串。

      {
        "topic": "user-behavior-topic",
        "key": "some-key",
        "value": "{\"user_id\": \"1001\", \"action_type\": \"login\", \"timestamp\": 1688888888}"
      }
    • 運行並檢查日誌,確認Python節點能正確解析出user_idaction_type

  2. 發布:調試通過後,將工作流程發布到生產環境。

步驟五:生產驗證

  1. 向您配置的Kafka Topic發送一條符合格式的訊息。

    image

  2. 進入DataWorks 營運中心 > 手動任務營運 > 手動任務 > 觸發式工作流程,出現發布成功的handle_user_action_workflow

    image

  3. 營運中心 > 手動任務營運 > 手動執行個體 > 觸發式工作流程執行個體中,觀察是否有新的工作流程執行個體被觸發,並檢查其作業記錄。

    image

重要

最佳實務:並發與順序

  • 並發控制:務必設定合理的最大並行執行個體數以應對訊息洪峰。

  • 順序保證:DataWorks調度本身不保證訊息的嚴格順序處理。如果需要保證同一個使用者(或分區)的訊息按順序執行,您需要在業務代碼中自行實現分布式鎖(如基於Redis/MaxCompute),或者將處理邏輯下沉到保證分區有序消費的計算引擎(如Flink)。

核心設計與配置

工作流程編排

觸發式工作流程編排核心流程和周期工作流程相似,可參考節點/工作流程編排

調度參數

在工作流程畫布右側的調度配置面板中,在此為工作流程設定全域參數,其內部所有節點均可引用。

  • 引用方式:在節點代碼中,通過 ${workflow.參數名} 格式引用工作流程參數。

  • 參數優先順序:DataWorks中的參數存在層級覆蓋關係,生效優先順序為:節點參數 > 工作流程參數

    更多參數說明,請參見參數設計與流轉

調度策略

當多個工作流程或任務同時被觸發,導致系統資源出現瓶頸時,您可以通過優先順序加權策略實現智能化的資源調度,確保最重要的任務優先執行。

  • 保障核心業務:為核心業務的工作流程設定一個更高的優先順序,使其總能優先於其他非核心工作流程運行。

  • 縮短關鍵流程耗時:在同一個工作流程執行個體內部,通過優先順序加權策略,您可以影響節點的執行順序。例如,使用向下加權策略,可以讓處於關鍵路徑上、擁有更多上遊依賴的節點獲得更高的動態權重,從而優先執行,有效縮短整個工作流程的運行時間長度。

    配置項

    功能說明

    優先順序

    定義工作流程執行個體在調度隊列中的絕對優先順序別。可選層級為1、3、5、7、8(數字越大,優先順序越高)。高優先順序的任務/工作流程總會優先於低優先順序的任務/工作流程擷取調度資源。

    優先順序加權策略

    定義同一優先順序下,工作流程內部各節點(Task)權重的動態計算方式。權重越高的節點將優先獲得執行機會。

    • 不加權:所有節點的權重均為固定基準值。

    • 向下加權:節點的權重會動態調整,其上遊依賴的節點越多,權重越高。此策略有助於DAG(有向非循環圖)中關鍵路徑上的節點優先執行。權重計算方式為:權重初始值 + 其上遊所有節點的優先順序之和

    內部任務最大並行執行個體數

    控制此工作流程在同一時間可啟動並執行最大執行個體數量,用於並發控制和資源保護。當運行中的執行個體數達到上限時,後續被觸發的新執行個體將進入等待狀態。支援設定不限制或自訂一個最大值(上限100,000)。

    說明

    設定上限時若超過資源群組最大可承受能力,則實際的並發瓶頸將由資源群組的物理上限決定。

DataWorks的優先順序系統遵循層級覆蓋規則:運行時指定 > 節點級配置 > 工作流程級配置

  1. 工作流程級配置 (基準):在工作流程的調度策略中配置,作為所有節點的預設設定。

  2. 節點級配置 (局部):在工作流程內部單個節點的調度配置 > 調度策略中為特定節點單獨設定更高優先順序,會覆蓋工作流程層級的設定。

  3. 運行時指定 (臨時):在營運中心手動觸發運行時,通過運行時重設優先順序開關指定的配置。該配置優先順序最高,僅對當次運行生效,不修改任何永久配置。

營運與管理

  • 執行個體監控:所有被觸發或手動啟動並執行執行個體,都可以在 營運中心 > 手動任務營運 > 手動執行個體 頁面查看、重跑、終止和排查日誌。

  • 複製工作流程:在專案目錄中按右鍵工作流程選擇複製,可以快速複製一個包含所有節點和依賴關係的新工作流程。更多說明,參見周期工作流程的複製工作流程

  • 版本管理:在工作流程畫布右側的版本面板,可以查看、對比和還原工作流程的歷史版本。更多說明,參見周期工作流程的版本管理

使用限制與注意事項

  • 生效環境事件觸發機制僅在工作流程發布到生產環境(營運中心)後生效。

  • 節點數量:單個工作流程最多支援400個節點,建議控制在100個以內以簡化維護。

  • 並發上限:最大並行執行個體數上限為100,000,但實際並發能力受您購買的調度資源群組規格限制。

  • 節點級調度:在節點層級配置調度時,僅支援配置優先順序,不支援配置優先順序加權策略

相關文檔