
PolarDB: Configure CDC to sync data changes

Last Updated: Aug 28, 2025

Change Data Capture (CDC) captures real-time data modifications (INSERT, UPDATE, and DELETE) in a database. You can then sync these changes as an event stream to downstream systems, such as data warehouses, analytics platforms like Flink, or other database instances.

In a PolarDB for PostgreSQL (distributed edition) cluster, setting up CDC depends on where the data originates:

  • Distributed tables: Data changes occur on individual data nodes (DNs). To obtain a complete set of changes, your downstream system must subscribe to every DN.

  • Replicated tables: Data changes are copied to all nodes. To prevent duplicate data, the primary compute node (CN) publishes a single event stream. Your downstream system needs to subscribe only to the primary CN.
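For context, the distinction above follows from how each table was created. A minimal sketch, assuming the cluster exposes Citus-style DDL functions (the function names `create_distributed_table` and `create_reference_table`, and the table names, are illustrative assumptions; consult your cluster's table-creation documentation):

```sql
-- Assumed distributed table: rows are sharded across DNs by a distribution
-- key, so each DN emits changes only for the shards it owns.
CREATE TABLE orders (order_id bigint, customer_id bigint, amount numeric);
SELECT create_distributed_table('orders', 'customer_id');

-- Assumed replicated table: every node holds a full copy of the data,
-- so only the primary CN publishes the change stream to avoid duplicates.
CREATE TABLE currencies (code text PRIMARY KEY, name text);
SELECT create_reference_table('currencies');
```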

Preparations

Before you begin, ensure that the required cluster parameters are set correctly. These parameters are necessary to enable logical replication.

Note

By default, these parameters are already configured in PolarDB for PostgreSQL (distributed edition). If your configuration is different, submit a ticket for support.

  • On the primary CN, run the following SQL statement to check if polar_cluster.enable_change_data_capture is on for all nodes.

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);

    The expected result is shown below. The result column for all nodes should be on.

     success | result 
    ---------+--------
     t       | on
     t       | on
     t       | on
     t       | on
  • On the primary CN, run the following SQL statement to check if wal_level is logical for all nodes.

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);

    The expected result is shown below. The result column for all nodes should be logical.

     success | result  
    ---------+---------
     t       | logical
     t       | logical
     t       | logical
     t       | logical

Step 1: Create a publication and replication slots on the publisher

Configuring the publisher, which is your PolarDB for PostgreSQL (distributed edition) cluster, involves two steps: creating a publication and creating replication slots.

Create a publication

A publication is a set of tables whose changes will be captured. Execute this statement once on the primary CN; the system automatically synchronizes it to all nodes.

CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;

Note
  • Do not use the FOR ALL TABLES option. You must specify the tables to publish explicitly.

  • <publication_name> is the name of the publication. <table_name1> and <table_name2> specify the distributed or replicated tables to publish. For distributed tables, specify only the logical table names, not the names of their physical shards.
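Filled in with hypothetical names (a publication `cdc_pub` covering tables `orders` and `currencies`; all three names are placeholders), the statement might look like this, followed by a check against the standard PostgreSQL catalog:

```sql
-- Publish two tables under one publication. Run once on the primary CN;
-- the definition is propagated to all nodes automatically.
CREATE PUBLICATION cdc_pub FOR TABLE orders, currencies;

-- Confirm which tables the publication covers.
SELECT * FROM pg_publication_tables WHERE pubname = 'cdc_pub';
```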

Create replication slots

Replication slots retain write-ahead log (WAL) files for downstream subscribers, which prevents the logs from being recycled before they are consumed. Because data changes originate from the primary CN and all DNs, you must create a replication slot on each of these nodes.

You can run the following SQL statement on the primary CN to create replication slots with the same name on all relevant nodes (the primary CN and all DNs) in a batch:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;

Note

<publication_slot_name> is the name of the replication slot.

The expected result is shown below. A value of t in the success column indicates that the replication slot was created successfully. If the creation fails, the result column shows the reason.

   node_name    | node_port | success |               result               
----------------+-----------+---------+------------------------------------
 10.xxx.xxx.xxx |      3007 | t       | (<publication_slot_name>,0/C024D7D0)
 10.xxx.xxx.xxx |      3006 | t       | (<publication_slot_name>,0/C33B6668)
 10.xxx.xxx.xxx |      3003 | t       | (<publication_slot_name>,0/C33949B0)
(3 rows)

Step 2: Create subscriptions on the subscriber

The subscriber, such as Debezium, Flink, or another PostgreSQL instance, must establish a separate subscription for each node that has a replication slot (the primary CN and all DNs). This is necessary to receive a complete stream of data changes.

Note

The configuration for each subscription is almost identical. You only need to change the host and port for each node.

Debezium example

If you use Debezium as the subscriber, you only need to change the database.hostname and database.port configuration items for each subscription. Set them to the primary endpoint and port of the corresponding node. The rest of the configuration is the same. The following example shows the configuration for subscribing to one DN:

{
    "name": "xxx",
    "config": {
        "connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname" : "<DN1_primary_endpoint>",
        "database.port" : "<DN1_port>",
        "database.user" : "<your_username>",
        "database.password" : "<your_password>",
        "database.dbname" : "postgres",
        "slot.name": "<replication_slot_name>",
        "publication.name": "<publication_name>",
        ...
    }
}

PostgreSQL example

If you use PostgreSQL as the subscriber, you only need to change the host and port parameters in the connection string for each subscription. Set them to the primary endpoint and port of the corresponding node. The rest of the configuration is the same. The following example shows how to subscribe to one DN:

CREATE SUBSCRIPTION test_subscription
    CONNECTION 'dbname=postgres host=<DN1_primary_endpoint> port=<DN1_port> user=<your_username> password=<your_password>'
    PUBLICATION <publication_name>
    WITH (create_slot = false, slot_name = '<publication_slot_name>');
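After the subscription is created, you can confirm its state on the subscriber with stock PostgreSQL catalog and statistics views (these views are standard PostgreSQL, not PolarDB-specific):

```sql
-- On the subscriber: confirm the subscription exists and is enabled.
SELECT subname, subenabled FROM pg_subscription;

-- Check replication progress: one row per active subscription worker.
SELECT subname, received_lsn, latest_end_lsn FROM pg_stat_subscription;
```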

Step 3: Verify the CDC link

After you configure all subscribers, you can run the following SQL statement on the primary CN to check if the replication slots on all publishing nodes are active.

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;

The expected result is shown below. A value of t in the result column indicates that the replication link between the node and its subscriber is active. You can now insert or modify data in the source table and check if the subscriber receives the changes.

  node_name     | node_port | success | result 
----------------+-----------+---------+--------
 10.xxx.xxx.xxx |      3007 | t       | t
 10.xxx.xxx.xxx |      3006 | t       | t
 10.xxx.xxx.xxx |      3003 | t       | t
(3 rows)
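As a final end-to-end check, write a row through the publisher and look for it downstream (the table and column names here are placeholders; substitute a table from your publication):

```sql
-- On the publisher (primary CN): modify a published table.
INSERT INTO orders (order_id, customer_id, amount) VALUES (1, 42, 9.99);

-- On the subscriber, after a short replication delay: the row should appear.
SELECT * FROM orders WHERE order_id = 1;
```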

Maintenance and cleanup

If a downstream system no longer needs the data, stop its subscription first. Then promptly delete the corresponding replication slots on the publisher to free WAL space.

Note

Deleting a replication slot permanently removes its associated WAL logs. Any unconsumed data changes are lost and cannot be recovered. If you only need to temporarily pause the subscription, do not delete the replication slot.
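On a PostgreSQL subscriber, stopping the subscription might look like the following sketch (assuming the `test_subscription` from Step 2). Because the replication slots were created manually in Step 1, detach the slot from the subscription before dropping it, so that DROP SUBSCRIPTION does not attempt to drop the slot on the publisher itself:

```sql
-- On the subscriber: stop consuming changes.
ALTER SUBSCRIPTION test_subscription DISABLE;

-- Detach the manually created slot so DROP SUBSCRIPTION leaves it in place;
-- the slots are then removed on the publisher in a batch, as shown below.
ALTER SUBSCRIPTION test_subscription SET (slot_name = NONE);
DROP SUBSCRIPTION test_subscription;
```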

You can run the following SQL statement on the primary CN to delete the replication slots from all relevant nodes in a batch:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;