本文介紹如何建立Debezium PostgreSQL Source Connector,將PostgreSQL的資料同步至雲訊息佇列 Kafka 版。
使用限制
Debezium PostgreSQL Source Connector只能配置一個Task用於消費源端的CDC資料,不支援並發消費配置。
前提條件
步驟一:建立資料表
登入RDS管理主控台,建立RDS PostgreSQL執行個體。更多資訊,請參見建立RDS PostgreSQL執行個體。
建立執行個體時,請選擇與前提條件中已購買部署的Kafka執行個體相同的VPC,並將此VPC網段加入白名單。

執行個體建立完成後,在執行個體列表頁面單擊目標執行個體,然後在執行個體詳情頁面的左側導覽列,完成以下操作。
在執行個體詳情頁面,單擊登入資料庫進入Data Management服務平台,完成以下操作。
按右鍵目標資料庫,選擇模式管理,然後單擊建立模式建立新的模式(Schema)。
說明Connector配置中需使用新建立的Schema,不能用系統Schema(information、pg_catalog、public)代替。
在建立的Schema中,使用SQL語句建立表格。例如,建立一個列參數分別為id和number的表格,命令如下。更多資訊,請參見SQL Commands。
CREATE TABLE sql_table(id INT ,number INT);執行以下命令,建立初始化wal2json外掛程式,開啟資料訂閱能力。
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
步驟二:建立Connector任務
下載Debezium PostgreSQL Source Connector檔案,上傳至提前建立好的OSS Bucket。更多資訊,請參見控制台上傳檔案。
重要下載Debezium PostgreSQL Connector檔案時請選擇適配Java 8的版本。
登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。
在左側導覽列,選擇。
在任务列表頁面,單擊创建任务。
在创建任务面板,設定任務名稱,配置以下配置項。
任務建立
在Source(源)設定精靈,選擇数据提供方為Apache Kafka Connect,單擊下一步。
在连接器配置設定精靈,設定以下配置項,然後單擊下一步。
配置項
參數
說明
Kafka Connect外掛程式
Bucket儲存桶
選擇OSS Bucket。
檔案
選擇上傳的.ZIP檔案。
Kafka資源資訊
Kafka參數配置
選擇Source Connect。
Kafka執行個體
選擇前提條件中建立的執行個體。
Virtual Private Cloud
選擇VPC ID。
交換器
選擇vSwitch ID。
安全性群組
選擇安全性群組。
Kafka Connect配置資訊
解析當前ZIP包下的properties檔案
選擇建立properties檔案。在輸入框中更新相關欄位的取值。
Connector全量參數,請參見Debezium Connector Properties。
在实例配置設定精靈,設定以下參數,然後單擊下一步。
配置項
參數
說明
Worker規格
Worker規格
選擇合適的Worker規格。
最小Worker數
設定為1。
最大Worker數
設定為1。
Kafka Connect Worker配置
自動建立Kafka Connect Worker依賴資源
建議勾選此項,會在選擇的Kafka執行個體中自動建立Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,並將這些必填配置自動填入配置框中。包括以下配置項:
Offset Topic:用於儲存來源資料位移量,命名規則為
connect-eb-offset-<任務名稱>。Config Topic:用於儲存Connectors以及Tasks的配置資訊,命名規則為
connect-eb-config-<任務名稱>。Status Topic:用於儲存Connectors以及Tasks狀態資訊,命名規則為
connect-eb-status-<任務名稱>。Kafka Connect Consumer Group:Kafka Connect Worker用於消費Internal Topics的消費組,命名規則為
connect-eb-cluster-<任務名稱>。Kafka Source Connector Consumer Group:只針對Sink Connector有效,用於消費源Kafka Topic中的資料,命名規則為
connector-eb-cluster-<任務名稱>-<connector名稱>。
在运行配置設定精靈,將日誌投遞方式設定為投遞至SLS或者投遞至Kafka,在角色授權卡片設定Connect依賴的角色配置,然後單擊儲存。
重要建議配置的角色包含AliyunSAEFullAccess許可權,否則可能會導致任務運行失敗。
任務屬性
設定此任務的重試策略及無效信件佇列。更多資訊,請參見重試和死信。
等待任務狀態變為運行中,此時Connector已經在正常工作中。
步驟三:測試Connector任務
在Data Management服務平台,向步驟一:建立資料表中建立的資料表插入一條資料。例如,插入一條id為123,number為20000的資料,命令如下。
INSERT INTO sql_table(id, number) VALUES(123,20000);登入雲訊息佇列 Kafka 版控制台,在執行個體列表頁面,單擊目標執行個體。
在目標執行個體頁面,單擊目標Topic(投遞資料前提前建立好的命名規則為
{database.server.name}.{schemaName}.{tableName}的Topic),然後單擊訊息查詢,查看插入的訊息資料,訊息Value樣本如下。{"before":null,"after":{"id":123,"number":20000},"source":{"version":"0.9.2.Final","connector":"postgresql","name":"test-prefix","db":"wb","ts_usec":168386295815075****,"txId":10339,"lsn":412719****,"schema":"test_schema","table":"sql_table","snapshot":false,"last_snapshot_record":null},"op":"c","ts_ms":168386295****}
常見報錯
情境一:所有Tasks運行失敗
錯誤資訊:
All tasks under connector mongo-source failed, please check the error trace of the task.解決方案:在訊息流程入任務詳情頁面,單擊基礎資訊地區的診斷連結,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤資訊。
情境二:Kafka Connect退出
錯誤資訊:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.解決方案:由於狀態擷取可能會有延遲,建議您先嘗試重新整理頁面。若重新整理後仍然是失敗狀態,您可以按照以下步驟查看錯誤資訊。
在訊息流程入任務詳情頁面的Worker資訊地區,單擊SAE應用後的執行個體名稱,跳轉到SAE應用詳情頁面。
在基本資料頁面,單擊執行個體部署資訊頁簽。
在執行個體右側操作列,單擊Webshell登入Kafka Connect運行環境。

執行
vi /home/admin/connector-bootstrap.log命令,查看Connector開機記錄,尋找其中是否包含錯誤資訊。執行
vi /opt/kafka/logs/connect.log命令,查看Connector作業記錄,在其中尋找ERROR或者WARN欄位來查看是否有錯誤資訊。
基於錯誤資訊提示進行修複操作後,可以重新啟動對應任務。
情境三:Connector參數校正失敗
錯誤資訊:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`解決方案:此時需要根據錯誤資訊,找出具體哪個參數出錯,更新對應參數即可。若基於上述錯誤資訊無法定位具體的出錯參數,可以參考上文情境二中的步驟登入Kafka Connect運行環境,執行以下命令,查詢參數是否校正通過。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate該指令會返回Connector參數中每個參數是否校正通過,若不通過,則errors屬性非空,如下所示。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}