AnalyticDB for MySQL支援資料同步APS(AnalyticDB Pipeline Service)功能,您可以建立Kafka同步鏈路,通過同步鏈路從指定時間位點,即時同步Kafka中的資料入倉。本文主要介紹添加Kafka資料來源、建立Kafka同步鏈路並啟動任務的操作步驟。
前提條件
AnalyticDB for MySQL叢集的產品系列為企業版、基礎版或湖倉版。
已建立資料庫帳號。
如果是通過阿里雲帳號訪問,只需建立高許可權帳號。
如果是通過RAM使用者訪問,需要建立高許可權帳號和普通帳號並且將RAM使用者綁定到普通帳號上。
已建立ApsaraMQ for Kafka(簡稱Kafka)執行個體,且與AnalyticDB for MySQL叢集部署在同一VPC。
已建立Kafka Topic,並發送訊息。詳情請參見訊息佇列Kafka版快速入門操作流程。
注意事項
僅支援同步JSON格式的Kafka資料。
Kafka中建立的Topic資料超過一定的時間會被自動清理,如果Topic資料到期,同時資料同步任務失敗,重新啟動同步任務時讀取不到被清理掉的資料,會有遺失資料的風險。因此請適當調大Topic資料的生命週期,並在資料同步任務失敗時及時聯絡支援人員。
擷取Kafka範例資料在大於8 KB的情況下,Kafka API會將資料進行截斷,導致解析範例資料為JSON格式時失敗,從而無法自動產生欄位對應資訊。
kafka源端表結構發生變化時,不會觸發DDL自動變更,即變更不會同步至AnalyticDB for MySQL。
費用說明
AnalyticDB for MySQL叢集的ACU彈性資源費用,請參見湖倉版計費項目和企業版和基礎版計費項目。
操作步驟
步驟一:建立資料來源
如果您已添加Kafka資料來源,可跳過該步驟,直接建立同步鏈路。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。
在左側導覽列,單擊數據接入>數據源管理。
單擊左上方新建數據源。
在新建數據源頁面進行參數配置。參數說明如下表所示:
參數名稱
參數說明
數據源類型
選擇資料來源類型Kafka。
數據源名稱
系統預設按資料來源類型與目前時間產生名稱,可按需修改。
數據源描述
資料來源備忘描述,例如應用情境、應用業務限制等。
部署模式
目前僅支援阿里雲執行個體。
Kafka實例
Kafka執行個體ID。
登入雲訊息佇列 Kafka 版控制台,在集群清單頁面查看執行個體ID。
Kafka Topic
在Kafka中建立的Topic名稱。
登入雲訊息佇列 Kafka 版控制台,在目標執行個體的Topic 管理頁面查看Topic名稱。
消息數據格式
Kafka訊息資料格式,目前僅支援JSON。
參數配置完成後,單擊創建。
步驟二:建立同步鏈路
在左側導覽列,單擊SLS/Kafka數據同步。
在左上方,單擊新建同步鏈路,然後單擊Kafka數據源頁簽。
在新建同步鏈路頁面,進行資料來源的數據源及目標端配置、目標庫表配置及同步配置。
數據源及目標端配置的參數說明如下:
參數名稱
參數說明
數據鏈路名稱
資料鏈路名稱。系統預設按資料來源類型與目前時間產生名稱,可按需修改。
數據源
選擇已有的Kafka資料來源,也可建立資料來源。
資料來源格式
目前僅支援JSON。
目標端類型
選擇:數倉-ADB儲存。
ADB賬號
AnalyticDB for MySQL叢集的資料庫帳號。
ADB密碼
AnalyticDB for MySQL叢集資料庫帳號的密碼。
目標庫表配置參數說明如下:
參數名稱
參數說明
庫名
AnalyticDB for MySQL叢集的資料庫名稱。
錶名
AnalyticDB for MySQL叢集的表名稱。
樣例數據
自動從Kafka Topic中擷取的最新資料作為範例資料。
說明Kafka Topic中的資料需為JSON格式,若存在其他格式的資料,資料同步時會報錯。
JSON解析層級
設定JSON的嵌套解析層數,取值說明:
0:不做解析。
1(預設值):解析一層。
2:解析兩層。
3:解析三層。
4:解析四層。
JSON的嵌套解析策略,請參見通過資料同步功能APS同步Kafka資料(推薦)。
Schema字段映射
展示範例資料經過JSON解析後的Schema資訊。可在此調整目標欄位名、類型或按需增刪欄位等。
同步配置的參數說明如下:
參數名稱
參數說明
投遞起始點位
同步任務啟動時會從選擇的時間點開始消費Kafka資料。您可以選擇任意一個時間點,系統則會從Kafka中第一條大於等於該時間點的資料開始消費。
髒數據處理模式
同步資料時,若目標表中的欄位類型與源端實際同步的Kafka資料類型不匹配,則會導致同步失敗。例如源端的資料是
abc,而目標表中的欄位類型是int,此時會因為無法轉換而導致同步異常。髒資料處理模式取值如下:
中斷同步(預設值):資料同步終止,您需修改目標表的欄位類型或修改為其他髒資料處理模式,再重啟同步任務。
按NULL處理:髒資料欄位按NULL值寫入目標表。
例如:Kafka一行資料有3個欄位(col1、col2、col3),其中col2欄位為髒資料,則col2欄位資料轉為NULL值寫入,col1、col3欄位資料正常寫入。
Job型資源組
指定任務啟動並執行Job型資源群組。
增量同步所需ACU數
指定任務啟動並執行Job型資源群組ACU數。最小ACU數為2,最大ACU數為Job型資源群組可用計算最大資源數。建議適當調大所需ACU數,可以提升入倉效能及任務穩定性。
說明建立資料同步任務時,使用Job型資源群組中的彈性資源。資料同步任務會長期佔用資源,因此系統會從資源群組中扣除該任務佔用的資源。例如,Job型資源群組的計算最大資源為48 ACU,已建立了一個8 ACU的同步任務,在該資源群組中建立另一個同步任務時,可選的最大ACU數為40。
加入白名單
因資料同步網路打通需要,允許將kafka交換器網段添加到AnalyticDB for MySQL叢集的白名單中。
上述參數配置完成後,單擊提交。
步驟三:啟動資料同步任務
在SLS/Kafka數據同步頁面,選擇建立成功的資料同步任務,在操作列單擊啟動。
單擊左上方査詢,狀態變為正在運行即資料同步任務啟動成功。