AnalyticDB:Use Flink CDC to subscribe to full and incremental data in real time (invitational preview)

Last Updated: Dec 01, 2025

AnalyticDB for PostgreSQL provides a self-developed Change Data Capture (CDC) connector that subscribes to full and incremental data based on the logical replication feature of PostgreSQL. The connector integrates seamlessly with Flink and efficiently captures real-time data changes from source tables for real-time data synchronization and stream processing, which helps enterprises respond quickly to dynamic data requirements. This topic describes how to use Realtime Compute for Apache Flink CDC to subscribe to full and incremental data from AnalyticDB for PostgreSQL in real time.

Limits

  • This feature is available only for AnalyticDB for PostgreSQL V7.0 instances that run minor engine version 7.2.1.4 or later.

    Note

    You can view the minor version on the Basic Information page of an instance in the AnalyticDB for PostgreSQL console. If your instance does not meet the required version, update the minor version of the instance. A query-based check is also sketched after this list.

  • The serverless mode of AnalyticDB for PostgreSQL is not supported.
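If you prefer to check from a SQL client, the standard PostgreSQL version query below returns the kernel version string of the instance. Whether the minor engine version appears in this string is an assumption, so treat the console as the authoritative source.

    -- Returns the kernel version string of the instance.
    SELECT version();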

Prerequisites

  • The AnalyticDB for PostgreSQL instance and the fully managed Flink workspace must be in the same VPC.

  • You must adjust the parameter settings for the AnalyticDB for PostgreSQL instance:

    • Enable logical replication by setting the wal_level parameter to logical.

    • If you use a High-availability Edition instance of AnalyticDB for PostgreSQL, you must set the hot_standby, hot_standby_feedback, and sync_replication_slots parameters to on. This ensures that the logical subscription is not interrupted by a primary/secondary failover.

  • You must use the initial account of the AnalyticDB for PostgreSQL instance or a privileged account that has the RDS_SUPERUSER permission. The account must also be granted the REPLICATION privilege, for example, by executing ALTER USER <username> WITH REPLICATION;. A sketch of these checks is provided after this list.

  • The CIDR block of the Flink workspace must be added to a whitelist of the AnalyticDB for PostgreSQL instance.

  • You must download flink-sql-connector-adbpg-cdc-3.3.jar and upload the CDC connector to your Flink workspace.
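The following SQL is a minimal sketch of how you can verify the replication-related prerequisites from a SQL client after you configure the parameters in the console. The account name flink_user is a placeholder for your own account.

    -- Confirm that logical replication is enabled (set wal_level to logical in the console if it is not).
    SHOW wal_level;

    -- Grant the REPLICATION privilege to the account that Flink uses.
    ALTER USER flink_user WITH REPLICATION;

    -- Confirm that the privilege has been granted.
    SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'flink_user';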

Procedure

Step 1: Prepare a test table and test data

  1. Log on to the AnalyticDB for PostgreSQL console. Find the instance that you want to manage and click the instance ID.

  2. In the lower-right corner of the Basic Information page, click Log On to Database.

  3. Create a test database and a source table named adbpg_source_table. Then, insert 50 rows of data into the source table.

    -- Create a test database.
    CREATE DATABASE testdb;
    -- Switch to the testdb database and create a schema.
    CREATE SCHEMA testschema;
    -- Create a source table named adbpg_source_table.
    CREATE TABLE testschema.adbpg_source_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    -- Insert 50 rows of data into the adbpg_source_table table.
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  4. Create a sink table named adbpg_sink_table for Flink to write the result data.

    CREATE TABLE testschema.adbpg_sink_table(
      id int,
      username text,
      score int
    );
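    Optionally, confirm that the test data is in place. Because 50 rows were inserted, the following query is expected to return 50:

    SELECT COUNT(*) FROM testschema.adbpg_source_table;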

Step 2: Create a Flink job

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  2. In the navigation pane on the left, choose Development > ETL.

  3. In the top menu bar, click the new draft icon. Select New Blank Stream Draft, and set the parameters described below.

    • Name: The name of the draft that you want to create. The draft name must be unique in the current project. Example: adbpg-test.

    • Engine Version: The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. Example: vvr-6.0.7-flink-1.15.

  4. Click Create.

Step 3: Write the job code and deploy the job

  1. Create a source named datagen_source to generate simulated data and a source named source_adbpg to capture real-time data changes from the AnalyticDB for PostgreSQL database. Then, join the two sources and write the results to a sink table named sink_adbpg, which writes the processed data back to AnalyticDB for PostgreSQL.

    Copy the following job code to the editor.

    -- Create a datagen source table to generate streaming data by using the datagen connector.
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='100',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --Create an adbpg source table to capture data changes of the adbpg_source_table table based on slot.name and pgoutput using the adbpg-cdc connector.
    CREATE TEMPORARY TABLE source_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) NOT ENFORCED
    ) WITH(
      'connector' = 'adbpg-cdc', 
      'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'testschema',
      'table-name' = 'adbpg_source_table',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput'
    );
    
    --Create an adbpg sink table to write the processed results to the destination table adbpg_sink_table in the database.
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb',
      'tablename' = 'testschema.adbpg_sink_table',  
      'username' = 'account****',
      'password' = 'password****',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- Write the join results of the datagen_source and source_adbpg tables to the adbpg sink table.
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds
    JOIN source_adbpg AS ts ON ds.id = ts.id;

    Parameters

    • connector (required, STRING): The connector type. Set the value to adbpg-cdc for the source table and adbpg for the sink table.

    • hostname (required, STRING): The internal endpoint of the AnalyticDB for PostgreSQL instance. You can obtain the internal endpoint on the Basic Information page of the instance.

    • username (required, STRING): The database account of the AnalyticDB for PostgreSQL instance.

    • password (required, STRING): The password of the database account.

    • database-name (required, STRING): The name of the database.

    • schema-name (required, STRING): The name of the schema. This parameter supports regular expressions, so you can subscribe to multiple schemas at a time.

    • table-name (required, STRING): The name of the table. This parameter supports regular expressions, so you can subscribe to multiple tables at a time.

    • port (required, INTEGER): The port of the AnalyticDB for PostgreSQL instance. The value is fixed at 5432.

    • decoding.plugin.name (required, STRING): The name of the PostgreSQL logical decoding plug-in. The value is fixed at pgoutput.

    • slot.name (required, STRING): The name of the logical decoding slot.

      • For source tables in the same Flink job, use the same value for slot.name.

      • If different Flink jobs subscribe to the same table, set a unique slot.name for each job. This prevents the following error: PSQLException: ERROR: replication slot "debezium" is active for PID 974.

    • debezium.* (optional, STRING): Controls the behavior of the Debezium client at a finer granularity. For example, setting 'debezium.snapshot.mode' = 'never' disables the snapshot feature. For more information, see the Debezium configuration properties.

    • scan.incremental.snapshot.enabled (optional, BOOLEAN): Specifies whether to enable incremental snapshots. Valid values:

      • false (default): Incremental snapshots are disabled.

      • true: Incremental snapshots are enabled.

    • scan.startup.mode (optional, STRING): The startup mode for data consumption. Valid values:

      • initial (default): When the job starts for the first time, it scans all historical data and then reads the latest write-ahead logging (WAL) data. This provides a seamless transition between full and incremental data.

      • latest-offset: When the job starts for the first time, it does not scan historical data. It starts reading from the end of the WAL, which is the latest log position, and captures only the data changes that occur after the connector starts.

      • snapshot: Scans all historical data and reads the WAL entries generated during the full scan. The job stops after the full scan is complete.

    • changelog-mode (optional, STRING): The changelog mode for encoding stream changes. Valid values:

      • ALL (default): Supports all operation types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.

      • UPSERT: Supports only upsert operations, including INSERT, DELETE, and UPDATE_AFTER.

    • heartbeat.interval.ms (optional, DURATION): The interval at which heartbeat packets are sent, in milliseconds. Default value: 30 seconds. The AnalyticDB for PostgreSQL CDC connector sends heartbeat packets to the database to ensure that the slot offset continuously advances. If the table data changes infrequently, set this parameter to a reasonable value so that WAL logs are cleaned up promptly and disk space is not wasted.

    • scan.incremental.snapshot.chunk.key-column (optional, STRING): The column that is used for chunking during the snapshot phase. By default, the first column of the primary key is used.

    • url (required, STRING): The JDBC URL that is used by the adbpg sink table, in the format jdbc:postgresql://<Address>:<PortId>/<DatabaseName>.
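    The following snippet is a sketch of how some of the optional parameters can be combined in the source table definition from the preceding job code. The endpoint, account, and parameter values are placeholders and examples only:

    CREATE TEMPORARY TABLE source_adbpg_with_options (
      id int,
      username varchar,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector' = 'adbpg-cdc',
      'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'testschema',
      'table-name' = 'adbpg_source_table',
      'slot.name' = 'flink_job2',                          -- use a unique slot name for each Flink job
      'decoding.plugin.name' = 'pgoutput',
      'scan.startup.mode' = 'initial',                     -- scan full data first, then read incremental WAL data
      'scan.incremental.snapshot.enabled' = 'true',        -- enable incremental snapshots
      'scan.incremental.snapshot.chunk.key-column' = 'id', -- chunk the snapshot phase by the id column
      'heartbeat.interval.ms' = '30000'                    -- send a heartbeat packet every 30 seconds
    );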

  2. In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.

  3. Click Deploy, and then click Confirm.

  4. In the upper-right corner, click Operations. On the Deployments page, click Start.
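    After the deployment starts, you can optionally confirm on the AnalyticDB for PostgreSQL side that the replication slot specified by slot.name exists. The following query is a sketch; a prefix match is used because the assumption here is that the connector may append a suffix to the configured slot name.

    SELECT slot_name, plugin, active
    FROM pg_replication_slots
    WHERE slot_name LIKE 'flink%';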

Step 4: View the data written by Flink

  1. Execute the following statements in the test database to view the data written by Flink. Because datagen_source generates IDs from 1 to 100 and the source table initially contains IDs 1 to 50, the join is expected to produce 50 rows after the full data is processed.

    SELECT * FROM testschema.adbpg_sink_table;
    SELECT COUNT(*) FROM testschema.adbpg_sink_table; 
  2. Insert 50 more rows of data into the source table. Then, check the total number of incremental data rows that Flink writes to the sink table.

    -- Insert 50 rows of incremental data into the source table.
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(51, 100) AS t(i);
    
    -- Check the new data in the destination table.
    SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;

    The result is shown below.

     count 
    -------
        50
    (1 row)

Usage notes

  • Manage replication slots promptly to avoid wasting disk space.

    To prevent data loss caused by the cleanup of WAL logs that correspond to a checkpoint during a Flink job restart, Flink does not automatically delete replication slots. Therefore, if you confirm that a Flink job no longer needs to be restarted, manually delete its replication slot to release the resources that it occupies. In addition, if the confirmed position of a replication slot does not advance for a long time, AnalyticDB for PostgreSQL cannot clean up the WAL entries after that position, which can cause unused WAL data to accumulate and consume a large amount of disk space. For example queries, see the SQL sketch after this list.

  • During the normal operation of an AnalyticDB for PostgreSQL instance, exactly-once data processing semantics are guaranteed. However, in a failure scenario, only at-least-once semantics are supported.

  • The CDC connector changes the REPLICA IDENTITY parameter of the subscribed table to FULL to ensure data synchronization consistency. This change has the following effects:

    • Increased disk usage. In scenarios with frequent UPDATE or DELETE operations, this setting increases the size of WAL logs, which consumes more disk space.

    • Degraded write performance. In scenarios with high-concurrency writes, performance may be significantly affected.

    • Increased checkpoint pressure. Larger WAL logs mean that checkpoints need to process more data, which may extend the time required for checkpoints.
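The following SQL is a minimal sketch of how to inspect and clean up replication slots and how to check the REPLICA IDENTITY setting of a subscribed table. The slot name flink and the table name match the example in this topic.

    -- List replication slots and check whether their confirmed positions are advancing.
    SELECT slot_name, active, confirmed_flush_lsn FROM pg_replication_slots;

    -- Drop a slot that is no longer needed. Make sure that the corresponding Flink job will not be restarted.
    SELECT pg_drop_replication_slot('flink');

    -- Check the REPLICA IDENTITY setting of the subscribed table. The value f indicates FULL.
    SELECT relreplident FROM pg_class WHERE relname = 'adbpg_source_table';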

Best practices

Flink CDC supports job development using the Flink SQL API or DataStream API. You can use Flink CDC to implement integrated full and incremental data synchronization for single or multiple tables in a source database. You can also perform computations such as table joins on disparate data sources. The Flink framework ensures exactly-once event processing semantics throughout the data processing procedure. However, Flink CDC is not suitable for synchronizing an entire PostgreSQL-compatible database. This is because it does not support DDL synchronization, and you must define the structure of each table in Flink SQL, which makes maintenance complex.

This section uses an example of synchronizing data from AnalyticDB for PostgreSQL to Kafka to describe the best practices for Flink CDC SQL job development. Before you develop the Flink CDC job, make sure that you have prepared and configured the resources as described in the Prerequisites section.

Step 1: Prepare test tables

Create two source tables in the AnalyticDB for PostgreSQL instance.

CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(200) NOT NULL,
    sku CHAR(12) NOT NULL,
    description TEXT,
    price NUMERIC(10,2) NOT NULL,
    discount_price DECIMAL(10,2),
    stock_quantity INTEGER DEFAULT 0,
    weight REAL,
    volume DOUBLE PRECISION,
    dimensions BOX,
    release_date DATE,
    is_featured BOOLEAN DEFAULT FALSE,
    rating FLOAT,
    warranty_period INTERVAL,
    metadata JSON,
    tags TEXT[]
);

CREATE TABLE documents (
    document_id UUID PRIMARY KEY,
    title VARCHAR(200) NOT NULL,
    content TEXT,
    summary TEXT,
    publication_date TIMESTAMP WITHOUT TIME ZONE,
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    author_id BIGINT,
    file_data BYTEA,
    xml_content XML,
    json_metadata JSON,
    reading_time INTERVAL,
    is_public BOOLEAN DEFAULT TRUE,
    views_count INTEGER DEFAULT 0,
    category VARCHAR(50),
    tags TEXT[]
);

Step 2: Prepare Kafka resources

  1. Purchase and deploy a Kafka instance.

  2. Add the CIDR block of the Flink workspace to the whitelist of the Kafka instance.

  3. Create the required resources, such as the topics that the job writes to, in the Kafka instance.

Step 3: Create a Flink job

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  2. In the navigation pane on the left, choose Development > ETL.

  3. In the top menu bar, click the new draft icon. Select New Blank Stream Draft, and set the parameters described below.

    • Name: The name of the draft that you want to create. The draft name must be unique in the current project. Example: adbpg-test.

    • Engine Version: The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. Example: vvr-6.0.7-flink-1.15.

  4. Click Create.

Step 4: Write the job code and deploy the job

  1. Write an SQL job in the Flink workspace. Copy the following job code to the editor and replace the configurations with your actual values.

    -- Use one source to capture data from multiple tables
    CREATE TEMPORARY TABLE ADBPGSource(
        table_name STRING METADATA FROM 'table_name' VIRTUAL,
        row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING
    ) WITH (
      'connector' = 'adbpg-cdc',
      'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'public',
      'table-name' = '(products|documents)',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput',
      'debezium.snapshot.mode' = 'never'
    );
    
    CREATE TEMPORARY TABLE KafkaProducts (
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        PRIMARY KEY(product_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    CREATE TEMPORARY TABLE KafkaDocuments (
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING,
        tags STRING,
        PRIMARY KEY(document_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    -- Use a STATEMENT SET to wrap multiple statements
    BEGIN STATEMENT SET;
    -- Use the table_name METADATA to route data to the destination table
    INSERT INTO KafkaProducts
    SELECT product_id,product_name,sku,description,price,discount_price,stock_quantity,weight,volume,dimensions,release_date,is_featured,rating,warranty_period,metadata,tags
    FROM ADBPGSource
    WHERE table_name = 'products';
    
    INSERT INTO KafkaDocuments
    SELECT document_id,title,content,summary,publication_date,last_updated,author_id,file_data,xml_content,json_metadata,reading_time,is_public,views_count,category,tags
    FROM ADBPGSource
    WHERE table_name = 'documents';
    
    END;

    Note the following points about this SQL job:

    • For multi-table synchronization tasks, we recommend that you use one source table to capture data from multiple tables, as shown in this SQL example. Define all columns from all source tables in this source table, and keep only one copy of any duplicated column names. When writing to the destination tables, use the table_name metadata column to route data to the corresponding table. This approach requires only one replication slot in AnalyticDB for PostgreSQL, which reduces the resource usage of the source database, improves synchronization performance, and simplifies future maintenance.

    • Use the table-name parameter to specify multiple source tables. Enclose the table names in parentheses and separate them with vertical bars (|), for example, (table1|table2|table3).

    • Setting debezium.snapshot.mode to never means that only incremental data from the source table is synchronized. To synchronize both full and incremental data, change the setting to initial.

  2. In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.

  3. Click Deploy, and then click OK.

  4. In the upper-right corner, click Operations. On the Deployments page, click Start.

Step 5: Insert test data

In the AnalyticDB for PostgreSQL instance, insert or update data in the two source tables and observe the message changes in the corresponding Kafka topics.

You can use the following SQL statement to insert test data:

INSERT INTO products (
    product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
    'Test Product', 'Test-2025', 'A piece of test product data', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"Test1", "Test2"}'
);
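The documents table can be exercised in the same way. The following statement is an illustrative sketch; the UUID and the other column values are placeholders:

INSERT INTO documents (
    document_id, title, content, summary, publication_date, author_id, is_public, views_count, category, tags
) VALUES (
    'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', 'Test Document', 'A piece of test document data', 'A short summary', '2023-05-01 10:00:00', 1001, TRUE, 0, 'test', '{"Test1", "Test2"}'
);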

References

For more information about subscribing to the full data of AnalyticDB for PostgreSQL, see Read and write full data in real time using Flink.