Data Integration目前支援將MySQL、Oracle、PolarDB等源端的資料整庫即時同步至Hologres。本文以MySQL為源端、Kafka為目標端情境為例,為您介紹如何將MySQL整個資料庫的資料全量+增量同步處理至Kafka。
前提條件
操作步驟
一、選擇同步任務類型
進入Data Integration頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入Data Integration。
在左側導覽列單擊同步任務,然後在頁面頂部單擊建立同步任務,進入同步任務的建立頁面,配置如下基本資料。
資料來源和去向:
MySQL。資料去向:
Kafka。任務名稱:自訂同步任務名稱。
任務類型:
整庫即時。同步步驟:選中全量同步和增量同步處理。
二、網路與資源配置
在網路與資源配置地區,選擇同步任務所使用的資源群組。您可以為該任務分配任務資源佔用CU數。
來來源資料源選擇已添加的
MySQL資料來源,去向資料來源選擇已添加的Kafka資料來源後,單擊測試連通性。
確保來來源資料源與去向資料來源均連通成功後,單擊下一步。
三、選擇要同步的庫表
此步驟中,您可以在源端庫表地區選取項目源端資料來源下需要同步的表,並單擊
表徵圖,將其移動至右側已選庫表。

四、目標表映射
在上一步驟選擇完需要同步的表後,將自動在此介面展示當前待同步的表,但目標表的相關屬性預設為待重新整理映射狀態,需要您定義並確認源表與目標表映射關係,即資料的讀取與寫入關係,然後單擊重新整理映射後才可進入下一步操作。您可以直接重新整理映射,或自訂目標表規則後,再重新整理映射。
您可以選中待同步表後,單擊批量重新整理映射,未配置映射規則時,預設表名規則為
${源端庫名}_${表名},若目標端不存在同名表時,將自動建立。您可以在目標Topic名映射自訂列,單擊配置按鈕,自訂目標Topic名規則。
可以使用內建變數和手動輸入的字串拼接成為最終目標Topic名。其中,支援您編輯內建變數,例如,建立一個Topic名規則,將源表名增加尾碼作為目標Topic名。
您可以單擊寫入鍵取值列的配置按鈕,設定寫入鍵。
1. 編輯欄位類型映射
同步任務存在預設的源端欄位類型與目標端欄位類型映射,您可以單擊表格右上方的編輯欄位類型映射,自訂來源端表與目標端表欄位類型映射關係,配置完後單擊應用並重新整理映射。
2. 為目標Topic添加欄位並賦值
當目標Topic為待建立狀態時,您可以為目標Topic在原有表結構基礎上新增欄位。操作如下:
單表添加欄位並賦值:單擊目標Topic欄位賦值列的配置按鈕,在附加欄位頁面,單擊新增欄位,為目標Topic添加欄位並賦值。
批量賦值:選擇多個表,在列表底部選擇為目標表中相同的欄位,進行批量賦值。
說明在賦值時支援賦值常量與變數,您可通過
表徵圖切換賦值模式。
3. 配置DML規則
Data Integration提供預設DML處理規則,同時,您可以根據業務需要在此介面對寫入目標表的DML命令定義處理規則。
單表定義規則:單擊表格DML規則配置列的配置,對目標表單獨定義DML規則。
批量定義規則:選中待同步的所有表,在列表底部選擇。
4. 設定目標Topic屬性
您可以單擊目標Topic名列的
,設定Topic屬性,支援設定Topic分區數和副本數。
5. 設定源端切分列
您可以在源端切分列中下拉選擇源端表中的欄位或選擇不切分。
6. 是否執行全量同步
如果在選擇同步任務類型時,同步步驟勾選了全量同步,您還可以在此處對指定表關閉全量同步。
五、警示配置
為避免任務出錯導致業務資料同步延遲,您可以對同步任務設定警示策略。
單擊頁面右上方的警示配置,進入即時子任務警示設定頁面。
單擊新增警示,配置警示規則。
說明此處定義的警示規則,將對該任務產生的即時同步子任務生效,您可在任務配置完成後,進入即時同步任務運行與管理介面查看並修改該即時同步子任務的監控警示規則。
管理警示規則。
對於已建立的警示規則,您可以通過警示開關控制警示規則是否開啟,同時,您可以根據警示層級將警示發送給不同的人員。
六、進階參數配置
同步任務提供部分參數可供修改,您可以按需對該參數值進行修改,例如通過最大串連數上限限制,避免當前同步方案對資料庫造成過大的壓力從而影響生產。
請在完全瞭解對應參數含義的情況下再進行修改,以免產生不可預料的錯誤或者資料品質問題。
單擊介面右上方的進階參數配置,進入進階參數配置頁面。
在進階參數配置頁面修改相關參數值。
七、DDL能力配置
來來源資料源會包含許多DDL操作,您可以根據業務需求,在介面右上方單擊DDL能力配置,進入DDL能力配置頁面,對不同的DDL訊息設定同步至目標端的處理策略。
不同DDL訊息處理策略請參見:DDL訊息處理規則。
八、資源群組配置
您可以單擊介面右上方的資源群組配置,查看並切換當前的任務所使用的資源群組。
九、執行同步任務
完成所有配置後,單擊頁面底部的完成配置。
在介面,找到已建立的同步任務,單擊操作列的啟動。
單擊工作清單中對應任務的名稱/ID,查看任務的詳細執行過程。
同步任務營運
查看任務運行狀態
建立完成同步任務後,您可以在同步任務頁面查看當前已建立的同步工作清單及各個同步任務的基本資料。
您可以在操作列啟動或停止同步任務,在更多中可以對同步任務進行編輯、查看等操作。
已啟動的任務您可以在執行概況中看到任務啟動並執行基本情況,也可以單擊對應的概況地區查看執行詳情。

MySQL到Kafka的整庫即時同步任務分為三個步驟:
結構遷移:包含目標表的建立方式(已有表/自動建表),如果是自動建表,會展示DDL語句。
全量初始化:包含離線同步的表資訊、同步的進度以及寫入的條數。
即時資料同步:包含即時同步的統計資訊(即時的進度、DDL記錄、DML記錄和警示資訊)。
任務重跑
在某些特殊情況下,如果您需要增減表、修改目標表Schema資訊或者表名資訊時,您還可以單擊同步任務操作列的重跑,系統會將新增的表或有變更的表進行同步,之前同步過的表或者未修改的表將不會再進行同步。
不修改任務配置,直接單擊重跑操作,重新運行全量初始化+即時同步。
編輯任務,進行增減表操作,單擊完成配置。這個時候任務的操作列會顯示應用程式更新,單擊應用程式更新會直接觸發修改後的任務重跑。新增的表才會進行同步,之前同步過的表不會再同步。
附錄:寫入Kafka訊息格式定義
完成配置即時同步任務的操作後,執行同步任務會將源端資料庫讀取的資料,以JSON格式寫入到Kafka topic中。除了會將設定的源端表中已有資料全部寫入Kafka對應Topic中,還會啟動即時同步將增量資料持續寫入Kafka對應Topic中,同時源端表增量DDL變更資訊也會以JSON格式寫入Kafka對應Topic中。您可以通過附錄:訊息格式擷取寫入Kafka的訊息的狀態及變更等資訊。
通過離線同步任務寫入Kafka的資料JSON結構中的payload.sequenceId、payload.timestamp.eventTime和payload.timestamp.checkpointTime欄位均設定為-1。