DataWorksData Integration提供了單表即時同步任務,旨在實現不同資料來源之間低延遲、高輸送量的資料複製與流轉。該功能基於先進的Realtime Compute引擎,能夠捕獲源端資料的即時變更(增、刪、改),並將其快速應用到目標端。本文以Kafka單表即時同步至MaxCompute為例,講述單表即時任務的配置方式。
準備工作
資料來源準備
已建立來源與去向資料來源,資料來源配置詳見:資料來源管理。
確保資料來源支援即時同步能力,參見:支援的資料來源及同步方案。
部分資料來源需要開啟日誌,如Hologres、Oracle等。不同的資料來源開啟方式不同,詳見資料來源配置:資料來源列表。
資源群組:已購買並配置Serverless資源群組。
網路連通:資源群組與資料來源之間需完成網路連通配置。
步驟一:建立同步任務
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入Data Integration。
在左側導覽列單擊同步任務,然後在頁面頂部單擊建立同步任務,並配置任務資訊,此處以Kafka即時寫入MaxCompute為例:
資料來源類型:
Kafka。資料去向類型:
MaxCompute。具體類型:
單表即時。同步步驟:
結構遷移:自動在目標端建立與源端匹配的資料庫物件(如表、欄位、資料類型等),但不包含資料。
增量同步(可選):在全量同步完成後,持續地捕獲源端發生的變更資料(新增、修改、刪除),並將其同步至目標端。
若源端為Hologres還支援全量同步,即先將已有資料全量同步至目標表後,自動進行資料的增量同步處理。
支援的資料來源及同步方案請參見:支援的資料來源及同步方案。
步驟二:配置資料來源與運行資源
來源數據源選擇已添加的
Kafka資料來源,去向數據源選擇已添加的MaxCompute資料來源。在運行資源地區,選擇同步任務所使用的資源組,並為該任務分配資源組CU。支援為全量同步和增量同步處理分別設定CU,以精確控制資源,避免浪費。如果您的同步任務因資源不足出現OOM現象,請適當調整資源群組的CU佔用取值。
並確保來來源資料源與去向資料來源均通過連通性檢查。
步驟三:配置同步方案
1. 配置資料來源
在配置頁簽,選擇Kafka資料來源中需要同步的Topic。
其他配置可使用任務建立時產生的預設值,也可根據需要進行修改,參數詳見:Kafka官方文檔 。
單擊右上方的數據採樣。
在彈出對話方塊中指定開始時間和採樣條數後,單擊開始採集按鈕,可以對指定的Kafka Topic進行資料採樣,同時您可以預覽Topic中的資料,為後續資料處理節點的資料預覽和可視化配置提供輸入。
在輸出字段配置頁簽,按需勾選同步任務需同步的欄位。
Kafka側欄位中預設提供6個欄位。
欄位名
含義
__key__
Kafka記錄的Key。
__value__
Kafka記錄的Value。
__partition__
Kafka記錄所在分區號,分區號為從0開始的整數。
__headers__
Kafka記錄的Headers。
__offset__
Kafka記錄在所在分區的位移量,位移量為從0開始的整數。
__timestamp__
Kafka記錄的13位整數毫秒時間戳記。
您也可以在後續的資料處理節點對欄位進行更多轉換處理。
2. 資料處理
開啟數據處理按鈕,目前提供5種資料處理方式(資料脫敏、字串替換、資料過濾、JSON解析和欄位編輯與賦值),您可根據需要做順序編排,在任務運行時會按照編排的資料處理先後順序執行資料處理。

每完成一個資料處理節點配置,可以單擊右上方的數據輸出預覽按鈕:
在輸入資料下方的表格中,可以看到上個環節數據採樣的結果。您可以單擊重新獲取上遊輸出,來重新整理結果。
如果上遊沒有輸出結果,也可以通過手工構造數據來類比前置輸出。
單擊預覽,可以查看上遊環節輸出的資料,經過資料處理組件處理後輸出的結果。

資料輸出預覽和資料處理強依賴Kafka來源的數據採樣,在執行資料處理前需要先在Kafka來源表單中完成資料採樣。
3. 配置資料去向
在數據去向地區,選擇Tunnel資源群組,預設選擇“公用傳輸資源”,即MC的免費quota。
選擇要寫入目標表是建立表還是使用已有表。
如果是建立表,可在下拉式清單中選擇新建,預設會建立與資料來源端結構相同的表,您可以手動修改目標端表名和表結構。
如果是使用已有表,請下拉選擇需要同步的目標表。
(可選)編輯表結構。
單擊表名後的編輯按鈕,可編輯該表的表結構,支援單擊根據上遊節點輸出列重新生成表結構按鈕,自動根據上遊節點輸出資料行,產生表結構。您可以在自動產生的表結構中選擇一列配置為主鍵。
4. 配置欄位對應
選擇資料來源和資料去向後,需要指定讀取端和寫入端列的映射關係。配置欄位對應關係後,任務將根據欄位對應關係,將源端欄位寫入目標端對應類型的欄位中。
系統會自動按照同名映射原則產生上遊列與目標表列之間的映射,您可根據需要進行調整,支援一個上遊列映射到多個目標表列,不允許多個上遊列映射到一個目標表列,當上遊列未配置到目標表列的映射時,對應列不會寫入目標表。
Kafka側欄位可自訂配置JSON解析,可以通過資料處理組件擷取value欄位內容,實現更精細的欄位配置。

分區設定(可選)。
時間自動分區是根據業務時間(此處為_timestamp)欄位進行分區的,一級分區為年,二級分區為月,以此類推。
根據欄位內容動態分區通過指定源端表某欄位與目標MaxCompute表分區欄位對應關係,實現源端對應欄位所在資料行寫入到MaxCompute表對應的分區中。
步驟四:進階配置
同步任務提供進階參數可供精細化配置,系統設有預設值,多數情況下無需修改。如有修改必要,您可以:
單擊介面右上方的高級配置,進入高級參數配置頁面。
說明資料開發的進階配置位於任務配置介面右側頁簽。
支援為同步任務的讀端和寫端分別設定參數。修改自動化佈建運行時配置,設定為false,可以自訂運行時配置。
根據參數提示,修改參數值,參數含義見參數名稱後的解釋。部分參數的配置建議可參考即時同步進階參數。
請在完全瞭解參數含義與作用後果再進行修改,以免產生不可預料的錯誤或者資料品質問題。
步驟五:類比運行
完成上述所有任務配置後,您可以單擊左下角的模擬運行來調試任務,類比整個任務針對少量採樣資料的處理,查看資料寫入目標表後的結果。當任務配置錯誤、類比運行過程中異常或者產生髒資料時,會即時反饋出異常資訊,能夠協助您快速評估任務配置的正確性,以及是否能得到預期結果。
在彈出的對話方塊中設定採樣參數(開始時間和採樣條數)。
單擊開始採集得到採樣資料。
單擊預覽結果按鈕,類比任務運行,並查看輸出結果。
類比運行輸出的結果僅作預覽,不會寫入目標端資料來源,對生產資料造成影響。
步驟六:發布並執行任務
完成所有配置後,單擊頁面底部的保存,完成任務配置。
Data Integration的任務需要發布至生產環境運行,因此建立或者編輯任務均需執行发布操作後方可生效。發布時,若勾選发布后直接启动运行,則在發布時會同步啟動任務。否則,發布完成後,需要進入介面,在目標任務的操作列,手動啟動任務。
單擊任務列表中對應任務的名稱/ID,查看任務的詳細執行過程。
步驟七:配置警示規則
任務發布上線運行後,可以為其配置警示規則,以便在任務出現異常時第一時間獲得通知,確保生產環境的穩定性和資料時效性。在Data Integration的工作清單,單擊目標任務操作列的。
1.新增警示

(1) 單擊新建規則,配置警示規則。
您可以通過設定報警原因,對任務的業務延遲、Failover、任務狀態、DDL通知、任務資源利用率等指標進行監控,並根據指定的閾值設定CRITICAL或WARNING兩種不同層級的警示方式。
設定警示方式後,可通過設定高級參數配置,控制警示資訊發送的時間間隔,防止一次性發送資訊太多,造成浪費和訊息堆積。
若警示原因選擇業務延遲、任務狀態和任務資源利用率,也支援開啟恢複通知,方便任務恢複正常後,通知接收人。
(2) 管理警示規則。
對於已建立的警示規則,您可以通過警示開關控制警示規則是否開啟,同時,您可以根據警示層級將警示發送給不同的人員。
2.查看警示
單擊展開工作清單的,進入警示事件,可以查看已經發生的警示資訊。
後續步驟
任務啟動後,您可以點擊任務名稱,查看運行詳情,進行任務營運和調優。
常見問題
即時同步任務常見問題請參見即時同步常見問題。
更多案例
Kafka單表即時同步至ApsaraDB for OceanBase