全部產品
Search
文件中心

PolarDB:配置CDC以同步資料變更

更新時間:Sep 12, 2025

變更資料擷取(Change Data Capture, CDC)功能允許您即時捕獲資料庫中的資料修改(INSERT/UPDATE/DELETE),並將其以事件流的形式同步到下遊系統,如資料倉儲、分析平台(如Flink)或其他資料庫執行個體。

PolarDB PostgreSQL分布式版叢集中,搭建CDC的關鍵在於理解資料的來源:

  • 分布表:資料變更發生在各個資料節點(DN)上,因此下遊需要分別訂閱所有DN節點才能擷取完整的變更資料。

  • 複製表:資料變更會同步到所有節點,但為了避免重複,變更流統一由主計算節點(主CN)發布,下遊只需訂閱主CN即可。

準備工作:檢查與配置

在開始之前,請確保資料庫叢集的相關參數已正確設定。這些參數是開啟邏輯複製功能的前提。

說明

預設情況下,PolarDB PostgreSQL分布式版已配置好以下參數。如果檢查結果不符,請提交工單聯絡我們處理。

  • 在主CN節點執行以下 SQL,檢查所有節點的polar_cluster.enable_change_data_capture是否為on

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);

    預期結果如下:所有節點的result列都應為on

     success | result 
    ---------+--------
     t       | on
     t       | on
     t       | on
     t       | on
  • 在主CN節點上執行以下SQL,檢查所有節點的wal_level是否為logical

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);

    預期結果如下:所有節點的result列都應為logical

    success | result  
    ---------+---------
     t       | logical
     t       | logical
     t       | logical
     t       | logical

步驟一:在發布端建立發布和複製槽

發布端(即您的PolarDB PostgreSQL分布式版叢集)的配置分為兩步:建立發布(Publication)和建立複製槽(Replication Slot)。

建立發布

發布是定義哪些表的變更需要被捕獲的集合。此操作只需在主CN上執行一次,系統會自動將其同步到所有節點。

CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;
說明
  • 請勿使用FOR ALL TABLES選項建立發布,應明確指定需要發布的表。

  • <publication_name>為發布名,<table_name1>/<table_name2>為要發布的分布表/複製表。發布端僅需發布分布表的邏輯表名,無需發布物理分區的表名。

建立複製槽

複製槽用於為下遊訂閱者保留WAL日誌,防止其在被消費前被清理。由於資料變更來源於主CN和所有DN,您需要在這些節點上都建立複製槽。

在主CN節點上執行以下SQL,即可在所有相關節點(主CN和所有DN)上大量建立同名的複製槽:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
說明

<publication_slot_name>為複製槽名稱。

預期的結果如下,其中success列為t表示複製槽建立成功。否則複製槽建立失敗,通過result列可以查看具體原因:

   node_name    | node_port | success |               result               
----------------+-----------+---------+------------------------------------
 10.xxx.xxx.xxx |      3007 | t       | (<publication_slot_name>,0/C024D7D0)
 10.xxx.xxx.xxx |      3006 | t       | (<publication_slot_name>,0/C33B6668)
 10.xxx.xxx.xxx |      3003 | t       | (<publication_slot_name>,0/C33949B0)
(3 rows)

步驟二:在訂閱端建立訂閱

訂閱端(如Debezium、Flink或另一個PostgreSQL執行個體)需要為每一個建立了複製槽的節點(主CN和所有DN)分別建立一個訂閱串連,以接收完整的變更資料。

說明

每個訂閱串連的配置基本相同,只需修改主機地址和連接埠號碼。

Debezium樣本

如果使用Debezium作為訂閱端,那麼每個訂閱端僅database.hostnamedatabase.port兩個配置項需要修改為相應節點的主地址和連接埠號碼,其餘配置完全相同。以訂閱其中一個DN節點的配置為例:

{
    "name": "xxx",
    "config": {
        "connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname" : "<DN1主地址>",
        "database.port" : "<DN1連接埠>",
        "database.user" : "<您的使用者名稱>",
        "database.password" : "<您的密碼>",
        "database.dbname" : "postgres",
        "slot.name": "<複製槽名稱>",
        "publication.name": "<發布名稱>",
        ...
    }
}

PostgreSQL樣本

如果使用PostgreSQL作為訂閱端,那麼每個訂閱端僅hostport兩個配置項需要修改為相應節點的主地址和連接埠號碼,其餘配置完全相同。以訂閱其中一個DN節點的配置為例:

CREATE SUBSCRIPTION test_subscription
    CONNECTION 'dbname=postgres host=<DN1主地址> port=<DN1連接埠> user=<您的使用者名稱> password=<您的密碼>'
    PUBLICATION <發布名稱>
    WITH (create_slot=false, slot_name='<複製槽名稱>');

步驟三:驗證CDC鏈路

在所有訂閱端配置完成後,您可以在主CN上執行以下SQL,檢查所有發布節點的複製槽是否都已啟用。

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;

預期得到的結果如下。如果result列值為t則表示相應節點與訂閱端的複製關係已經建立。此時,您可以在源端表中插入或修改資料,並觀察訂閱端是否能接收到變更。

  node_name     | node_port | success | result 
----------------+-----------+---------+--------
 10.xxx.xxx.xxx |      3007 | t       | t
 10.xxx.xxx.xxx |      3006 | t       | t
 10.xxx.xxx.xxx |      3003 | t       | t
(3 rows)

維護與清理

如果下遊不再需要訂閱資料,則應該在下遊停止訂閱後,及時清理髮布端的複製槽,以釋放WAL日誌空間。

說明

清理複製槽將導致關聯的WAL日誌被永久刪除,未被消費的資料變更將丟失且無法恢複。如果只是臨時暫停,請勿執行此操作清理複製槽。

在主CN上執行以下SQL,即可在所有相關節點上大量刪除複製槽:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;