當資料處理依賴於不可預測的外來事件(如OSS新檔案上傳、Kafka新訊息到達)時,傳統的周期調度任務會因固定時間的輪詢等待而造成資源浪費和資料處理延遲。DataWorks觸發器專為此類情境設計,它通過即時監聽外來事件訊號來按需啟動關聯的資料工作流程,讓資料管道實現真正的事件驅動,從而獲得更高的自動化水平和資料處理時效性。本文將詳細介紹如何建立、配置和管理事件調度觸發器。
功能介紹
DataWorks觸發器 (Trigger) 是一個用於實現事件驅動調度的功能組件。其核心功能是監測預先定義的事件,當事件發生時,觸發器負責啟動關聯的觸發式工作流程,並將事件的相關資訊作為參數傳遞給該工作流程。
多源事件監測:觸發器可以配置為監測來自不同來源的事件。
外部儲存事件:監測OSS Bucket中的新檔案建立。
外部訊息事件:監測Kafka、RocketMQ等訊息佇列中新訊息的到達。
動態參數捕獲與傳遞:當監測到事件時,觸發器會捕獲該事件的上下文資訊,並將其作為參數向下遊傳遞。
對於OSS事件,捕獲的資訊包括檔案資訊等。
對於訊息佇列事件,捕獲的資訊包括完整的訊息內容(如Key、Value、Headers等)。
這些被傳遞的參數,可供被觸發的工作流程內部節點在執行時引用,從而實現基於事件內容的資料處理。
支援的類型:
訊息佇列:Kafka、RocketMQ、RabbitMQ。
儲存物件:OSS。
適用範圍
地區範圍:僅華東1(杭州)、華北2(北京)、華北3(張家口)、華北6(烏蘭察布)、華南1(深圳)、中國香港、新加坡地區支援使用該功能。
適用版本:僅DataWorks專業版以上支援使用該功能。如果您的目前的版本不支援此功能,您可以升級DataWorks版本到專業版或更高版本。
計費說明
觸發式工作流程配置了觸發器並通過事件觸發運行工作流程,除了任務調度運行產生費用,還會在事件匯流排側產生費用。費用詳情見:事件流計費說明,計費方式為按事件量計費。
準備工作
觸發器依賴事件匯流排EventBridge的事件規則,需要開通事件匯流排EventBridge並授權和輕量訊息佇列(原 MNS)。
已在工作空間同地區存在要監聽的對象,如OSS、雲訊息佇列(Kafka、RocketMQ、RabbitMQ)等執行個體,並具備該執行個體的存取權限。
已添加使用者至指定工作空間並授予開發、營運、空間管理員角色許可權。授權詳情請參見增加空間成員並管理成員角色許可權。
進入觸發器管理頁面
進入營運中心頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入營運中心。
在營運中心左側導覽列單擊,然後切換至觸發器管理頁簽。
建立觸發器
若您已被添加為工作空間的開發、營運或管理員角色,可以在觸發器管理頁簽建立觸發器。
單擊建立觸發器按鈕,進入建立觸發器配置頁面。
說明DataWorks事件觸發依賴於事件匯流排EventBridge,初次使用或缺少AliyunServiceRoleForDataWorksScheduler服務關聯角色時,您可單擊一鍵授權進行授權。
執行該操作所需的使用者權限:
ram:CreateServiceLinkedRole,具體可參見建立服務關聯角色所需的最小許可權。
根據以下說明完成相關參數的配置。
Kafka觸發器
參數
配置說明
適用工作空間
選擇可使用該觸發器的工作空間,僅支援新版資料開發的工作空間。
適用環境
觸發器只適用於生產環境,在開發環境運行時,需手動填寫參數。
責任人
下拉配置觸發器的責任人。
觸發器類型
選擇雲訊息佇列Kafka版。
觸發事件
支援
alikafka:Topic:Message類型觸發事件,即通過Kafka訊息觸發,訊息發送方式參見:Kafka收發訊息。Kafka執行個體
選擇與工作空間同地區下的Kafka執行個體。如沒有可用的執行個體,前往建立:購買Kafka執行個體。
Topic
指定觸發器需要監聽的Topic對象,如果沒有可用的Topic,前往建立:建立Topic。
Key
可預設訊息的Key值,只有Key值完全符合,才觸發任務運行。非必填,若未填寫,則消費任意一個訊息,均會觸發工作流程運行。
ConsumerGroupId
可選擇快速建立或使用已有。若使用快速建立,系統會自動建立一個自動產生名稱的Group ID。
訊息格式樣本
Kafka訊息的固定樣本。支援在觸發工作流程
可以在內部任務中通過${workflow.triggerMessage}方式,擷取到完整的訊息體,也可通過${workflow.triggerMessage.xxx}擷取到訊息體內指定欄位的值。
RocketMQ觸發器
參數
配置說明
適用工作空間
選擇可使用該觸發器的工作空間,僅支援新版資料開發的工作空間。
適用環境
觸發器只適用於生產環境,在開發環境運行時,需手動填寫參數。
責任人
下拉配置觸發器的責任人。
觸發器類型
選擇雲訊息佇列RocketMQ版。
說明暫不支援5.x以前的版本,預設使用5.x版本
觸發事件
支援
mq:Topic:SendMessage類型觸發事件,即通過消費RocketMQ訊息觸發。RocketMQ執行個體
選擇與工作空間同地區下的RocketMQ執行個體,如沒有可用的執行個體,前往建立:RocketMQ執行個體管理。
Topic
指定觸發器需要監聽的Topic對象。如沒有可用的Topic,前往建立:Topic管理。
Tag
可預設訊息的Tag值,只有Tag值完全符合,才觸發任務運行。非必填,若未填寫,則消費任意一個訊息,均會觸發工作流程運行。
安全性群組
若RocketMQ執行個體為訂用帳戶類型,支援選擇安全性群組。
消費者組
可選擇快速建立或使用已有。若使用快速建立,系統會自動建立一個自動產生名稱的Group ID。
訊息格式樣本
RocketMQ訊息的固定樣本。支援在觸發工作流程
可以在內部任務中通過${workflow.triggerMessage}方式,擷取到完整的訊息體,也可通過${workflow.triggerMessage.xxx}擷取到訊息體內指定欄位的值。
RabbitMQ觸發器
參數
配置說明
適用工作空間
選擇可使用該觸發器的工作空間,僅支援新版資料開發的工作空間。
適用環境
觸發器只適用於生產環境,在開發環境運行時,需手動填寫參數。
責任人
下拉配置觸發器的責任人。
觸發器類型
選擇雲訊息佇列RabbitMQ版。
觸發事件
支援
amqp:Queue:SendMessage類型觸發事件,即通過消費RabbitMQ訊息觸發。RabbitMQ執行個體
選擇與工作空間同地區下的RabbitMQ執行個體,如沒有可用執行個體,請建立:建立RabbitMQ執行個體。
Vhost
RabbitMQ的虛擬機器主機名稱,用於對Queue進行邏輯隔離。如沒有Vhost,建立參考:Vhost管理。
Queue
指定觸發器監聽的Queue對象。若沒有合適的Queue,建立參考:Queue管理。
訊息格式樣本
RabbitMQ訊息體樣本。支援在觸發工作流程
可以在內部任務中通過${workflow.triggerMessage}方式,擷取到完整的訊息體,也可通過${workflow.triggerMessage.xxx}擷取到訊息體內指定欄位的值。
OSS觸發器
參數
配置說明
適用工作空間
選擇可使用該觸發器的工作空間,僅支援新版資料開發的工作空間。
適用環境
觸發器只適用於生產環境,在開發環境運行時,需手動填寫參數。
責任人
下拉配置觸發器的責任人。
觸發器類型
Object Storage Service。
觸發事件
支援以下三種事件類型:
oss:ObjectCreated:PutObject:上傳檔案。
oss:ObjectCreated:PostObject:通過HTML表單上傳檔案。
oss:ObjectCreated:CompleteMultipartUpload:檔案分區上傳已完成。
Bucket名稱
從下拉式清單中選擇作為事件來源的OSS Bucket名稱。如未建立,可建立OSS儲存空間。
檔案名稱
指定觸發事件的檔案名稱,支援萬用字元匹配:
檔案首碼匹配:
配置樣本:
task*。配置說明:您在OSS上傳以
task為首碼的檔案(例如task10.txt)時,可觸發事件。
檔案尾碼匹配:
配置樣本:
*task.txt。配置說明:您在OSS上傳以
task.txt為尾碼的檔案(例如work_task.txt)時,可觸發事件。
靈活匹配:
配置樣本:
*task*。配置說明:您在OSS上傳
task字元相關的檔案(例如work_task.txt)時,可觸發事件。
單擊確認,完成觸發器的建立。
使用觸發器
觸發器需配置觸發式工作流程一起使用,且只有發布到營運中心的觸發式工作流程才可以通過觸發器進行觸發運行。
若要通過觸發器實現事件調度,需結合觸發式工作流程一起使用。觸發式工作流程在提交至營運中心後,若命中監聽的事件對象,即可自動觸發工作流程內的任務運行。
管理觸發器
在觸發器管理頁簽,找到目標觸發器後,可進行查看引用任務、修改和版本查看及復原等相關操作。
查看引用任務:當觸發器被觸發式調度工作流程所引用時,您可單擊操作欄中的查看引用任務 ,在查看引用任務頁面中查看該觸發器被哪些觸發式調度工作流程引用。
修改觸發器:單擊操作欄中的修改 ,在修改觸發器頁面中編輯該觸發器的相關資訊後,單擊確認完成修改。
說明修改完成後,系統會自動為觸發器建立一個新版本。
查看版本:
單擊操作欄中的版本,可在查看版本頁面查看該觸發器的所有歷史版本。
可單擊操作欄中的查看瞭解某個版本的觸發器詳情。
支援版本復原。如需復原到某個歷史版本,單擊歷史版本後面的復原,在彈出的提示框中填寫復原備忘資訊後,確定完成復原。
說明復原時,系統會根據您選擇的復原版本自動產生一個新版本。
刪除觸發器:刪除觸發器前,請確保已下線並刪除引用該觸發器的所有引用任務。然後單擊刪除按鈕,並在提示框中確認刪除操作。
後續使用
觸發器建立並配置完成後,可以在觸發式工作流程中使用,詳情參見:觸發式工作流程。