Change Data Capture (CDC) captures row-level changes—INSERT, UPDATE, and DELETE—from PolarDB for PostgreSQL (distributed edition) and streams them to downstream systems such as data warehouses, analytics platforms like Flink, or other database instances.
PolarDB for PostgreSQL (distributed edition) uses PostgreSQL logical replication as the underlying protocol. The cluster consists of one primary compute node (CN) and one or more data nodes (DNs). How your tables are distributed determines which nodes your subscriber must connect to:
| Table type | Where changes originate | Subscriber must connect to |
|---|---|---|
| Distributed tables | Each DN independently | All DNs |
| Replicated tables | Primary CN (single event stream) | Primary CN only |
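To see which nodes this implies for your cluster, you can list them from the `pg_dist_node` metadata table that the later steps in this topic also query. Treating `nodeid = 1` as the primary CN follows the filter those steps use; this is a sketch, not an authoritative node inventory:

```sql
-- List the nodes that can emit change events: the primary CN (nodeid = 1,
-- per the filter used in later steps) and every DN that stores shards.
SELECT nodename, nodeport,
       CASE WHEN nodeid = 1 THEN 'primary CN' ELSE 'DN' END AS role
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true;
```

The subscriber configurations in Step 2 need one connection per row returned here.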
Key concepts
Before you begin, familiarize yourself with the three components involved in a CDC pipeline:
| Component | Description |
|---|---|
| Publication | A named set of tables on the publisher (your PolarDB cluster) whose changes will be captured. |
| Replication slot | A slot on each publishing node that tracks how far the subscriber has consumed the write-ahead log (WAL). The slot prevents the database from discarding changes before they are delivered. |
| Subscription | A connection from the subscriber to a publishing node. Each subscription consumes the change stream from one node independently. |
Prerequisites
Before you begin, verify that the two required parameters are set correctly on all nodes. These parameters enable logical decoding—the mechanism that extracts row-level changes from the WAL.
Check `polar_cluster.enable_change_data_capture`
This parameter activates CDC at the cluster level. Run the following statement on the primary CN:
SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);
Expected output: result is on for all nodes.
success | result
---------+--------
t | on
t | on
t | on
t | on
Check `wal_level`
This parameter instructs the PostgreSQL server to write enough information to the WAL to support logical decoding. Run the following statement on the primary CN:
SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);
Expected output: result is logical for all nodes.
success | result
---------+---------
t | logical
t | logical
t | logical
t | logical
By default, these parameters are already configured in PolarDB for PostgreSQL (distributed edition). If your output differs, submit a ticket for support.
Step 1: Create a publication and replication slots on the publisher
The publisher is your PolarDB for PostgreSQL (distributed edition) cluster. This step has two parts: creating a publication that defines which tables to capture, and creating replication slots on every node that generates change events.
Create a publication
A publication defines the set of tables whose changes will be captured. Run this statement once on the primary CN—the system propagates it to all nodes automatically.
CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;
- Do not use FOR ALL TABLES. Specify each table explicitly.
- For distributed tables, use the logical table name, not the names of the physical shards.
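To confirm which tables the publication actually captures, you can query the standard PostgreSQL `pg_publication_tables` view. A sketch, run on the primary CN; `<publication_name>` is the name you chose above:

```sql
-- Verify the publication's table list on the primary CN.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = '<publication_name>';
```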
Create replication slots
Replication slots track how far each subscriber has consumed the WAL, preventing the database from discarding changes before they are delivered. Because data changes in a distributed cluster originate from the primary CN and each DN, you need a replication slot on every one of those nodes.
Replication slots that are not consumed cause WAL to accumulate on disk. If a subscriber stops consuming events, disk usage grows until the slot is dropped or the subscriber catches up. Drop replication slots promptly when they are no longer needed.
Run the following statement on the primary CN to create identically named replication slots on the primary CN and all DNs in a single batch:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
Replace <publication_slot_name> with your replication slot name.
Expected output: success is t for each node. If a slot fails to create, the result column shows the error.
node_name | node_port | success | result
----------------+-----------+---------+------------------------------------
10.xxx.xxx.xxx | 3007 | t | (<publication_slot_name>,0/C024D7D0)
10.xxx.xxx.xxx | 3006 | t | (<publication_slot_name>,0/C33B6668)
10.xxx.xxx.xxx | 3003 | t | (<publication_slot_name>,0/C33949B0)
(3 rows)
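Because unconsumed slots retain WAL (see the warning above), it helps to monitor how much WAL each slot is holding. The following sketch uses the standard PostgreSQL functions `pg_current_wal_lsn` and `pg_wal_lsn_diff`; run it directly on a single node, or wrap it in the same batch pattern used above to cover all nodes:

```sql
-- Estimate how much WAL each slot still retains on this node.
-- A growing value on an inactive slot means its subscriber has stalled.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
```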
Step 2: Create subscriptions on the subscriber
The subscriber—Debezium, Flink, or another PostgreSQL instance—must create a separate subscription for each node that has a replication slot: the primary CN and every DN. Each subscription connects to one node and consumes its change stream independently.
The following examples show two common subscriber configurations. Other downstream tools follow the same pattern—only the connection parameters differ.
The only difference between subscriptions for different nodes is the host (or database.hostname) and port (or database.port). All other parameters remain the same.
Debezium
Each Debezium connector instance connects to one node. Set database.hostname and database.port to the primary endpoint and port of the target node. The following example connects to one DN:
{
"name": "xxx",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<DN1_primary_endpoint>",
"database.port": "<DN1_port>",
"database.user": "<your_username>",
"database.password": "<your_password>",
"database.dbname": "postgres",
"slot.name": "<replication_slot_name>",
"publication.name": "<publication_name>",
...
}
}
Create one connector per node (primary CN and each DN), changing only database.hostname and database.port.
PostgreSQL
If PostgreSQL is the subscriber, use CREATE SUBSCRIPTION with create_slot=false because the slots were already created on the publisher in Step 1. Set host and port to the primary endpoint and port of the target node. The following example subscribes to one DN:
CREATE SUBSCRIPTION test_subscription
CONNECTION 'dbname=postgres host=<DN1_primary_endpoint> port=<DN1_port> user=<your_username> password=<your_password>'
PUBLICATION <publication_name>
WITH (create_slot=false, slot_name='<replication_slot_name>');
Create one subscription per node (primary CN and each DN), changing only host and port.
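For a PostgreSQL subscriber, you can also confirm from the subscriber side that each subscription has an active apply worker. A sketch using the standard `pg_stat_subscription` view:

```sql
-- On the PostgreSQL subscriber: one row per subscription worker.
-- A non-null pid and a recent last_msg_receipt_time indicate an active link.
SELECT subname, pid, received_lsn, last_msg_receipt_time
FROM pg_stat_subscription;
```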
Step 3: Verify the CDC link
After configuring all subscribers, check that every replication slot is active. Run the following statement on the primary CN:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
Expected output: result is t for each node, indicating an active replication link between that node and its subscriber. A value of f means the subscriber for that node is not connected.
node_name | node_port | success | result
----------------+-----------+---------+--------
10.xxx.xxx.xxx | 3007 | t | t
10.xxx.xxx.xxx | 3006 | t | t
10.xxx.xxx.xxx | 3003 | t | t
(3 rows)
Once all links are active, insert or modify data in a source table and confirm that the subscriber receives the changes.
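As a quick end-to-end check with a PostgreSQL subscriber, write a marker row on the publisher and look for it downstream. The table `orders` and its columns below are placeholders for any table in your publication:

```sql
-- On the publisher (primary CN): write a test row to a published table.
-- "orders" is a hypothetical example table; use one from your publication.
INSERT INTO orders (id, note) VALUES (1, 'cdc-smoke-test');

-- On the subscriber: the row should appear once the change is applied.
SELECT * FROM orders WHERE note = 'cdc-smoke-test';
```

If the row does not arrive, recheck the slot activity output from Step 3 to identify which node's link is down.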
Maintenance and cleanup
When a downstream system no longer needs the data, stop the subscription and drop the replication slots immediately to free WAL disk space.
Dropping a replication slot permanently deletes its associated WAL. Any unconsumed changes are lost and cannot be recovered. If you only need to pause a subscription temporarily, do not drop the slot.
Run the following statement on the primary CN to drop replication slots from all relevant nodes in a single batch:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
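If the subscriber is a PostgreSQL instance, also remove its subscriptions. Disassociating the slot first prevents DROP SUBSCRIPTION from trying to drop the publisher-side slot, which the batch above already handles; the subscription name matches the Step 2 example:

```sql
-- On each PostgreSQL subscriber, for each subscription created in Step 2:
ALTER SUBSCRIPTION test_subscription DISABLE;
-- Detach the slot so DROP SUBSCRIPTION does not contact the publisher.
ALTER SUBSCRIPTION test_subscription SET (slot_name = NONE);
DROP SUBSCRIPTION test_subscription;
```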