
AnalyticDB: Use Flink CDC to subscribe to full and incremental data in real time (invitational preview)

Last Updated: Mar 30, 2026

AnalyticDB for PostgreSQL includes a self-developed Change Data Capture (CDC) connector that uses PostgreSQL's logical replication feature to capture full and incremental data changes in real time. The connector integrates natively with Flink, making it straightforward to build real-time data synchronization and stream processing pipelines.

This topic describes how to use Realtime Compute for Apache Flink with the CDC connector to subscribe to full and incremental data from AnalyticDB for PostgreSQL.

Limitations

  • This feature requires an AnalyticDB for PostgreSQL V7.0 instance running minor engine version 7.2.1.4 or later. You can check the minor version on the Basic Information page of your instance; for details, see View the minor engine version. If your instance runs an earlier minor version, update it before you continue.

  • The serverless mode of AnalyticDB for PostgreSQL is not supported.

Prerequisites

Before you begin, ensure that you have:

Procedure

Step 1: Prepare a test table and test data

  1. Log on to the AnalyticDB for PostgreSQL console. Find the instance and click the instance ID.

  2. In the lower-right corner of the Basic Information page, click Log On to Database.

  3. Create a test database, a schema, and a source table named adbpg_source_table. Then insert 50 rows of test data.

    -- Create a test database.
    CREATE DATABASE testdb;
    -- Reconnect to testdb (for example, \c testdb in psql) before running the following statements, then create a schema.
    CREATE SCHEMA testschema;
    -- Create a source table.
    CREATE TABLE testschema.adbpg_source_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    -- Insert 50 rows of data.
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);

  4. Create a sink table named adbpg_sink_table for Flink to write results to.

    CREATE TABLE testschema.adbpg_sink_table(
      id int,
      username text,
      score int
    );

Step 2: Create a Flink job

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace and click Console in the Actions column.

  2. In the left navigation pane, choose Development > ETL.

  3. In the top menu bar, click the icon for creating a new draft, select New Blank Stream Draft, and set the following parameters.

    | Parameter | Description | Example |
    | --- | --- | --- |
    | Name | The name of the draft. The name must be unique within the current project. | adbpg-test |
    | Engine Version | The Flink engine version. For details on versions and lifecycle dates, see Engine version. | vvr-6.0.7-flink-1.15 |
  4. Click Create.

Step 3: Write the job code and deploy the job

  1. Copy the following SQL into the editor. The job creates three tables: a Datagen source that generates streaming data, a CDC source that captures changes from AnalyticDB for PostgreSQL, and a sink that writes results back to AnalyticDB for PostgreSQL.

    -- Datagen source: generates sequential IDs (1-100) with random scores (70-100).
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='100',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    -- CDC source: captures data changes from adbpg_source_table using the adbpg-cdc connector.
    CREATE TEMPORARY TABLE source_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) NOT ENFORCED
    ) WITH(
      'connector' = 'adbpg-cdc',
      'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'testschema',
      'table-name' = 'adbpg_source_table',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput'
    );
    
    -- Sink: writes the joined results to adbpg_sink_table.
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg',
      'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb',
      'tablename' = 'testschema.adbpg_sink_table',
      'username' = 'account****',
      'password' = 'password****',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- Join the two sources and write results to the sink.
    INSERT INTO sink_adbpg
    SELECT ts.id, ts.username, ds.score FROM datagen_source AS ds
    JOIN source_adbpg AS ts ON ds.id = ts.id;

    Replace the hostname, username, password, and url placeholders with your actual instance values. The hostname is the internal endpoint, available on the Basic Information page of the instance. For a full description of connector parameters, see Connector parameters.

  2. In the upper-right corner of the SQL Editor page, click Validate to check for syntax errors.

  3. Click Deploy, then click Confirm.

  4. In the upper-right corner, click Operations. On the Deployments page, click Start.

Step 4: View the data written by Flink

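  1. Run the following queries in testdb to check the data written by the Flink job. The job inner-joins the Datagen IDs (1-100) with the 50 rows in the source table, so once the job has processed the initial data, the sink table should contain 50 rows (IDs 1-50).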

    SELECT * FROM testschema.adbpg_sink_table;
    SELECT COUNT(*) FROM testschema.adbpg_sink_table;

  2. Insert 50 more rows into the source table, then verify that Flink captures and writes the incremental data.

    -- Insert 50 incremental rows (IDs 51-100) into the source table.
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(51, 100) AS t(i);
    
    -- Verify the incremental data in the sink table.
    SELECT COUNT(*) FROM testschema.adbpg_sink_table WHERE id > 50;

    The expected result:

     count
    -------
        50
    (1 row)
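Beyond counting rows, you can reconcile the sink table against the source table. The following queries are a sketch that assumes the table names used in this topic. Because the Datagen source emits each ID from 1 to 100 once and the job uses an inner join, every source ID should appear in the sink after the job catches up, and every score should fall within the Datagen range of 70 to 100. Duplicate IDs are not expected in a run without failover, but may appear after a failure because the connector guarantees only at-least-once delivery in failure scenarios.

```sql
-- Source rows that have no matching row in the sink table.
-- Expected: 0 once the job has processed all source rows.
SELECT COUNT(*) AS missing_in_sink
FROM testschema.adbpg_source_table AS s
LEFT JOIN testschema.adbpg_sink_table AS k ON s.id = k.id
WHERE k.id IS NULL;

-- IDs written to the sink more than once.
-- Expected: no rows in a run without failover.
SELECT id, COUNT(*) AS copies
FROM testschema.adbpg_sink_table
GROUP BY id
HAVING COUNT(*) > 1;

-- Scores outside the Datagen range configured in the job (70 to 100).
-- Expected: 0.
SELECT COUNT(*) AS out_of_range
FROM testschema.adbpg_sink_table
WHERE score NOT BETWEEN 70 AND 100;
```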

Connector parameters

The examples in this topic use the adbpg-cdc connector for source tables; the first example also uses the adbpg connector for its sink table. The following table describes the connector parameters.

| Parameter | Required | Data type | Default | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | STRING | - | The connector type. Set to adbpg-cdc for source tables and adbpg for sink tables. |
| hostname | Yes | STRING | - | The internal endpoint of the AnalyticDB for PostgreSQL instance. Find it on the Basic Information page of the instance. |
| username | Yes | STRING | - | The database account. |
| password | Yes | STRING | - | The password of the database account. |
| database-name | Yes | STRING | - | The database name. |
| schema-name | Yes | STRING | - | The schema name. Supports regular expressions to subscribe to multiple schemas. |
| table-name | Yes | STRING | - | The table name. Supports regular expressions to subscribe to multiple tables. |
| port | Yes | INTEGER | 5432 | The port of AnalyticDB for PostgreSQL. Fixed at 5432. |
| decoding.plugin.name | Yes | STRING | pgoutput | The PostgreSQL logical decoding plugin. Fixed at pgoutput. |
| slot.name | Yes | STRING | - | The name of the logical decoding replication slot. See Notes on slot.name. |
| url | Yes (sink only) | STRING | - | The JDBC connection URL. Format: jdbc:postgresql://<Address>:<Port>/<DatabaseName>. |
| debezium.* | No | STRING | - | Fine-grained Debezium client settings. For example, 'debezium.snapshot.mode' = 'never' disables the initial snapshot. See Debezium PostgreSQL connector properties. |
| scan.incremental.snapshot.enabled | No | BOOLEAN | false | Set to true to enable incremental snapshots. |
| scan.startup.mode | No | STRING | initial | The startup mode for data consumption. Valid values: initial (scans all historical data, then continues reading changes from the write-ahead log (WAL)); latest-offset (skips historical data and reads from the end of the WAL); snapshot (scans all historical data plus the WAL entries generated during the scan, then stops). |
| changelog-mode | No | STRING | ALL | The changelog mode for encoding stream changes. ALL supports INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER. UPSERT supports only INSERT, DELETE, and UPDATE_AFTER. |
| heartbeat.interval.ms | No | DURATION | 30s | The interval for sending heartbeat packets, which keep the replication slot offset advancing. If your source tables are updated infrequently, keep the interval short enough that the offset advances regularly; this allows the WAL to be cleaned up and prevents disk space from accumulating. |
| scan.incremental.snapshot.chunk.key-column | No | STRING | The first primary key column | The column used to split the table into chunks during the snapshot phase. |
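The following DDL is a sketch of how the optional parameters in the table above combine with the required ones. It reuses the placeholder endpoint and credentials from the first example. The table name source_adbpg_latest and the slot name flink_latest are hypothetical, chosen so that the slot does not collide with the one used by the earlier job. The assumption that UPSERT mode relies on the declared primary key follows open-source Flink CDC behavior and is not confirmed for adbpg-cdc.

```sql
-- Sketch: a CDC source that skips historical rows and emits an upsert changelog.
-- Endpoint, account, and password are placeholders; flink_latest is a hypothetical slot name.
CREATE TEMPORARY TABLE source_adbpg_latest (
  id INT,
  username VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED            -- assumed to be used as the key in UPSERT mode
) WITH (
  'connector' = 'adbpg-cdc',
  'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
  'port' = '5432',
  'username' = 'account****',
  'password' = 'password****',
  'database-name' = 'testdb',
  'schema-name' = 'testschema',
  'table-name' = 'adbpg_source_table',
  'slot.name' = 'flink_latest',
  'decoding.plugin.name' = 'pgoutput',
  'scan.startup.mode' = 'latest-offset',   -- skip existing rows; read only new WAL entries
  'changelog-mode' = 'UPSERT',             -- emit INSERT, UPDATE_AFTER, and DELETE only
  'heartbeat.interval.ms' = '10s'          -- keep the slot offset moving on quiet tables
);
```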

Notes on slot.name

Replication slot conflicts

For source tables in the same Flink job, use the same slot.name value. Give each Flink job its own slot.name; in particular, if different Flink jobs subscribe to the same table, each job needs a unique slot name. If two running jobs reuse the same slot name, the second job fails with an error similar to the following:

PSQLException: ERROR: replication slot "debezium" is active for PID 974
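To find out which session holds a slot when this error appears, you can query the standard PostgreSQL pg_replication_slots and pg_stat_activity views. This is a sketch that assumes your database account is allowed to read these views on the instance; the PID 974 comes from the example error above and must be replaced with the PID in your own error message.

```sql
-- List logical replication slots and whether a session is currently using them.
SELECT slot_name, plugin, database, active, active_pid
FROM pg_replication_slots
WHERE slot_type = 'logical'
ORDER BY slot_name;

-- Inspect the session that holds the slot (replace 974 with the reported PID).
SELECT pid, usename, application_name, client_addr, backend_start
FROM pg_stat_activity
WHERE pid = 974;
```

If the session belongs to another Flink job, assign one of the two jobs a different slot.name rather than stopping the job that already holds the slot.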

Replication slot cleanup

Flink does not automatically delete replication slots when a job stops. This is by design: if a job restarts, the slot preserves the WAL offset so no data is lost. However, if a job is permanently retired, its replication slot continues to hold WAL entries, preventing cleanup and consuming disk space. Delete the replication slot manually once you confirm a job will no longer restart.
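The following statements sketch how to check how much WAL each slot retains and how to drop the slot of a retired job. They use standard PostgreSQL 12 functions and assume that the instance exposes them and that your account has the privilege to drop replication slots. The slot name flink is the one used by the first example job; replace it with the slot of the job you are retiring.

```sql
-- Estimate how much WAL each logical slot is holding back.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_type = 'logical';

-- Drop the slot of a permanently retired job.
-- The slot must be inactive (its job stopped); dropping an active slot fails.
SELECT pg_drop_replication_slot('flink');
```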

Usage notes

Processing semantics

During normal operation, the CDC connector provides exactly-once semantics. In failure scenarios, only at-least-once semantics are guaranteed.

REPLICA IDENTITY and its effects

PostgreSQL's REPLICA IDENTITY setting controls how much information is included in WAL entries for UPDATE and DELETE events. There are four possible values:

  • DEFAULT: WAL entries contain only the previous values of primary key columns.

  • NOTHING: WAL entries for UPDATE and DELETE contain no information about the previous row.

  • FULL: WAL entries contain the previous values of all columns.

  • USING INDEX index_name: WAL entries contain the previous values of the columns in the specified index.

The CDC connector automatically sets REPLICA IDENTITY to FULL on every subscribed table. This is required to capture the complete before-image of rows for UPDATE and DELETE events, which is necessary for accurate change data capture. Be aware of the following side effects:

  • Increased disk usage: Frequent UPDATE or DELETE operations produce larger WAL entries.

  • Reduced write throughput: High-concurrency write workloads may experience a performance impact.

  • Higher checkpoint pressure: Larger WAL entries mean that checkpoints process more data, which can extend checkpoint duration.
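To confirm which REPLICA IDENTITY setting a table currently has, you can read the relreplident column of the PostgreSQL pg_class catalog. This is a sketch that assumes the table names used in this topic. The final ALTER TABLE statement is optional and assumes that no running job still subscribes to the table; it removes the FULL overhead from a table that is no longer captured.

```sql
-- Show the replica identity of the tables used in this topic.
-- relreplident codes: d = DEFAULT, n = NOTHING, f = FULL, i = USING INDEX.
SELECT n.nspname AS schema_name,
       c.relname AS table_name,
       CASE c.relreplident
         WHEN 'd' THEN 'DEFAULT'
         WHEN 'n' THEN 'NOTHING'
         WHEN 'f' THEN 'FULL'
         WHEN 'i' THEN 'USING INDEX'
       END AS replica_identity
FROM pg_class AS c
JOIN pg_namespace AS n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND (n.nspname, c.relname) IN (('testschema', 'adbpg_source_table'),
                                 ('public', 'products'),
                                 ('public', 'documents'));

-- Optional: restore the default on a table that no job subscribes to anymore.
ALTER TABLE testschema.adbpg_source_table REPLICA IDENTITY DEFAULT;
```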

Best practices

Flink CDC supports job development using the Flink SQL API or the DataStream API. It works well for synchronizing individual tables or a defined set of tables, and for stream computations such as joins across different data sources. Within the Flink pipeline, events are processed with exactly-once semantics; at the source, the connector's guarantee drops to at-least-once in failure scenarios, as described in Processing semantics.

Flink CDC is not suitable for synchronizing an entire PostgreSQL-compatible database. It does not support DDL synchronization, and you must define the schema of each table in Flink SQL, which makes maintenance complex at scale.

The following example demonstrates best practices for a Flink CDC SQL job that synchronizes data from AnalyticDB for PostgreSQL to ApsaraMQ for Kafka. Complete the prerequisites before proceeding.

Step 1: Prepare test tables

Create two source tables in the AnalyticDB for PostgreSQL instance.

CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(200) NOT NULL,
    sku CHAR(12) NOT NULL,
    description TEXT,
    price NUMERIC(10,2) NOT NULL,
    discount_price DECIMAL(10,2),
    stock_quantity INTEGER DEFAULT 0,
    weight REAL,
    volume DOUBLE PRECISION,
    dimensions BOX,
    release_date DATE,
    is_featured BOOLEAN DEFAULT FALSE,
    rating FLOAT,
    warranty_period INTERVAL,
    metadata JSON,
    tags TEXT[]
);

CREATE TABLE documents (
    document_id UUID PRIMARY KEY,
    title VARCHAR(200) NOT NULL,
    content TEXT,
    summary TEXT,
    publication_date TIMESTAMP WITHOUT TIME ZONE,
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    author_id BIGINT,
    file_data BYTEA,
    xml_content XML,
    json_metadata JSON,
    reading_time INTERVAL,
    is_public BOOLEAN DEFAULT TRUE,
    views_count INTEGER DEFAULT 0,
    category VARCHAR(50),
    tags TEXT[]
);
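The combined Flink source in Step 4 must list every column from both tables and keep only one copy of each shared column name. Querying the catalog before writing that DDL can help. This sketch assumes the tables were created in the public schema, which is the schema the Flink job in Step 4 subscribes to.

```sql
-- List the columns and PostgreSQL types of both source tables, in table order.
SELECT table_name, ordinal_position, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name IN ('products', 'documents')
ORDER BY table_name, ordinal_position;

-- Find column names that appear in more than one table;
-- declare each of these only once in the Flink source definition.
SELECT column_name,
       string_agg(table_name::text, ', ' ORDER BY table_name) AS tables
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name IN ('products', 'documents')
GROUP BY column_name
HAVING COUNT(*) > 1;
```

For the two tables above, the second query returns only tags, which is why the Flink source in Step 4 declares tags once.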

Step 2: Prepare ApsaraMQ for Kafka resources

  1. Purchase and deploy an ApsaraMQ for Kafka instance.

  2. Add the CIDR block of the Flink workspace to the whitelist of the ApsaraMQ for Kafka instance.

  3. Create the required resources in the ApsaraMQ for Kafka instance.

Step 3: Create a Flink job

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace and click Console in the Actions column.

  2. In the left navigation pane, choose Development > ETL.

  3. In the top menu bar, click the icon for creating a new draft, select New Blank Stream Draft, and set the following parameters.

    | Parameter | Description | Example |
    | --- | --- | --- |
    | Name | The name of the draft. The name must be unique within the current project. | adbpg-test |
    | Engine Version | The Flink engine version. For details on versions and lifecycle dates, see Engine version. | vvr-6.0.7-flink-1.15 |
  4. Click Create.

Step 4: Write the job code and deploy the job

  1. Copy the following SQL into the editor and replace the placeholder values with your actual configuration. Note the following about this SQL job:

    • Use one source for multiple tables. For multi-table synchronization, define a single adbpg-cdc source that covers all source tables. List all columns from all tables in this source definition, keeping only one copy of any shared column names. This creates only one replication slot in AnalyticDB for PostgreSQL, reducing load on the source database and simplifying maintenance.

    • Specify multiple tables with the `table-name` parameter. Enclose table names in parentheses and separate them with |, for example: 'table-name' = '(table1|table2|table3)'.

    • Control full vs. incremental sync with `debezium.snapshot.mode`. Setting it to never captures only incremental changes. Change it to initial to capture both the full dataset and subsequent changes.

    • Use `STATEMENT SET` for multiple INSERT statements. Wrap all INSERT INTO statements in a BEGIN STATEMENT SET; ... END; block when writing to more than one sink table.

    -- One source captures changes from multiple tables simultaneously.
    -- Define all columns from all source tables here; keep only one if column names overlap.
    CREATE TEMPORARY TABLE ADBPGSource(
        table_name STRING METADATA FROM 'table_name' VIRTUAL,
        row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING
    ) WITH (
      'connector' = 'adbpg-cdc',
      'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'public',
      'table-name' = '(products|documents)',
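      'slot.name' = 'flink_kafka',  -- must differ from the slot used by any other running job, such as 'flink' in the first example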
      'decoding.plugin.name' = 'pgoutput',
      'debezium.snapshot.mode' = 'never'
    );
    
    CREATE TEMPORARY TABLE KafkaProducts (
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        PRIMARY KEY(product_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    CREATE TEMPORARY TABLE KafkaDocuments (
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING,
        tags STRING,
        PRIMARY KEY(document_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    -- Use a STATEMENT SET to route data from one source to multiple sink tables.
    BEGIN STATEMENT SET;
    -- Use the table_name METADATA column to route each row to the correct sink.
    INSERT INTO KafkaProducts
    SELECT product_id, product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
    FROM ADBPGSource
    WHERE table_name = 'products';
    
    INSERT INTO KafkaDocuments
    SELECT document_id, title, content, summary, publication_date, last_updated, author_id, file_data, xml_content, json_metadata, reading_time, is_public, views_count, category, tags
    FROM ADBPGSource
    WHERE table_name = 'documents';
    
    END;


  2. In the upper-right corner of the SQL Editor page, click Validate to check for syntax errors.

  3. Click Deploy, then click OK.

  4. In the upper-right corner, click Operations. On the Deployments page, click Start.
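While testing the routing logic, it can help to see which table and change type each record comes from. The following fragment is a sketch that assumes the ADBPGSource table defined in the job above and uses Flink's built-in print connector, which writes rows to the TaskManager logs; debug_print is a hypothetical table name. Place the CREATE statement with the other DDL statements and the INSERT statement inside the existing BEGIN STATEMENT SET; ... END; block, so that it shares the job's replication slot instead of requiring a second job.

```sql
-- Hypothetical debug sink: prints each change with its source table and row kind.
CREATE TEMPORARY TABLE debug_print (
  table_name STRING,
  row_kind STRING,       -- assumed to follow the Flink CDC convention: +I, -U, +U, or -D
  product_id BIGINT,     -- populated for rows from products
  document_id STRING     -- populated for rows from documents
) WITH (
  'connector' = 'print'
);

-- Add this statement inside the existing STATEMENT SET block.
INSERT INTO debug_print
SELECT table_name, row_kind, product_id, document_id
FROM ADBPGSource;
```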

Step 5: Insert test data and verify

In the AnalyticDB for PostgreSQL instance, insert data into the source tables, then observe the message changes in the ApsaraMQ for Kafka topics.

INSERT INTO products (
    product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
    'Test Product', 'Test-2025', 'A piece of test product data', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"Test1", "Test2"}'
);
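The statement above writes only to products. To exercise the documents route and the update and delete paths as well, you can run statements like the following; the UUID and column values are illustrative. Because the job sets debezium.snapshot.mode to never, only changes made after the job starts are captured. With the upsert-kafka sink, an UPDATE arrives as a new message for the same key, and a DELETE arrives as a message with a null value (a tombstone) for that key.

```sql
-- Insert one row into documents; the UUID is an arbitrary example value.
INSERT INTO documents (
    document_id, title, content, summary, publication_date, author_id, file_data,
    xml_content, json_metadata, reading_time, is_public, views_count, category, tags
) VALUES (
    '3f2c8a1e-6b4d-4c7e-9a10-2d5b7e8f9c01', 'Test Document', 'Body text of the test document',
    'A short summary', '2025-01-15 10:30:00', 1001, decode('48656c6c6f', 'hex'),
    '<doc><title>Test</title></doc>', '{"lang": "en"}', INTERVAL '5 minutes', TRUE, 0, 'Test', '{"Test1", "Test2"}'
);

-- Update the test product: the products topic receives a new value for the same key.
UPDATE products SET discount_price = 259.99 WHERE sku = 'Test-2025';

-- Delete the test document: the documents topic receives a tombstone for that key.
DELETE FROM documents WHERE document_id = '3f2c8a1e-6b4d-4c7e-9a10-2d5b7e8f9c01';
```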

What's next

For instructions on using Flink to read and write full datasets from AnalyticDB for PostgreSQL, see Read and write full data in real time using Flink.