AnalyticDB for PostgreSQL includes a self-developed Change Data Capture (CDC) connector that uses PostgreSQL's logical replication feature to capture full and incremental data changes in real time. The connector integrates natively with Flink, making it straightforward to build real-time data synchronization and stream processing pipelines.
This topic describes how to use Realtime Compute for Apache Flink with the CDC connector to subscribe to full and incremental data from AnalyticDB for PostgreSQL.
Limitations
- This feature requires an AnalyticDB for PostgreSQL V7.0 instance that runs minor engine version 7.2.1.4 or later. You can check the minor version on the Basic Information page of the instance. For instructions, see View the minor engine version. If your instance runs an earlier minor version, update it before you continue.
- AnalyticDB for PostgreSQL instances in Serverless mode are not supported.
Prerequisites
Before you begin, ensure that you have:
- An AnalyticDB for PostgreSQL instance and a fully managed Flink workspace in the same VPC.
- The following parameters configured on the AnalyticDB for PostgreSQL instance:
  - `wal_level` set to `logical` to enable logical replication.
  - For High-availability Edition instances: `hot_standby`, `hot_standby_feedback`, and `sync_replication_slots` all set to `on`. This prevents the logical subscription from being interrupted during a primary/secondary failover.
- An initial account, or a privileged account that has the RDS_SUPERUSER permission, that has been granted the REPLICATION privilege. To grant the privilege, run `ALTER USER <username> WITH REPLICATION;`.
- The CIDR block of the Flink workspace added to a whitelist of the AnalyticDB for PostgreSQL instance.
- The flink-sql-connector-adbpg-cdc-3.3.jar file downloaded and uploaded to your Flink workspace as a custom connector.
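The following is a minimal sketch for checking these prerequisites from a SQL client connected to the instance. It assumes the parameters can be read with SHOW and that the standard PostgreSQL pg_roles view is available. Run the three High-availability checks only on High-availability Edition instances, because SHOW returns an error for a parameter that the instance does not define.

```sql
-- Logical replication must be enabled.
SHOW wal_level;               -- required value: logical

-- High-availability Edition only: settings that keep the subscription alive across a failover.
SHOW hot_standby;             -- required value: on
SHOW hot_standby_feedback;    -- required value: on
SHOW sync_replication_slots;  -- required value: on

-- The account that Flink uses must have the REPLICATION attribute (rolreplication = t).
SELECT rolname, rolreplication
FROM pg_roles
WHERE rolname = current_user;
```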
Procedure
Step 1: Prepare a test table and test data
- Log on to the AnalyticDB for PostgreSQL console. Find the instance and click the instance ID.
- In the lower-right corner of the Basic Information page, click Log On to Database.
- Create a test database, a schema, and a source table named adbpg_source_table. Then insert 50 rows of test data.

-- Create a test database.
CREATE DATABASE testdb;
-- Switch to the testdb database, and then create a schema.
CREATE SCHEMA testschema;
-- Create a source table.
CREATE TABLE testschema.adbpg_source_table(
  id int,
  username text,
  PRIMARY KEY(id)
);
-- Insert 50 rows of data.
INSERT INTO testschema.adbpg_source_table(id, username)
SELECT i, 'username'||i::text
FROM generate_series(1, 50) AS t(i);

- Create a sink table named adbpg_sink_table for Flink to write results to.

CREATE TABLE testschema.adbpg_sink_table(
  id int,
  username text,
  score int
);
Step 2: Create a Flink job
- Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace and click Console in the Actions column.
- In the left navigation pane, choose Development > ETL.
- In the top menu bar, click the icon for creating a draft, select New Blank Stream Draft, and set the following parameters.

| Parameter | Description | Example |
|---|---|---|
| Name | The name of the draft. The name must be unique within the current project. | adbpg-test |
| Engine Version | The Flink engine version. For details on versions and lifecycle dates, see Engine version. | vvr-6.0.7-flink-1.15 |

- Click Create.
Step 3: Write the job code and deploy the job
- Copy the following SQL into the editor. The job creates three tables: a Datagen source that generates streaming data, a CDC source that captures changes from AnalyticDB for PostgreSQL, and a sink that writes results back to AnalyticDB for PostgreSQL.

-- Datagen source: generates sequential IDs (1-100) with random scores (70-100).
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  score INT
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '100',
  'fields.score.kind' = 'random',
  'fields.score.min' = '70',
  'fields.score.max' = '100'
);

-- CDC source: captures data changes from adbpg_source_table using the adbpg-cdc connector.
CREATE TEMPORARY TABLE source_adbpg(
  id int,
  username varchar,
  PRIMARY KEY(id) NOT ENFORCED
) WITH(
  'connector' = 'adbpg-cdc',
  'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
  'port' = '5432',
  'username' = 'account****',
  'password' = 'password****',
  'database-name' = 'testdb',
  'schema-name' = 'testschema',
  'table-name' = 'adbpg_source_table',
  'slot.name' = 'flink',
  'decoding.plugin.name' = 'pgoutput'
);

-- Sink: writes the joined results to adbpg_sink_table.
CREATE TEMPORARY TABLE sink_adbpg (
  id int,
  username varchar,
  score int
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb',
  'tablename' = 'testschema.adbpg_sink_table',
  'username' = 'account****',
  'password' = 'password****',
  'maxRetryTimes' = '2',
  'batchsize' = '5000',
  'conflictMode' = 'ignore',
  'writeMode' = 'insert',
  'retryWaitTime' = '200'
);

-- Join the two sources and write the results to the sink.
INSERT INTO sink_adbpg
SELECT ts.id, ts.username, ds.score
FROM datagen_source AS ds
JOIN source_adbpg AS ts ON ds.id = ts.id;

Replace the hostname, username, password, and url placeholders with your actual instance values. The hostname is the internal endpoint of the instance, which is shown on the Basic Information page. For a full description of the connector parameters, see Connector parameters.
- In the upper-right corner of the SQL Editor page, click Validate to check for syntax errors.
- Click Deploy, then click Confirm.
- In the upper-right corner, click Operations. On the Deployments page, click Start.
Step 4: View the data written by Flink
- Run the following queries in testdb to check the data written by the Flink job. The COUNT(*) query returns 50, one row for each of the initial source rows.

SELECT * FROM testschema.adbpg_sink_table;
SELECT COUNT(*) FROM testschema.adbpg_sink_table;

- Insert 50 more rows into the source table. Then verify that Flink captures the incremental data and writes it to the sink table.

-- Insert 50 incremental rows (IDs 51-100) into the source table.
INSERT INTO testschema.adbpg_source_table(id, username)
SELECT i, 'username'||i::text
FROM generate_series(51, 100) AS t(i);
-- Verify the incremental data in the sink table.
SELECT COUNT(*) FROM testschema.adbpg_sink_table WHERE id > 50;

The expected result:

 count
-------
    50
(1 row)
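The incremental rows still produce output because the job uses a regular join. Flink keeps the Datagen rows (IDs 1 to 100) in state after the bounded Datagen sequence has been fully emitted. The source rows with IDs 51 to 100 that you insert later join against that stored state. This assumes the join state has not expired under any state TTL configured for the deployment. The following query is a quick sanity check of the whole sink table after both batches have been processed:

```sql
-- Summarize the sink table after both batches (IDs 1-50 and 51-100) are processed.
SELECT COUNT(*)   AS row_count,
       MIN(id)    AS min_id,
       MAX(id)    AS max_id,
       MIN(score) AS min_score,
       MAX(score) AS max_score
FROM testschema.adbpg_sink_table;
```

If the job ran without failures, you can expect 100 rows with IDs from 1 to 100 and scores between 70 and 100. The individual scores are random.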
Connector parameters
All examples use the adbpg-cdc connector for source tables and the adbpg connector for sink tables.
| Parameter | Required | Data type | Default | Description |
|---|---|---|---|---|
| connector | Yes | STRING | None | The connector type. Set to adbpg-cdc for source tables and adbpg for sink tables. |
| hostname | Yes | STRING | None | The internal endpoint of the AnalyticDB for PostgreSQL instance. Find it on the Basic Information page of the instance. |
| username | Yes | STRING | None | The database account. |
| password | Yes | STRING | None | The password of the database account. |
| database-name | Yes | STRING | None | The database name. |
| schema-name | Yes | STRING | None | The schema name. Supports regular expressions to subscribe to multiple schemas. |
| table-name | Yes | STRING | None | The table name. Supports regular expressions to subscribe to multiple tables. |
| port | Yes | INTEGER | 5432 | The port of AnalyticDB for PostgreSQL. Fixed at 5432. |
| decoding.plugin.name | Yes | STRING | pgoutput | The PostgreSQL logical decoding plugin. Fixed at pgoutput. |
| slot.name | Yes | STRING | None | The name of the logical decoding replication slot. See Notes on slot.name. |
| url | Yes (sink only) | STRING | None | The JDBC connection URL. Format: jdbc:postgresql://<Address>:<PortId>/<DatabaseName>. |
| debezium.* | No | STRING | None | Fine-grained Debezium client settings. For example, 'debezium.snapshot.mode' = 'never' disables the initial snapshot. See Debezium PostgreSQL connector properties. |
| scan.incremental.snapshot.enabled | No | BOOLEAN | false | Set to true to enable incremental snapshots. |
| scan.startup.mode | No | STRING | initial | The startup mode for data consumption. Valid values: initial (scans all historical data, then reads the latest write-ahead log (WAL)); latest-offset (skips historical data and reads from the end of the WAL); snapshot (scans all historical data and the new WAL entries generated during the scan, then stops). |
| changelog-mode | No | STRING | ALL | The changelog mode for encoding stream changes. ALL supports INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER. UPSERT supports only INSERT, DELETE, and UPDATE_AFTER. |
| heartbeat.interval.ms | No | DURATION | 30s | The interval at which heartbeat packets are sent to keep the replication slot offset advancing. If your source tables are rarely updated, set a suitable heartbeat interval so that WAL can be cleaned up instead of accumulating on disk. |
| scan.incremental.snapshot.chunk.key-column | No | STRING | First primary key column | The column used to split the table into chunks during the snapshot phase. |
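The following sketch shows how the optional parameters combine. It is a hypothetical variant of the source_adbpg table from Step 3 that skips historical data, emits an upsert changelog, and sends heartbeats more often. The table name source_adbpg_latest, the slot name flink_latest, and the 10s heartbeat value are assumptions for illustration. The other values come from the examples in this topic.

```sql
-- Hypothetical variant of source_adbpg: reads only new changes and emits an upsert changelog.
CREATE TEMPORARY TABLE source_adbpg_latest (
  id INT,
  username VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED          -- an upsert changelog needs a key to identify rows
) WITH (
  'connector' = 'adbpg-cdc',
  'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
  'port' = '5432',
  'username' = 'account****',
  'password' = 'password****',
  'database-name' = 'testdb',
  'schema-name' = 'testschema',
  'table-name' = 'adbpg_source_table',
  'slot.name' = 'flink_latest',          -- hypothetical; must not collide with another job's slot
  'decoding.plugin.name' = 'pgoutput',
  'scan.startup.mode' = 'latest-offset', -- skip historical data, read from the end of the WAL
  'changelog-mode' = 'UPSERT',           -- emit INSERT, DELETE, and UPDATE_AFTER only
  'heartbeat.interval.ms' = '10s'        -- hypothetical value for a rarely updated table
);
```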
Notes on slot.name
Replication slot conflicts
For source tables in the same Flink job, use the same slot.name value. If different Flink jobs subscribe to the same table, assign a unique slot.name to each job. Reusing the same slot name across jobs causes the following error:
PSQLException: ERROR: replication slot "debezium" is active for PID 974
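When this error occurs, the following query can show which connection currently holds the slot. It is a sketch that assumes the standard PostgreSQL pg_replication_slots and pg_stat_activity views. Replace 'debezium' with the slot name from your error message.

```sql
-- Show whether the slot is in use and which backend (PID, user, client address) holds it.
SELECT s.slot_name,
       s.active,
       s.active_pid,
       a.usename,
       a.client_addr,
       a.backend_start
FROM pg_replication_slots AS s
LEFT JOIN pg_stat_activity AS a ON a.pid = s.active_pid
WHERE s.slot_name = 'debezium';
```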
Replication slot cleanup
Flink does not automatically delete replication slots when a job stops. This is by design: if a job restarts, the slot preserves the WAL offset so no data is lost. However, if a job is permanently retired, its replication slot continues to hold WAL entries, preventing cleanup and consuming disk space. Delete the replication slot manually once you confirm a job will no longer restart.
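The following is a minimal sketch for finding and removing slots left behind by retired jobs. It assumes the standard PostgreSQL pg_replication_slots view and the WAL function names used in PostgreSQL 10 and later (pg_current_wal_lsn and pg_wal_lsn_diff). The slot name 'flink' is the value used in Step 3. Substitute the slot of the job you are retiring.

```sql
-- List logical replication slots and how much WAL each one is still retaining.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_type = 'logical';

-- Drop the slot of a job that will not be restarted.
-- The slot must be inactive (active = false), so stop the Flink deployment first.
SELECT pg_drop_replication_slot('flink');
```

pg_drop_replication_slot fails while the slot is active, so it cannot remove the slot of a job that is still running.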
Usage notes
Processing semantics
During normal operation, the CDC connector provides exactly-once semantics. In failure scenarios, only at-least-once semantics are guaranteed.
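After a failure, changes can be replayed. A sink table without a primary key, such as adbpg_sink_table in the procedure above, may then receive the same row more than once. The following query is a simple check for such duplicates in that table:

```sql
-- List IDs that were written to the sink table more than once.
SELECT id, COUNT(*) AS copies
FROM testschema.adbpg_sink_table
GROUP BY id
HAVING COUNT(*) > 1
ORDER BY id;
```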
REPLICA IDENTITY and its effects
PostgreSQL's REPLICA IDENTITY setting controls how much information is included in WAL entries for UPDATE and DELETE events. There are four possible values:
- DEFAULT: WAL entries contain only the previous values of primary key columns.
- NOTHING: WAL entries for UPDATE and DELETE contain no information about the previous row.
- FULL: WAL entries contain the previous values of all columns.
- INDEX index-name: WAL entries contain the previous values of the columns in the specified index.
The CDC connector automatically sets REPLICA IDENTITY to FULL on every subscribed table. This is required to capture the complete before-image of rows for UPDATE and DELETE events, which is necessary for accurate change data capture. Be aware of the following side effects:
- Increased disk usage: Frequent UPDATE or DELETE operations produce larger WAL entries.
- Reduced write throughput: High-concurrency write workloads may experience a performance impact.
- Higher checkpoint pressure: Larger WAL entries mean that checkpoints process more data, which can extend checkpoint duration.
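To confirm the setting on a subscribed table, you can read relreplident from the pg_class catalog. The following sketch does this for the source table from Step 1. The final ALTER TABLE statement is optional. It assumes that no other consumer depends on FULL, so run it only after the table is no longer subscribed.

```sql
-- Show the current REPLICA IDENTITY of a subscribed table.
-- relreplident codes: d = DEFAULT, n = NOTHING, f = FULL, i = INDEX.
SELECT c.relname,
       CASE c.relreplident
         WHEN 'd' THEN 'DEFAULT'
         WHEN 'n' THEN 'NOTHING'
         WHEN 'f' THEN 'FULL'
         WHEN 'i' THEN 'INDEX'
       END AS replica_identity
FROM pg_class AS c
WHERE c.oid = 'testschema.adbpg_source_table'::regclass;

-- Optional: restore the default once the table is permanently unsubscribed.
ALTER TABLE testschema.adbpg_source_table REPLICA IDENTITY DEFAULT;
```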
Best practices
Flink CDC supports job development using the Flink SQL API or the DataStream API. It works well for synchronizing individual tables or a defined set of tables, and for stream computations such as joins across different data sources. With checkpointing enabled, the Flink framework provides exactly-once processing semantics throughout the data pipeline. For the connector's guarantees in failure scenarios, see Processing semantics.
Flink CDC is not suitable for synchronizing an entire PostgreSQL-compatible database. It does not support DDL synchronization, and you must define the schema of each table in Flink SQL, which makes maintenance complex at scale.
The following example demonstrates best practices for a Flink CDC SQL job that synchronizes data from AnalyticDB for PostgreSQL to ApsaraMQ for Kafka. Complete the prerequisites before proceeding.
Step 1: Prepare test tables
Create two source tables in the testdb database of the AnalyticDB for PostgreSQL instance. The Flink job in this example subscribes to the public schema of testdb.
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(200) NOT NULL,
sku CHAR(12) NOT NULL,
description TEXT,
price NUMERIC(10,2) NOT NULL,
discount_price DECIMAL(10,2),
stock_quantity INTEGER DEFAULT 0,
weight REAL,
volume DOUBLE PRECISION,
dimensions BOX,
release_date DATE,
is_featured BOOLEAN DEFAULT FALSE,
rating FLOAT,
warranty_period INTERVAL,
metadata JSON,
tags TEXT[]
);
CREATE TABLE documents (
document_id UUID PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT,
summary TEXT,
publication_date TIMESTAMP WITHOUT TIME ZONE,
last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
author_id BIGINT,
file_data BYTEA,
xml_content XML,
json_metadata JSON,
reading_time INTERVAL,
is_public BOOLEAN DEFAULT TRUE,
views_count INTEGER DEFAULT 0,
category VARCHAR(50),
tags TEXT[]
);
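The job in Step 4 subscribes to the public schema of testdb ('schema-name' = 'public'). The CREATE TABLE statements above do not name a schema, so PostgreSQL creates the tables in the first existing schema on the search_path. The following query checks that the tables ended up where the job expects them:

```sql
-- Both rows should show table_schema = public.
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_name IN ('products', 'documents')
ORDER BY table_schema, table_name;
```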
Step 2: Prepare ApsaraMQ for Kafka resources
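- Add the CIDR block of the Flink workspace to the whitelist of the ApsaraMQ for Kafka instance.
- Create the required resources in the ApsaraMQ for Kafka instance, such as one topic for each sink table. This example writes to two topics: one for products and one for documents.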
Step 3: Create a Flink job
- Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace and click Console in the Actions column.
- In the left navigation pane, choose Development > ETL.
- In the top menu bar, click the icon for creating a draft, select New Blank Stream Draft, and set the following parameters.

| Parameter | Description | Example |
|---|---|---|
| Name | The name of the draft. The name must be unique within the current project. | adbpg-test |
| Engine Version | The Flink engine version. For details on versions and lifecycle dates, see Engine version. | vvr-6.0.7-flink-1.15 |

- Click Create.
Step 4: Write the job code and deploy the job
- Copy the following SQL into the editor and replace the placeholder values with your actual configuration.

-- One source captures changes from multiple tables simultaneously.
-- Define all columns from all source tables here; keep only one copy of any column name that appears in more than one table.
CREATE TEMPORARY TABLE ADBPGSource(
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
  product_id BIGINT,
  product_name STRING,
  sku STRING,
  description STRING,
  price STRING,
  discount_price STRING,
  stock_quantity INT,
  weight STRING,
  volume STRING,
  dimensions STRING,
  release_date STRING,
  is_featured BOOLEAN,
  rating FLOAT,
  warranty_period STRING,
  metadata STRING,
  tags STRING,
  document_id STRING,
  title STRING,
  content STRING,
  summary STRING,
  publication_date STRING,
  last_updated STRING,
  author_id BIGINT,
  file_data STRING,
  xml_content STRING,
  json_metadata STRING,
  reading_time STRING,
  is_public BOOLEAN,
  views_count INT,
  category STRING
) WITH (
  'connector' = 'adbpg-cdc',
  'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com',
  'port' = '5432',
  'username' = 'account****',
  'password' = 'password****',
  'database-name' = 'testdb',
  'schema-name' = 'public',
  'table-name' = '(products|documents)',
  'slot.name' = 'flink',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.snapshot.mode' = 'never'
);

CREATE TEMPORARY TABLE KafkaProducts (
  product_id BIGINT,
  product_name STRING,
  sku STRING,
  description STRING,
  price STRING,
  discount_price STRING,
  stock_quantity INT,
  weight STRING,
  volume STRING,
  dimensions STRING,
  release_date STRING,
  is_featured BOOLEAN,
  rating FLOAT,
  warranty_period STRING,
  metadata STRING,
  tags STRING,
  PRIMARY KEY(product_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '****',
  'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

CREATE TEMPORARY TABLE KafkaDocuments (
  document_id STRING,
  title STRING,
  content STRING,
  summary STRING,
  publication_date STRING,
  last_updated STRING,
  author_id BIGINT,
  file_data STRING,
  xml_content STRING,
  json_metadata STRING,
  reading_time STRING,
  is_public BOOLEAN,
  views_count INT,
  category STRING,
  tags STRING,
  PRIMARY KEY(document_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '****',
  'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

-- Use a STATEMENT SET to route data from one source to multiple sink tables.
BEGIN STATEMENT SET;
-- Use the table_name metadata column to route each row to the correct sink.
INSERT INTO KafkaProducts
SELECT product_id, product_name, sku, description, price, discount_price, stock_quantity,
  weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
FROM ADBPGSource
WHERE table_name = 'products';
INSERT INTO KafkaDocuments
SELECT document_id, title, content, summary, publication_date, last_updated, author_id,
  file_data, xml_content, json_metadata, reading_time, is_public, views_count, category, tags
FROM ADBPGSource
WHERE table_name = 'documents';
END;

Note the following about this SQL job:
  - Use one source for multiple tables. For multi-table synchronization, define a single adbpg-cdc source that covers all source tables. List all columns from all tables in this source definition, and keep only one copy of any column name that appears in more than one table. This creates only one replication slot in AnalyticDB for PostgreSQL, which reduces the load on the source database and simplifies maintenance.
  - Specify multiple tables with the `table-name` parameter. Enclose the table names in parentheses and separate them with `|`, for example: `'table-name' = '(table1|table2|table3)'`.
  - Control full vs. incremental sync with `debezium.snapshot.mode`. Setting it to `never` captures only incremental changes. Change it to `initial` to capture both the full dataset and subsequent changes.
  - Use `STATEMENT SET` for multiple INSERT statements. Wrap all INSERT INTO statements in a `BEGIN STATEMENT SET; ... END;` block when writing to more than one sink table.
In the upper-right corner of the SQL Editor page, click Validate to check for syntax errors.
-
Click Deploy, then click OK.
-
In the upper-right corner, click Operations. On the Deployments page, click Start.
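To see how rows are routed, you can select the metadata columns directly from ADBPGSource, for example while debugging the draft. This sketch assumes that the statement runs in the same session as the ADBPGSource DDL above. It also assumes that the row_kind metadata uses the Flink CDC notation (+I, -U, +U, -D). Because 'debezium.snapshot.mode' is set to never, rows appear only after new changes are made to the source tables.

```sql
-- table_name decides which sink a row is routed to; row_kind shows the change type.
SELECT table_name, row_kind, product_id, document_id
FROM ADBPGSource;
```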
Step 5: Insert test data and verify
In the AnalyticDB for PostgreSQL instance, insert data into the source tables, then observe the message changes in the ApsaraMQ for Kafka topics.
INSERT INTO products (
product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
'Test Product', 'Test-2025', 'A piece of test product data', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"Test1", "Test2"}'
);
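To exercise the second topic and the update and delete paths, you can also run statements like the following. The document values, including the UUID, are made-up test data. With the upsert-kafka sink, an update is written as a new message with the same key. A delete is written as a tombstone, which is a message with a null value.

```sql
-- Insert a row into documents; it should appear in the documents topic.
INSERT INTO documents (
  document_id, title, content, summary, publication_date, author_id,
  json_metadata, reading_time, category, tags
) VALUES (
  'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', 'Test Document', 'Body text', 'A short summary',
  '2025-01-15 09:30:00', 1001, '{"lang": "en"}', INTERVAL '5 minutes', 'test', '{"Test1", "Test2"}'
);

-- Update, then delete, the product inserted above to produce an upsert and a tombstone.
UPDATE products SET price = 259.99 WHERE sku = 'Test-2025';
DELETE FROM products WHERE sku = 'Test-2025';
```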
What's next
For instructions on using Flink to read and write full datasets from AnalyticDB for PostgreSQL, see Read and write full data in real time using Flink.