Data Integration目前支援將DataHub、Hologres等源頭的資料單表即時同步至Kafka。即時ETL同步任務根據來源Hologres表結構對目標Kafka的Topic進行初始化,將Hologres資料即時同步至Kafka以供消費。本文為您介紹如何配置Hologres單表即時同步到Kafka。
使用限制
Kafka的版本需在0.10.2至3.6.0之間。
Hologres的版本要求必須為2.1及以上。
不支援Hologres分區表的增量同步處理。
不支援Hologres表DDL變更訊息同步。
Hologres增量同步處理支援的資料類型包括:INTEGER、BIGINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、JSON、SERIAL、OID、INT4[]、INT8[]、FLOAT8[]、BOOLEAN[]、TEXT[]、JSONB。
需開啟源端的Hologres資料庫的表Hologres Binlog,詳情可參見訂閱Hologres Binlog。
前提條件
已購買Serverless資源群組。
已建立Hologres和Kafka資料來源,詳情請參見建立Data Integration資料來源。
已完成資源群組與資料來源間的網路連通,詳情請參見網路連通方案。
操作步驟
一、選擇同步任務類型
進入Data Integration頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入Data Integration。
在左側導覽列單擊同步任務,然後在頁面頂部單擊建立同步任務,進入同步任務的建立頁面,配置如下基本資料。
資料來源和去向:
Hologres→Kafka新任務名稱:自訂同步任務名稱。
同步類型:
單表即時。同步步驟:選擇
全量同步。
二、網路與資源配置
在網路與資源配置地區,選擇同步任務所使用的資源群組。您可以為該任務分配任務資源佔用CU數。
來來源資料源選擇已添加的
Hologres資料來源,去向資料來源選擇已添加的Kafka資料來源後,單擊測試連通性。
確保來來源資料源與去向資料來源均連通成功後,單擊下一步。
三、配置同步鏈路
1、配置Hologres來源
在頁面上方單擊資料來源Hologres,編輯Holo來源資訊。

在Holo來源資訊地區,選擇要讀取的Hologres表所在的Schema,以及來源表。
單擊右上方的資料採樣。
在資料輸出預覽對話方塊中指定好採樣條數,單擊開始採集按鈕,可以對指定的Hologres進行資料採樣,預覽Hologres中的資料,為後續資料處理節點的資料預覽和可視化配置提供輸入。
2、配置Kafka去向資訊
在頁面上方單擊資料去向Kafka,編輯Kafka去向資訊。

在Kafka去向資訊地區,選擇要寫入的Kafka主題(Topic)。
按需設定是否合并來源Binglog Update訊息,開啟後會將源端Binlog的一個Update動作對應的兩條Update訊息合并為一條訊息寫入。
設定輸出格式、鍵取值列以及Kafka Producer參數。
輸出格式:確認寫入Kafka記錄的Value內容格式,支援Canal CDC和JSON。的詳細資料,請參見附錄:輸出格式說明。
鍵取值列:選取的是源端列,對應列值會序列化為字串後,用逗號拼接作為寫入Kafka Topic中記錄的key。
說明列值序列化規則與JSON中的Hologres中列類型序列化規則說明一致。
Kafka Topic中記錄的Key值決定寫入的分區,相同Key值寫入同一分區,為了保證下遊消費Kafka Topic時資料能夠保持順序,建議選擇Hologres表主鍵作為鍵取值列。
如果不選擇任何源側列作為鍵取值列,Kafka Topic 中記錄的Key值為null,會導致Kafka寫入分區呈隨機寫入。
Kafka Producer參數:是影響寫入一致性、穩定性和異常處理行為的參數,一般情況預設配置即可,如有定製化需求可以指定特定參數,各個Kafka叢集版本支援的Producer參數可參考Kafka官方文檔。
四、警示配置
為避免任務出錯導致業務資料同步延遲,您可以對同步任務設定警示策略。
單擊頁面右上方的警示配置,進入即時子任務警示設定頁面。
單擊新增警示,配置警示規則。
說明此處定義的警示規則,將對該任務產生的即時同步子任務生效,您可在任務配置完成後,進入即時同步任務運行與管理介面查看並修改該即時同步子任務的監控警示規則。
管理警示規則。
對於已建立的警示規則,您可以通過警示開關控制警示規則是否開啟,同時,您可以根據警示層級將警示發送給不同的人員。
五、進階參數配置
同步任務提供部分參數可供修改,您可以按需對該參數值進行修改。
請在完全瞭解對應參數含義的情況下再進行修改,以免產生不可預料的錯誤或者資料品質問題。
單擊介面右上方的進階參數配置,進入進階參數配置頁面。
在進階參數配置頁面修改相關參數值。
六、資源群組配置
您可以單擊介面右上方的資源群組配置,查看並切換當前的任務所使用的資源群組。
七、執行同步任務
完成所有配置後,單擊頁面底部的完成配置。
在介面,找到已建立的同步任務,單擊操作列的啟動。
單擊工作清單中對應任務的名稱/ID,查看任務的詳細執行過程。
同步任務營運
查看任務運行狀態
建立完成同步任務後,您可以在同步任務頁面查看當前已建立的同步工作清單及各個同步任務的基本資料。

您可以在操作列啟動或停止同步任務,在更多中可以對同步任務進行編輯、查看等操作。
已啟動的任務您可以在執行概況中看到任務啟動並執行基本情況,也可以單擊對應的概況地區查看執行詳情。

Hologres到Kafka的單表即時同步任務分為三個步驟:
結構遷移:包含目標表的建立方式(已有表或自動建表),如果是自動建表,將會為您展示建表的DDL。
全量初始化:如果您的任務同步步驟選擇了全量同步,此處將展示全量初始化進度。
即時資料同步:包含即時同步的統計資訊,包含即時的讀寫流量、髒資料、Failover和作業記錄。
任務重跑
在某些特殊情況下,如果需要修改同步欄位、調整目標表欄位或表名資訊時,您還可以單擊同步任務操作列的重跑,系統會將調整的欄位、變更的目標包等資訊進行同步,之前同步過未修改的表將不會再進行同步。
不修改任務配置,直接單擊重跑操作,重新運行一次同步任務。
編輯任務,進行修改操作後,單擊完成配置。此時任務的操作會變成應用程式更新,單擊應用程式更新會直接觸發修改後的任務重跑。即時同步任務會按照新的配置運行。
附錄:輸出格式說明
Canal CDC
Canal CDC是Alibaba Canal定義的一種CDC資料格式。
Json
Json是將Hologres Binlog中的變更記錄,以列表名作為Key,將列表的資料內容序列化為字串後作為value,組裝為JSON格式字串寫入Kafka Topic中。
