本文介紹如何建立Debezium MySQL Source Connector,將MySQL的資料同步至ApsaraMQ for Kafka。
前提條件
已開通Object Storage Service服務並建立儲存空間(Bucket)。更多資訊,請參見控制台建立儲存空間。
已開通Serverless應用引擎服務。更多資訊,請參見準備工作。
已建立專用網路及交換器。更多資訊,請參見步驟一:建立專用網路和交換器。
已購買並部署ApsaraMQ for Kafka執行個體。更多資訊,請參見步驟二:購買和部署執行個體。
步驟一:建立資料表
登入RDS管理主控台,建立RDS MySQL執行個體。更多資訊,請參見建立RDS MySQL執行個體。
建立執行個體時,請選擇與前提條件中已購買部署的Kafka執行個體相同的VPC,並將此VPC網段加入白名單。

執行個體建立完成後,在執行個體列表頁面單擊目標執行個體,完成以下操作。
在左側導覽列單擊基本資料,然後單擊登入資料庫,進入Data Management服務平台後,完成以下操作。
在左側雙擊目標資料庫名稱,切換到已建立的資料庫。

在SQL Console頁簽,使用SQL語句建立表格。例如,建立一個列參數分別為id和number的表格,命令如下。更多資訊,請參見SQL Commands。
CREATE TABLE sql_table(id INT ,number INT);
步驟二:建立Connector任務
下載Debezium MySQL Source Connector檔案,上傳至提前建立好的OSS bucket,更多資訊,請參見控制台上傳檔案。
重要下載Debezium MySQL Connector檔案時請選擇適配Java 8的版本。
登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。
在左側導覽列,選擇。
在概覽頁面的訊息流程入(Source)地區,找到自訂 Kafka Connect卡片並單擊卡片上的建立。
在工作清單頁面,單擊建立任務,在建立任務頁面,配置以下資訊。
在Source(源)嚮導頁面,選擇資料提供方為Apache Kafka Connect,單擊下一步。
在連接器配置設定精靈,設定以下參數,然後單擊下一步。
配置項
參數
說明
Kafka Connect 外掛程式
Bucket儲存桶
在下拉式清單中選擇OSS Bucket。
檔案僅支援 ZIP 格式
在下拉式清單中選擇上傳的.ZIP檔案。
Kafka 資源資訊
Kafka 參數配置
選擇Source Connect。
Kafka執行個體
選擇前提條件中建立的執行個體。
Virtual Private Cloud
預設選擇部署Kafka執行個體時選擇的VPC ID且不可更改。
交換器
預設選擇部署Kafka執行個體時選擇的vSwitch ID且不可更改。
安全性群組
選擇安全性群組。
Kafka Connect 配置資訊
解析當前ZIP包下的properties檔案
選擇.ZIP檔案中包含的SourceConnector對應的.properties檔案。路徑為/etc/xxx.properties。在輸入框中更新下欄欄位的取值。
tasks.max:Task的最大數量。該情境下只支援設定為1。
database.hostname:填寫步驟一:建立資料表中擷取的來源資料庫內網地址。
database.port:填寫步驟一:建立資料表中擷取的連接埠號碼。
database.user:資料庫登入帳號。
database.password:資料庫登入密碼。
database.server.name:MySQL資料庫服務的邏輯名稱。
僅允許包含英文字母、數字以及底線(_)。
該欄位用於組成資料庫及表格的目標Topic的名稱,資料庫目標Topic的命名規則為
{database.server.name},表格目標Topic的命名規則為{database.server.name}.{databaseName}.{tableName}。請在運行Connector任務前按照命名規則提前建立對應的目標Topic。
database.include.list:來源資料庫名稱。若有多個資料庫,用英文逗號(,)分隔。
table.include.list:源表格名稱,單個表格的格式為
{databaseName}.{tableName};若有多個表格,用英文逗號(,)分隔。database.history.kafka.bootstrap.servers:Kafka執行個體串連地址。
該執行個體用於記錄資料庫所有Schema變動記錄。
可使用Connector任務中已配置的目標Kafka執行個體,也可使用新執行個體。
執行個體需要與資料庫執行個體處於同一個VPC中。
該執行個體需要開啟自由使用Group能力。更多資訊,請參見自由使用Group。
database.history.kafka.topic:此Topic用於記錄資料庫所有Schema的變動記錄,請在運行Connector任務前提前建立。
include.schema.changes:是否監控Schema變動記錄,若取值為true,則會將這些變動記錄寫入名為
{database.server.name}的Topic中。
Connector全量參數,請參見Debezium Connector Properties。
在執行個體配置嚮導頁面,設定以下參數,然後單擊下一步。
配置項
參數
說明
Worker 規格
Worker規格
選擇合適的Worker規格。
最小 Worker 數
Worker 擴容的最小數量,該配置不能少於1。
最大 Worker 數
Worker 擴容的最大數量,該配置不能大於 50,您也可以提交工單申請更多容量。
Kafka Connect Worker 配置
自動建立Kafka Connect Worker依賴資源
建議勾選此項,此時會在選擇的Kafka執行個體中自動建立Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,並將這些必填配置自動填入配置框中,包括以下配置項:
Offset Topic:用於儲存來源資料位移量,命名規則為
connect-eb-offset-<任務名稱>。Config Topic:用於儲存Connectors以及Tasks的配置資訊,命名規則為
connect-eb-config-<任務名稱>。Status Topic:用於儲存Connectors以及Tasks狀態資訊,命名規則為
connect-eb-status-<任務名稱>。Kafka Connect Consumer Group:Kafka Connect Worker用於消費Internal Topics的消費組,命名規則為
connect-eb-cluster-<任務名稱>。Kafka Source Connector Consumer Group:只針對Sink Connector有效,用於消費源Topic中的資料,命名規則為
connector-eb-cluster-<任務名稱>-<connector名稱>。
在回合組態地區嚮導頁面,將日誌投遞方式設定為投遞至SLS或者投遞至Kafka,在角色授權地區設定Connect依賴的角色配置,然後單擊儲存。
重要建議配置的角色包含AliyunSAEFullAccess許可權,否則可能會導致任務運行失敗。
等待任務狀態變為運行中,此時Connector已經在正常工作中。
步驟三:測試Connector任務
在Data Management服務平台,向步驟一:建立資料表中建立的資料表插入一條資料。例如,插入一條id為123,number為20000的資料,命令如下。
INSERT INTO sql_table(id, number) VALUES(123,20000);登入雲訊息佇列 Kafka 版控制台,在執行個體列表頁面,單擊目標執行個體。
在目標執行個體頁面,單擊目標Topic,然後單擊訊息查詢,查看插入的訊息資料,訊息Value樣本如下。
資料庫目標Topic(命名規則為
{database.server.name})樣本:{ "source":{ "version":"1.5.0.Final", "connector":"mysql", "name":"fulfillment", "ts_ms":1686283675404, "snapshot":"true", "db":"eb-ceshi", "sequence":null, "table":"sql_table", "server_id":0, "gtid":null, "file":"mysql-bin.000006", "pos":188032, "row":0, "thread":null, "query":null }, "databaseName":"eb-cesh", "schemaName":null, "ddl":"DROP TABLE IF EXISTS sql_table", "tableChanges":[ ] }資料表格目標Topic(命名規則為
{database.server.name}.{databaseName}.{tableName})樣本:{ "before":null, "after":{ "id":123, "number":20000 }, "source":{ "version":"1.5.0.Final", "connector":"mysql", "name":"fulfillment", "ts_ms":1686283675675, "snapshot":"last", "db":"eb-cesh", "sequence":null, "table":"sql_table", "server_id":0, "gtid":null, "file":"mysql-bin.000006", "pos":188032, "row":0, "thread":null, "query":null }, "op":"r", "ts_ms":1686283675675, "transaction":null }database.history.kafka.topic欄位指定的Topic樣本:
{ "source":{ "server":"fulfillment" }, "position":{ "ts_sec":1686283675, "file":"mysql-bin.000006", "pos":188032, "gtids":"be4286e6-05ce-11ee-b8c2-00163e20****:1-****", "snapshot":true }, "databaseName":"eb-cesh", "ddl":"CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci", "tableChanges":[ ] }
常見報錯
情境一:所有Tasks運行失敗
錯誤資訊:
All tasks under connector mongo-source failed, please check the error trace of the task.解決方案:在訊息流程入任務詳情頁面,單擊基礎資訊地區的診斷連結,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤資訊。
情境二:Kafka Connect退出
錯誤資訊:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.解決方案:由於狀態擷取可能會有延遲,建議您先嘗試重新整理頁面。若重新整理後仍然是失敗狀態,您可以按照以下步驟查看錯誤資訊。
在目標任務的概覽頁面,找到Worker資訊地區,單擊SAE應用右側的
表徵圖,跳轉至SAE控制台。
在基礎資訊頁面,單擊執行個體列表頁簽,然後單擊目標執行個體操作列的Webshell。
執行
vi /home/admin/connector-bootstrap.log命令,查看Connector開機記錄,尋找其中是否包含錯誤資訊。執行
vi /opt/kafka/logs/connect.log命令,查看Connector作業記錄,在其中尋找ERROR或者WARN欄位,以查看是否有錯誤資訊。
基於錯誤資訊提示進行修複操作後,可以重新啟動對應任務。
情境三:Connector參數校正失敗
錯誤資訊:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`解決方案:此時需要根據錯誤資訊,找出具體哪個參數出錯,更新對應參數即可。若基於上述錯誤資訊無法定位具體的出錯參數,可以參考上文情境二中的步驟登入Kafka Connect運行環境,執行以下命令,查詢參數是否校正通過。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate該指令會返回Connector參數中每個參數是否校正通過,若不通過,則errors屬性非空,如下所示。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}情境四:擷取Topic中繼資料逾時
錯誤資訊:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata解決方案:出現此報錯的原因是您在啟動Connector任務前未按照命名規則提前建立資料庫目標Topic{database.server.name}和表格目標Topic{database.server.name}.{databaseName}.{tableName},建議您停用任務後建立目標Topic再啟用任務。
