全部產品
Search
文件中心

EventBridge:使用Debezium將PostgreSQL資料同步至訊息佇列Kafka版

更新時間:Dec 27, 2024

本文介紹如何建立Debezium PostgreSQL Source Connector,將PostgreSQL的資料同步至雲訊息佇列 Kafka 版

使用限制

Debezium PostgreSQL Source Connector只能配置一個Task用於消費源端的CDC資料,不支援並發消費配置。

前提條件

步驟一:建立資料表

  1. 登入RDS管理主控台,建立RDS PostgreSQL執行個體。更多資訊,請參見建立RDS PostgreSQL執行個體

    建立執行個體時,請選擇與前提條件中已購買部署的Kafka執行個體相同的VPC,並將此VPC網段加入白名單。加入白名單

  2. 執行個體建立完成後,在執行個體列表頁面單擊目標執行個體,然後在執行個體詳情頁面的左側導覽列,完成以下操作。

    1. 建立一個新帳號,也可使用已有帳號。更多資訊,請參見建立帳號和資料庫

    2. 建立一個資料庫,也可使用已有資料庫。更多資訊,請參見建立帳號和資料庫

    3. 單擊資料庫連接,記錄內網地址和連接埠號碼。

      內網地址

    4. 單擊參數設定,將wal_level參數的運行參數值修改為logical,修改完成後單擊提交參數

  3. 在執行個體詳情頁面,單擊登入資料庫進入Data Management服務平台,完成以下操作。

    1. 按右鍵目標資料庫,選擇模式管理,然後單擊建立模式建立新的模式(Schema)。

      說明

      Connector配置中需使用新建立的Schema,不能用系統Schema(information、pg_catalog、public)代替。

    2. 在建立的Schema中,使用SQL語句建立表格。例如,建立一個列參數分別為idnumber的表格,命令如下。更多資訊,請參見SQL Commands

      CREATE TABLE sql_table(id INT ,number INT);
    3. 執行以下命令,建立初始化wal2json外掛程式,開啟資料訂閱能力。

      SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

步驟二:建立Connector任務

  1. 下載Debezium PostgreSQL Source Connector檔案,上傳至提前建立好的OSS Bucket。更多資訊,請參見控制台上傳檔案

    重要

    下載Debezium PostgreSQL Connector檔案時請選擇適配Java 8的版本。

  2. 登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,選擇Connector生態整合 > 任务列表

  4. 任务列表頁面,單擊创建任务

  5. 创建任务面板,設定任務名稱,配置以下配置項。

    • 任務建立

      1. Source(源)設定精靈,選擇数据提供方Apache Kafka Connect,單擊下一步

      2. 连接器配置設定精靈,設定以下配置項,然後單擊下一步

        配置項

        參數

        說明

        Kafka Connect外掛程式

        Bucket儲存桶

        選擇OSS Bucket。

        檔案

        選擇上傳的.ZIP檔案。

        Kafka資源資訊

        Kafka參數配置

        選擇Source Connect。

        Kafka執行個體

        選擇前提條件中建立的執行個體。

        Virtual Private Cloud

        選擇VPC ID。

        交換器

        選擇vSwitch ID。

        安全性群組

        選擇安全性群組。

        Kafka Connect配置資訊

        解析當前ZIP包下的properties檔案

        選擇建立properties檔案。在輸入框中更新相關欄位的取值。

        展開查看欄位描述

        欄位名

        描述

        connector.class

        啟動並執行Connector包名稱,無需修改。

        database.dbname

        資料庫名稱。

        database.hostname

        填寫步驟一:建立資料表中擷取的內網地址。

        database.port

        填寫步驟一:建立資料表中擷取的連接埠號碼。

        database.user

        資料庫登入使用者名稱。

        database.password

        資料庫登入密碼。

        slot.name

        資料庫邏輯複製流的名稱。

        table.whitelist

        資料庫表格列表。不同表格之間用英文逗號(,)分隔,表格的指定規則為{schemaName}.{tableName}

        database.server.name

        目標Topic首碼。目標Topic的命名規範為{database.server.name}.{schemaName}.{tableName}

        重要

        在投遞資料前,請按照命名規範提前建立好目標Topic。

        展開查看範例程式碼

        connector.class=io.debezium.connector.postgresql.PostgresConnector
        database.dbname=test_database
        database.hostname=pgm-xxx.pg.rds.aliyuncs.com
        database.password=xxx
        database.port=5432
        database.user=xxx
        name=debezium-psql-source
        # 外掛程式名稱,本文情境下使用的是wal2json。有效取值包含decoderbufs、wal2json、wal2json_rds、wal2json_streaming以及wal2json_rds_streaming。
        plugin.name=wal2json
        slot.drop_on_stop=true
        slot.name=test_slot
        # 來源資料庫表格列表,不同表格之間用英文逗號(,)分隔,表格的指定規則為<schemaName>.<tableName>。
        table.whitelist=test_schema.test_table
        # 注意,這裡只能有1個task進行消費。
        tasks.max=1
        
        # 目標Topic首碼,目標Topic的命名規格為<database.server.name>.<schemaName>.<tableName>。
        # 例如在本文的情境下,schemaName是kafka_connect_schema,tableName是table2_with_pk
        # 那麼,這個表格的CDC資料會流入目標topic test-prefix.kafka_connect_schema.table2_with_pk
        database.server.name=test-prefix
        
        # 訊息Value格式轉換組件。
        value.converter=org.apache.kafka.connect.json.JsonConverter
        # 訊息 Value 內容中是否包含結構體schema資訊。
        value.converter.schemas.enable=false

        Connector全量參數,請參見Debezium Connector Properties

      3. 实例配置設定精靈,設定以下參數,然後單擊下一步

        配置項

        參數

        說明

        Worker規格

        Worker規格

        選擇合適的Worker規格。

        最小Worker數

        設定為1。

        最大Worker數

        設定為1。

        Kafka Connect Worker配置

        自動建立Kafka Connect Worker依賴資源

        建議勾選此項,會在選擇的Kafka執行個體中自動建立Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,並將這些必填配置自動填入配置框中。包括以下配置項:

        • Offset Topic:用於儲存來源資料位移量,命名規則為connect-eb-offset-<任務名稱>

        • Config Topic:用於儲存Connectors以及Tasks的配置資訊,命名規則為connect-eb-config-<任務名稱>

        • Status Topic:用於儲存Connectors以及Tasks狀態資訊,命名規則為connect-eb-status-<任務名稱>

        • Kafka Connect Consumer Group:Kafka Connect Worker用於消費Internal Topics的消費組,命名規則為connect-eb-cluster-<任務名稱>

        • Kafka Source Connector Consumer Group:只針對Sink Connector有效,用於消費源Kafka Topic中的資料,命名規則為connector-eb-cluster-<任務名稱>-<connector名稱>

      4. 运行配置設定精靈,將日誌投遞方式設定為投遞至SLS或者投遞至Kafka,在角色授權卡片設定Connect依賴的角色配置,然後單擊儲存

        重要

        建議配置的角色包含AliyunSAEFullAccess許可權,否則可能會導致任務運行失敗。

    • 任務屬性

      設定此任務的重試策略及無效信件佇列。更多資訊,請參見重試和死信

    等待任務狀態變為運行中,此時Connector已經在正常工作中。

步驟三:測試Connector任務

  1. 在Data Management服務平台,向步驟一:建立資料表中建立的資料表插入一條資料。例如,插入一條id為123,number為20000的資料,命令如下。

    INSERT INTO sql_table(id, number) VALUES(123,20000);
  2. 登入雲訊息佇列 Kafka 版控制台,在執行個體列表頁面,單擊目標執行個體。

  3. 在目標執行個體頁面,單擊目標Topic(投遞資料前提前建立好的命名規則為{database.server.name}.{schemaName}.{tableName}的Topic),然後單擊訊息查詢,查看插入的訊息資料,訊息Value樣本如下。

    {"before":null,"after":{"id":123,"number":20000},"source":{"version":"0.9.2.Final","connector":"postgresql","name":"test-prefix","db":"wb","ts_usec":168386295815075****,"txId":10339,"lsn":412719****,"schema":"test_schema","table":"sql_table","snapshot":false,"last_snapshot_record":null},"op":"c","ts_ms":168386295****}

常見報錯

情境一:所有Tasks運行失敗

錯誤資訊:

All tasks under connector mongo-source failed, please check the error trace of the task.

解決方案:在訊息流程入任務詳情頁面,單擊基礎資訊地區的診斷連結,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤資訊。

情境二:Kafka Connect退出

錯誤資訊:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

解決方案:由於狀態擷取可能會有延遲,建議您先嘗試重新整理頁面。若重新整理後仍然是失敗狀態,您可以按照以下步驟查看錯誤資訊。

  1. 在訊息流程入任務詳情頁面的Worker資訊地區,單擊SAE應用後的執行個體名稱,跳轉到SAE應用詳情頁面。

  2. 基本資料頁面,單擊執行個體部署資訊頁簽。

  3. 在執行個體右側操作列,單擊Webshell登入Kafka Connect運行環境。執行個體部署資訊

    • 執行vi /home/admin/connector-bootstrap.log命令,查看Connector開機記錄,尋找其中是否包含錯誤資訊。

    • 執行vi /opt/kafka/logs/connect.log命令,查看Connector作業記錄,在其中尋找ERROR或者WARN欄位來查看是否有錯誤資訊。

基於錯誤資訊提示進行修複操作後,可以重新啟動對應任務。

情境三:Connector參數校正失敗

錯誤資訊:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

解決方案:此時需要根據錯誤資訊,找出具體哪個參數出錯,更新對應參數即可。若基於上述錯誤資訊無法定位具體的出錯參數,可以參考上文情境二中的步驟登入Kafka Connect運行環境,執行以下命令,查詢參數是否校正通過。

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

該指令會返回Connector參數中每個參數是否校正通過,若不通過,則errors屬性非空,如下所示。

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}