全部產品
Search
文件中心

AnalyticDB:通過資料同步功能APS同步Kafka資料

更新時間:Dec 02, 2025

AnalyticDB for MySQL支援資料同步APS(AnalyticDB Pipeline Service)功能,您可以建立Kafka同步鏈路,通過同步鏈路從指定時間位點,即時同步Kafka中的資料入倉。本文主要介紹添加Kafka資料來源、建立Kafka同步鏈路並啟動任務的操作步驟。

前提條件

注意事項

  • 僅支援同步JSON格式的Kafka資料。

  • Kafka中建立的Topic資料超過一定的時間會被自動清理,如果Topic資料到期,同時資料同步任務失敗,重新啟動同步任務時讀取不到被清理掉的資料,會有遺失資料的風險。因此請適當調大Topic資料的生命週期,並在資料同步任務失敗時及時聯絡支援人員。

  • 擷取Kafka範例資料在大於8 KB的情況下,Kafka API會將資料進行截斷,導致解析範例資料為JSON格式時失敗,從而無法自動產生欄位對應資訊。

  • kafka源端表結構發生變化時,不會觸發DDL自動變更,即變更不會同步至AnalyticDB for MySQL

費用說明

AnalyticDB for MySQL叢集的ACU彈性資源費用,請參見湖倉版計費項目企業版和基礎版計費項目

操作步驟

步驟一:建立資料來源

說明

如果您已添加Kafka資料來源,可跳過該步驟,直接建立同步鏈路

  1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。

  2. 在左側導覽列,單擊數據接入>數據源管理

  3. 單擊左上方新建數據源

  4. 新建數據源頁面進行參數配置。參數說明如下表所示:

    參數名稱

    參數說明

    數據源類型

    選擇資料來源類型Kafka

    數據源名稱

    系統預設按資料來源類型與目前時間產生名稱,可按需修改。

    數據源描述

    資料來源備忘描述,例如應用情境、應用業務限制等。

    部署模式

    目前僅支援阿里雲執行個體。

    Kafka實例

    Kafka執行個體ID。

    登入雲訊息佇列 Kafka 版控制台,在集群清單頁面查看執行個體ID。

    Kafka Topic

    在Kafka中建立的Topic名稱。

    登入雲訊息佇列 Kafka 版控制台,在目標執行個體的Topic 管理頁面查看Topic名稱。

    消息數據格式

    Kafka訊息資料格式,目前僅支援JSON。

  5. 參數配置完成後,單擊創建

步驟二:建立同步鏈路

  1. 在左側導覽列,單擊SLS/Kafka數據同步

  2. 在左上方,單擊新建同步鏈路,然後單擊Kafka數據源頁簽

  3. 新建同步鏈路頁面,進行資料來源的數據源及目標端配置目標庫表配置同步配置

    • 數據源及目標端配置的參數說明如下:

      參數名稱

      參數說明

      數據鏈路名稱

      資料鏈路名稱。系統預設按資料來源類型與目前時間產生名稱,可按需修改。

      數據源

      選擇已有的Kafka資料來源,也可建立資料來源。

      資料來源格式

      目前僅支援JSON。

      目標端類型

      選擇:數倉-ADB儲存

      ADB賬號

      AnalyticDB for MySQL叢集的資料庫帳號。

      ADB密碼

      AnalyticDB for MySQL叢集資料庫帳號的密碼。

    • 目標庫表配置參數說明如下:

      參數名稱

      參數說明

      庫名

      AnalyticDB for MySQL叢集的資料庫名稱。

      錶名

      AnalyticDB for MySQL叢集的表名稱。

      樣例數據

      自動從Kafka Topic中擷取的最新資料作為範例資料。

      說明

      Kafka Topic中的資料需為JSON格式,若存在其他格式的資料,資料同步時會報錯。

      JSON解析層級

      設定JSON的嵌套解析層數,取值說明:

      • 0:不做解析。

      • 1(預設值):解析一層。

      • 2:解析兩層。

      • 3:解析三層。

      • 4:解析四層。

      JSON的嵌套解析策略,請參見通過資料同步功能APS同步Kafka資料(推薦)

      Schema字段映射

      展示範例資料經過JSON解析後的Schema資訊。可在此調整目標欄位名、類型或按需增刪欄位等。

    • 同步配置的參數說明如下:

      參數名稱

      參數說明

      投遞起始點位

      同步任務啟動時會從選擇的時間點開始消費Kafka資料。您可以選擇任意一個時間點,系統則會從Kafka中第一條大於等於該時間點的資料開始消費。

      髒數據處理模式

      同步資料時,若目標表中的欄位類型與源端實際同步的Kafka資料類型不匹配,則會導致同步失敗。例如源端的資料是abc,而目標表中的欄位類型是int,此時會因為無法轉換而導致同步異常。

      髒資料處理模式取值如下:

      • 中斷同步(預設值):資料同步終止,您需修改目標表的欄位類型或修改為其他髒資料處理模式,再重啟同步任務。

      • 按NULL處理:髒資料欄位按NULL值寫入目標表。

      例如:Kafka一行資料有3個欄位(col1、col2、col3),其中col2欄位為髒資料,則col2欄位資料轉為NULL值寫入,col1、col3欄位資料正常寫入。

      Job型資源組

      指定任務啟動並執行Job型資源群組。

      增量同步所需ACU數

      指定任務啟動並執行Job型資源群組ACU數。最小ACU數為2,最大ACU數為Job型資源群組可用計算最大資源數。建議適當調大所需ACU數,可以提升入倉效能及任務穩定性。

      說明

      建立資料同步任務時,使用Job型資源群組中的彈性資源。資料同步任務會長期佔用資源,因此系統會從資源群組中扣除該任務佔用的資源。例如,Job型資源群組的計算最大資源為48 ACU,已建立了一個8 ACU的同步任務,在該資源群組中建立另一個同步任務時,可選的最大ACU數為40。

      加入白名單

      因資料同步網路打通需要,允許將kafka交換器網段添加到AnalyticDB for MySQL叢集的白名單中。

  4. 上述參數配置完成後,單擊提交

步驟三:啟動資料同步任務

  1. SLS/Kafka數據同步頁面,選擇建立成功的資料同步任務,在操作列單擊啟動

  2. 單擊左上方査詢,狀態變為正在運行即資料同步任務啟動成功。