AnalyticDB for PostgreSQL provides a self-developed Change Data Capture (CDC) connector that subscribes to full and incremental data based on the logical replication feature of PostgreSQL. The connector seamlessly integrates with Flink. It efficiently captures real-time data changes from source tables for real-time data synchronization and stream processing. This helps enterprises quickly respond to dynamic data requirements. This topic describes how to use Realtime Compute for Apache Flink CDC to subscribe to full and incremental data from AnalyticDB for PostgreSQL in real time.
Limits
This feature is available only for AnalyticDB for PostgreSQL V7.0 instances that run minor engine version 7.2.1.4 or later.
Note: You can view the minor version on the Basic Information page of an instance in the AnalyticDB for PostgreSQL console. If your instance does not meet the version requirement, update its minor version.
The serverless mode of AnalyticDB for PostgreSQL is not supported.
Prerequisites
The AnalyticDB for PostgreSQL instance and the fully managed Flink workspace must be in the same VPC.
You must adjust the following parameter settings for the AnalyticDB for PostgreSQL instance:
Enable logical replication by setting the wal_level parameter to logical.
If you use a High-availability Edition instance of AnalyticDB for PostgreSQL, set the hot_standby, hot_standby_feedback, and sync_replication_slots parameters to on. This ensures that the logical subscription is not interrupted by a primary/secondary failover.
You must use the initial account or a privileged account with the RDS_SUPERUSER permission for the AnalyticDB for PostgreSQL instance. The account must also be granted the REPLICATION privilege:
ALTER USER <username> WITH REPLICATION;
You can verify the parameter and privilege settings with the queries shown after this list.
The CIDR block of the Flink workspace must be added to a whitelist of the AnalyticDB for PostgreSQL instance.
You must download flink-sql-connector-adbpg-cdc-3.3.jar and upload the CDC connector to your Flink workspace.
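The following queries offer a quick sanity check for the settings above. They use standard PostgreSQL system views and parameters, so treat them as a sketch and confirm the results in your own environment; replace <username> with the account used by Flink.

-- Check that logical replication is enabled.
SHOW wal_level;                           -- expected value: logical

-- Check that the account has the REPLICATION privilege.
SELECT rolname, rolreplication
FROM pg_roles
WHERE rolname = '<username>';             -- rolreplication should be t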
Procedure
Step 1: Prepare a test table and test data
Log on to the AnalyticDB for PostgreSQL console. Find the instance that you want to manage and click the instance ID.
In the lower-right corner of the Basic Information page, click Log On to Database.
Create a test database and a source table named adbpg_source_table. Then, insert 50 rows of data into the source table.
-- Create a test database.
CREATE DATABASE testdb;

-- Switch to the testdb database and create a schema.
CREATE SCHEMA testschema;

-- Create a source table named adbpg_source_table.
CREATE TABLE testschema.adbpg_source_table(
    id int,
    username text,
    PRIMARY KEY(id)
);

-- Insert 50 rows of data into the adbpg_source_table table.
INSERT INTO testschema.adbpg_source_table(id, username)
SELECT i, 'username'||i::text
FROM generate_series(1, 50) AS t(i);

Create a sink table named adbpg_sink_table for Flink to write the result data.
CREATE TABLE testschema.adbpg_sink_table(
    id int,
    username text,
    score int
);
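Optionally, confirm that the test data is in place before you continue. The expected count matches the 50 rows inserted above.

SELECT COUNT(*) FROM testschema.adbpg_source_table;
-- Expected result: 50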
Step 2: Create a Flink job
Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the navigation pane on the left, choose .
In the top menu bar, click . Select New Blank Stream Draft and set the following parameters:
Name: The name of the draft that you want to create. The draft name must be unique in the current project. Example: adbpg-test.
Engine Version: The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. Example: vvr-6.0.7-flink-1.15.
Click Create.
Step 3: Write the job code and deploy the job
Create a source named datagen_source to generate simulated data and a source named source_adbpg to capture real-time data changes from the AnalyticDB for PostgreSQL database. Then, join the two sources and write the results to a sink table named sink_adbpg, which writes the processed data back to AnalyticDB for PostgreSQL.
Copy the following job code to the editor.
-- Create a Datagen source table to generate streaming data using the Datagen connector.
CREATE TEMPORARY TABLE datagen_source (
    id INT,
    score INT
) WITH (
    'connector' = 'datagen',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100',
    'fields.score.kind' = 'random',
    'fields.score.min' = '70',
    'fields.score.max' = '100'
);

-- Create an adbpg source table to capture data changes of the adbpg_source_table table based on slot.name and pgoutput using the adbpg-cdc connector.
CREATE TEMPORARY TABLE source_adbpg (
    id int,
    username varchar,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'adbpg-cdc',
    'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
    'port' = '5432',
    'username' = 'account****',
    'password' = 'password****',
    'database-name' = 'testdb',
    'schema-name' = 'testschema',
    'table-name' = 'adbpg_source_table',
    'slot.name' = 'flink',
    'decoding.plugin.name' = 'pgoutput'
);

-- Create an adbpg sink table to write the processed results to the destination table adbpg_sink_table in the database.
CREATE TEMPORARY TABLE sink_adbpg (
    id int,
    username varchar,
    score int
) WITH (
    'connector' = 'adbpg',
    'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb',
    'tablename' = 'testschema.adbpg_sink_table',
    'username' = 'account****',
    'password' = 'password****',
    'maxRetryTimes' = '2',
    'batchsize' = '5000',
    'conflictMode' = 'ignore',
    'writeMode' = 'insert',
    'retryWaitTime' = '200'
);

-- Write the join results of the datagen_source and source_adbpg tables to the adbpg sink table.
INSERT INTO sink_adbpg
SELECT ts.id, ts.username, ds.score
FROM datagen_source AS ds
JOIN source_adbpg AS ts ON ds.id = ts.id;

Parameters
connector: Required. STRING. The connector type. Set the value to adbpg-cdc for the source table and adbpg for the sink table.
hostname: Required. STRING. The internal endpoint of the AnalyticDB for PostgreSQL instance. You can obtain the internal endpoint on the Basic Information page of the instance.
username: Required. STRING. The database account of the AnalyticDB for PostgreSQL instance.
password: Required. STRING. The password of the database account.
database-name: Required. STRING. The name of the database.
schema-name: Required. STRING. The name of the schema. This parameter supports regular expressions, so you can subscribe to multiple schemas at a time.
table-name: Required. STRING. The name of the table. This parameter supports regular expressions, so you can subscribe to multiple tables at a time.
port: Required. INTEGER. The port of the AnalyticDB for PostgreSQL instance. The value is fixed at 5432.
decoding.plugin.name: Required. STRING. The name of the PostgreSQL logical decoding plug-in. The value is fixed at pgoutput.
slot.name: Required. STRING. The name of the logical decoding slot. For source tables in the same Flink job, use the same value for slot.name. If different Flink jobs involve the same table, set a unique slot.name for each job. This prevents the following error: PSQLException: ERROR: replication slot "debezium" is active for PID 974.
debezium.*: Optional. STRING. Controls the behavior of the Debezium client at a finer granularity. For example, setting 'debezium.snapshot.mode' = 'never' disables the snapshot feature. For more information, see the Debezium configuration properties.
scan.incremental.snapshot.enabled: Optional. BOOLEAN. Specifies whether to enable incremental snapshots. Valid values: false (default) disables incremental snapshots; true enables them.
scan.startup.mode: Optional. STRING. The startup mode for data consumption. Valid values:
initial (default): When the job starts for the first time, it scans all historical data and then reads the latest write-ahead logging (WAL) data. This provides a seamless transition between full and incremental data.
latest-offset: When the job starts for the first time, it does not scan historical data. It starts reading from the end of the WAL, which is the latest log position, and captures only the data changes that occur after the connector starts.
snapshot: Scans all historical data and reads the new WAL entries generated during the full scan. The job stops after the full scan is complete.
changelog-mode: Optional. STRING. The changelog mode for encoding stream changes. Valid values:
ALL (default): Supports all operation types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
UPSERT: Supports only UPSERT operations, including INSERT, DELETE, and UPDATE_AFTER.
heartbeat.interval.ms: Optional. DURATION. The interval at which heartbeat packets are sent, in milliseconds. The default value is 30 seconds. The AnalyticDB for PostgreSQL CDC connector sends heartbeat packets to the database to ensure that the slot offset continuously advances. If table data does not change frequently, set this parameter to a reasonable value so that WAL logs are cleaned up promptly and disk space is not wasted.
scan.incremental.snapshot.chunk.key-column: Optional. STRING. The column to use for chunking during the snapshot phase. By default, the first column of the primary key is used.
url: Required. STRING. The JDBC URL, in the format jdbc:postgresql://<Address>:<PortId>/<DatabaseName>.
For an example source table definition that combines the optional parameters above, see the sketch at the end of this step.
In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.
Click Deploy, and then click Confirm.
In the upper-right corner, click Operations. On the Deployments page, click Start.
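As a sketch of how the optional parameters in the table above can be combined, the following variant of the source table enables incremental snapshots, sets the startup and changelog modes explicitly, and shortens the heartbeat interval. The endpoint, account, password, and slot name are placeholders taken from the example above; adjust them to your environment.

CREATE TEMPORARY TABLE source_adbpg_incremental (
    id int,
    username varchar,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'adbpg-cdc',
    'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
    'port' = '5432',
    'username' = 'account****',
    'password' = 'password****',
    'database-name' = 'testdb',
    'schema-name' = 'testschema',
    'table-name' = 'adbpg_source_table',
    'slot.name' = 'flink_incremental',
    'decoding.plugin.name' = 'pgoutput',
    -- Optional parameters described in the table above.
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.startup.mode' = 'initial',
    'changelog-mode' = 'ALL',
    'heartbeat.interval.ms' = '10000'
);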
Step 4: View the data written by Flink
Execute the following statements in the test database to view the data written by Flink.
SELECT * FROM testschema.adbpg_sink_table;
SELECT COUNT(*) FROM testschema.adbpg_sink_table;

Insert 50 more rows of data into the source table. Then, check the number of incremental data rows that Flink writes to the sink table.
-- Insert 50 rows of incremental data into the source table.
INSERT INTO testschema.adbpg_source_table(id, username)
SELECT i, 'username'||i::text
FROM generate_series(51, 100) AS t(i);

-- Check the new data in the destination table.
SELECT COUNT(*) FROM testschema.adbpg_sink_table WHERE id > 50;

The result is shown below.
 count
-------
    50
(1 row)
Usage notes
Manage Replication Slots promptly to avoid wasting disk space.
To prevent data loss that can occur if the WAL logs corresponding to a checkpoint are cleaned up while a Flink job restarts, Flink does not automatically delete replication slots. Therefore, if you confirm that a Flink job no longer needs to be restarted, manually delete its replication slot to release the resources that it occupies, as shown in the example below. In addition, if the confirmed position of a replication slot does not advance for a long time, AnalyticDB for PostgreSQL cannot clean up the WAL entries after that position. This may cause unused WAL data to accumulate and consume a large amount of disk space.
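For example, the following standard PostgreSQL statements list the existing replication slots and drop one that is no longer needed. The slot name flink matches the slot.name used in the job above; treat this as an illustration and substitute your own slot name.

-- List replication slots and their confirmed positions.
SELECT slot_name, active, confirmed_flush_lsn
FROM pg_replication_slots;

-- Drop the slot of a job that will not be restarted.
SELECT pg_drop_replication_slot('flink');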
During the normal operation of an AnalyticDB for PostgreSQL instance, exactly-once data processing semantics are guaranteed. However, in a failure scenario, only at-least-once semantics are supported.
The CDC connector changes the REPLICA IDENTITY setting of each subscribed table to FULL to ensure data synchronization consistency. This change has the following effects (see the example after this list for how to check or revert the setting):
Increased disk space usage. In scenarios with frequent update or delete operations, this setting increases the size of WAL logs, which results in increased disk space usage.
Degraded write performance. In scenarios with high-concurrency writes, performance may be significantly affected.
Increased checkpoint pressure. Larger WAL logs mean that checkpoints need to process more data, which may extend the time required for checkpoints.
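If you want to confirm or revert this setting after a job is decommissioned, the following standard PostgreSQL statements are one way to do so. adbpg_source_table is the test table from this topic; substitute your own table name.

-- Check the current replica identity ('f' means FULL, 'd' means DEFAULT).
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'adbpg_source_table';

-- Revert to the default replica identity after the table is no longer subscribed.
ALTER TABLE testschema.adbpg_source_table REPLICA IDENTITY DEFAULT;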
Best practices
Flink CDC supports job development using the Flink SQL API or DataStream API. You can use Flink CDC to implement integrated full and incremental data synchronization for single or multiple tables in a source database. You can also perform computations such as table joins on disparate data sources. The Flink framework ensures exactly-once event processing semantics throughout the data processing procedure. However, Flink CDC is not suitable for synchronizing an entire PostgreSQL-compatible database. This is because it does not support DDL synchronization, and you must define the structure of each table in Flink SQL, which makes maintenance complex.
This section uses an example of synchronizing data from AnalyticDB for PostgreSQL to Kafka to describe the best practices for Flink CDC SQL job development. Before you develop the Flink CDC job, make sure that you have prepared and configured the resources as described in the Prerequisites section.
Step 1: Prepare test tables
Create two source tables in the AnalyticDB for PostgreSQL instance.
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(200) NOT NULL,
sku CHAR(12) NOT NULL,
description TEXT,
price NUMERIC(10,2) NOT NULL,
discount_price DECIMAL(10,2),
stock_quantity INTEGER DEFAULT 0,
weight REAL,
volume DOUBLE PRECISION,
dimensions BOX,
release_date DATE,
is_featured BOOLEAN DEFAULT FALSE,
rating FLOAT,
warranty_period INTERVAL,
metadata JSON,
tags TEXT[]
);
CREATE TABLE documents (
document_id UUID PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT,
summary TEXT,
publication_date TIMESTAMP WITHOUT TIME ZONE,
last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
author_id BIGINT,
file_data BYTEA,
xml_content XML,
json_metadata JSON,
reading_time INTERVAL,
is_public BOOLEAN DEFAULT TRUE,
views_count INTEGER DEFAULT 0,
category VARCHAR(50),
tags TEXT[]
);

Step 2: Prepare Kafka resources
Add the CIDR block of the Flink workspace to the whitelist of the Kafka instance.
Step 3: Create a Flink job
Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the navigation pane on the left, choose .
In the top menu bar, click . Select New Blank Stream Draft and set the following parameters:
Name: The name of the draft that you want to create. The draft name must be unique in the current project. Example: adbpg-test.
Engine Version: The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. Example: vvr-6.0.7-flink-1.15.
Click Create.
Step 4: Write the job code and deploy the job
Write an SQL job in the Flink workspace. Copy the following job code to the editor and replace the configurations with your actual values.
-- Use one source to capture data from multiple tables.
CREATE TEMPORARY TABLE ADBPGSource (
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
    product_id BIGINT,
    product_name STRING,
    sku STRING,
    description STRING,
    price STRING,
    discount_price STRING,
    stock_quantity INT,
    weight STRING,
    volume STRING,
    dimensions STRING,
    release_date STRING,
    is_featured BOOLEAN,
    rating FLOAT,
    warranty_period STRING,
    metadata STRING,
    tags STRING,
    document_id STRING,
    title STRING,
    content STRING,
    summary STRING,
    publication_date STRING,
    last_updated STRING,
    author_id BIGINT,
    file_data STRING,
    xml_content STRING,
    json_metadata STRING,
    reading_time STRING,
    is_public BOOLEAN,
    views_count INT,
    category STRING
) WITH (
    'connector' = 'adbpg-cdc',
    'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com',
    'port' = '5432',
    'username' = 'account****',
    'password' = 'password****',
    'database-name' = 'testdb',
    'schema-name' = 'public',
    'table-name' = '(products|documents)',
    'slot.name' = 'flink',
    'decoding.plugin.name' = 'pgoutput',
    'debezium.snapshot.mode' = 'never'
);

CREATE TEMPORARY TABLE KafkaProducts (
    product_id BIGINT,
    product_name STRING,
    sku STRING,
    description STRING,
    price STRING,
    discount_price STRING,
    stock_quantity INT,
    weight STRING,
    volume STRING,
    dimensions STRING,
    release_date STRING,
    is_featured BOOLEAN,
    rating FLOAT,
    warranty_period STRING,
    metadata STRING,
    tags STRING,
    PRIMARY KEY(product_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = '****',
    'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
    'key.format' = 'avro',
    'value.format' = 'avro'
);

CREATE TEMPORARY TABLE KafkaDocuments (
    document_id STRING,
    title STRING,
    content STRING,
    summary STRING,
    publication_date STRING,
    last_updated STRING,
    author_id BIGINT,
    file_data STRING,
    xml_content STRING,
    json_metadata STRING,
    reading_time STRING,
    is_public BOOLEAN,
    views_count INT,
    category STRING,
    tags STRING,
    PRIMARY KEY(document_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = '****',
    'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
    'key.format' = 'avro',
    'value.format' = 'avro'
);

-- Use a STATEMENT SET to wrap multiple INSERT statements.
BEGIN STATEMENT SET;

-- Use the table_name metadata column to route data to the destination table.
INSERT INTO KafkaProducts
SELECT product_id, product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
FROM ADBPGSource
WHERE table_name = 'products';

INSERT INTO KafkaDocuments
SELECT document_id, title, content, summary, publication_date, last_updated, author_id, file_data, xml_content, json_metadata, reading_time, is_public, views_count, category, tags
FROM ADBPGSource
WHERE table_name = 'documents';

END;

Note the following points about this SQL job:
For multi-table synchronization tasks, we recommend that you use one source table to capture data from multiple tables, as shown in this SQL example. You must define all columns from all source tables in this source table. If column names are duplicated, keep only one. When writing to the destination tables, use the table_name METADATA column to route data to the specified table. This approach requires creating only one replication slot in AnalyticDB for PostgreSQL, which reduces the resource usage of the source database, improves synchronization performance, and simplifies future maintenance.
Use the table-name parameter to specify multiple source tables. Enclose the table names in parentheses and separate them with vertical bars (|), for example, (table1|table2|table3).
Setting debezium.snapshot.mode to never means that only incremental data from the source tables is synchronized. To synchronize both full and incremental data, change the setting to initial.
In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.
Click Deploy, and then click OK.
In the upper-right corner, click Operations. On the Deployments page, click Start.
Step 5: Insert test data
In the AnalyticDB for PostgreSQL instance, insert or update data in the two source tables and observe the message changes in the corresponding Kafka topics.
You can use the following SQL statement to insert test data into the products table:
INSERT INTO products (
product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
'Test Product', 'Test-2025', 'A piece of test product data', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"Test1", "Test2"}'
);
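The documents table can be exercised in the same way. The following INSERT is a sketch with made-up values, including a literal UUID; adjust it as needed.

INSERT INTO documents (
    document_id, title, content, summary, publication_date, author_id, is_public, views_count, category, tags
) VALUES (
    'a81bc81b-dead-4e5d-abff-90865d1e13b1', 'Test Document', 'A piece of test document data', 'Test summary', '2023-05-01 10:00:00', 1001, TRUE, 10, 'Test', '{"Test1", "Test2"}'
);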
References
For more information about subscribing to the full data of AnalyticDB for PostgreSQL, see Read and write full data in real time using Flink.