Realtime Compute for Apache Flink: PostgreSQL CDC (public preview)

Last Updated: Dec 03, 2025

The PostgreSQL change data capture (CDC) connector reads full snapshots and change data from a PostgreSQL database. The connector ensures that each data record is read exactly once and maintains exactly-once semantics during failure recovery. This topic describes how to use the PostgreSQL CDC connector.

Background information

The PostgreSQL CDC connector has the following capabilities.

| Item | Description |
| --- | --- |
| Supported types | Source table. Note: To create a sink or dimension table, use the JDBC connector. |
| Running mode | Streaming mode only. |
| Data format | Not applicable. |
| Specific monitoring metrics | currentFetchEventTimeLag: the interval from when data is generated to when it is pulled into the Source operator. currentEmitEventTimeLag: the interval from when data is generated to when it leaves the Source operator. sourceIdleTime: the duration for which the source has not generated new data. Note: currentFetchEventTimeLag and currentEmitEventTimeLag are valid only in the incremental phase; their value is always 0 in the full snapshot phase. For more information about the metrics, see Metric description. |
| API type | SQL and Data Ingestion YAML. |
| Update or delete data in sink tables | Not applicable. |

Features

The PostgreSQL CDC connector uses the incremental snapshot framework, which is available in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.6 or later. The connector first reads the full historical data and then automatically switches to reading write-ahead logging (WAL) change logs. This process ensures that no data is missed or duplicated. Even if a failure occurs, data is processed with exactly-once semantics. The PostgreSQL CDC source table supports concurrent reading of full data, lock-free reading, and resumable data transfer.

As a source table, it has the following features and advantages:

  • Unifies stream and batch processing. It supports reading full and incremental data, which eliminates the need to maintain two separate processes.

  • Supports concurrent reading of full data for horizontal performance scaling.

  • Seamlessly switches from reading full data to reading incremental data and automatically scales in to save compute resources.

  • Supports resumable data transfer during the full data reading phase for improved stability.

  • Reads full data without locks to avoid affecting online business operations.

Prerequisites

The PostgreSQL CDC connector reads CDC data streams using the logical replication feature of a PostgreSQL database. The connector supports Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL.

Important

The configurations for Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL differ. Before you begin, complete the required configurations as described in the Configure PostgreSQL document.

After you complete the configurations, ensure that the following conditions are met:

  • The value of the wal_level parameter is set to logical. This adds the information required for logical encoding to the write-ahead logging (WAL).

  • The REPLICA IDENTITY of the subscribed table is set to FULL. This ensures that UPDATE and DELETE events include the previous values of all columns in the table, which guarantees data synchronization consistency.

    Note

    REPLICA IDENTITY is a table-level setting specific to PostgreSQL. It determines whether the logical decoding plugin includes the previous values of the involved table columns in UPDATE and DELETE events. For more information about the values of REPLICA IDENTITY, see REPLICA IDENTITY.

  • The values of the max_wal_senders and max_replication_slots parameters are greater than the sum of the replication slots currently in use by the database and the number of slots required by the Flink job.

  • The account has SUPERUSER system permissions or both LOGIN and REPLICATION permissions. The account must also have SELECT permission on the subscribed tables to query full data.
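
You can verify the preceding conditions from a SQL client such as psql. The following is a minimal sketch; public.shipments is a hypothetical table name:

    -- Logical decoding must be enabled.
    SHOW wal_level;                       -- expected: logical
    -- Set and verify the replica identity of the subscribed table.
    ALTER TABLE public.shipments REPLICA IDENTITY FULL;
    SELECT relreplident FROM pg_class
    WHERE oid = 'public.shipments'::regclass;   -- 'f' means FULL
    -- Compare the slot capacity against the slots already in use.
    SHOW max_wal_senders;
    SHOW max_replication_slots;
    SELECT count(*) FROM pg_replication_slots;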

Precautions

  • The incremental snapshot feature of PostgreSQL CDC is supported only in Realtime Compute for Apache Flink that uses VVR 8.0.6 or later.

A Flink PostgreSQL CDC job relies on a replication slot to ensure that the write-ahead log (WAL) is not purged prematurely, which guarantees data consistency. However, improper slot management can waste disk space or delay data reads. Follow these recommendations:

  • Promptly purge slots that are no longer in use

    • Flink does not automatically delete a replication slot, even after a job has stopped, especially in stateless restart scenarios. This behavior prevents data loss that could occur if the WAL is purged.

    • If you confirm that a job will not be restarted, you must manually delete its associated replication slot to free up disk space, as shown in the sketch below.

      Important

      Lifecycle management: Treat a replication slot as part of the job's resources. Manage it in sync with job start and stop operations.
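
      For example, you can inspect and drop an unused slot from psql. This is a minimal sketch; flink_job_slot is a hypothetical slot name:

        -- List slots and check whether they are still in use.
        SELECT slot_name, active FROM pg_replication_slots;
        -- Drop the slot. This statement fails if the slot is still active.
        SELECT pg_drop_replication_slot('flink_job_slot');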

  • Avoid reusing old slots

    • A new job must use a new slot name instead of reusing an old one. Reusing a slot can cause the job to read a large volume of historical WAL data upon startup, which delays reading the latest data.

    • PostgreSQL logical replication requires that a slot can be used by only one connection. Therefore, different jobs must use different slot names.

      Important

      Naming convention: When you customize `slot.name`, avoid using names with numeric suffixes, such as `my_slot_1`, to prevent conflicts with temporary slots.

  • Slot behavior when incremental snapshot is enabled

    • Prerequisites: You must enable checkpointing, and the source table must have a primary key.

    • Slot creation rules:

      • Incremental snapshot disabled: Only a parallelism of 1 is supported, and the job uses one global slot.

      • Incremental snapshot enabled:

        • Full phase: Each concurrent source subtask creates a temporary slot with a name in the format ${slot.name}_${task_id}.

        • Incremental phase: All temporary slots are automatically reclaimed. Only one global slot is retained.

    • Maximum number of slots: source parallelism + 1 (during the full snapshot phase). You can observe these slots with the query sketch below.
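
      To observe the slots during the full snapshot phase, you can query pg_replication_slots. A sketch, assuming slot.name is set to flink_slot:

        -- Temporary per-subtask slots are named ${slot.name}_${task_id},
        -- for example flink_slot_0, flink_slot_1, ...
        SELECT slot_name, temporary, active
        FROM pg_replication_slots
        WHERE slot_name LIKE 'flink_slot%';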

  • Resources and performance

    • If the number of available slots or the disk space in PostgreSQL is limited, you can reduce the parallelism of the full snapshot phase to decrease the number of temporary slots. Note that this also slows down the reading of full data.

    • If the downstream system supports idempotent writes, you can set 'scan.incremental.snapshot.backfill.skip' = 'true' to skip the WAL backfill during the full phase and accelerate startup.

      This configuration provides only at-least-once semantics. It is not suitable for jobs with stateful computations such as aggregations or dimension table joins, because historical changes that are required for intermediate states might be lost.

  • When the incremental snapshot feature is not enabled, the PostgreSQL CDC connector does not support performing checkpoints during the full table scan phase.

    In this case, a checkpoint that is triggered during the full table scan phase might time out and cause a failover. To prevent failovers due to checkpoint timeouts during the full synchronization phase, configure the following parameters in the Other Configurations section. For more information, see How do I configure custom runtime parameters for a job?

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    The following table describes the parameters.

    | Parameter | Description | Notes |
    | --- | --- | --- |
    | execution.checkpointing.interval | The interval at which checkpoints are triggered. | Data type: Duration. Example: 10 min or 30 s. |
    | execution.checkpointing.tolerable-failed-checkpoints | The number of checkpoint failures that can be tolerated. | The product of this value and the checkpoint scheduling interval determines the time allowed for reading the full snapshot. Note: If the table is very large, set this parameter to a larger value. |
    | restart-strategy | The restart policy. | Valid values: fixed-delay (the fixed-delay restart policy), failure-rate (the failure-rate restart policy), exponential-delay (the exponential-delay restart policy). For more information, see Restart Strategies. |
    | restart-strategy.fixed-delay.attempts | The maximum number of restart attempts for the fixed-delay restart policy. | None. |

SQL

Syntax

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);
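
To enable the incremental snapshot framework (VVR 8.0.6 or later), the table must declare a primary key and checkpointing must be enabled. The following is a minimal sketch with hypothetical connection values:

CREATE TABLE postgrescdc_incremental_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>',
  -- Enable lock-free, concurrent full reading with resumable transfer.
  'scan.incremental.snapshot.enabled' = 'true',
  -- Use a distinct slot name per job; avoid numeric suffixes.
  'slot.name' = 'flink_incremental_slot'
);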

Parameters in the WITH clause

| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the connector. | STRING | Yes | None | The value must be postgres-cdc. |
| hostname | The IP address or hostname of the PostgreSQL database. | STRING | Yes | None | None. |
| username | The username for the PostgreSQL database service. | STRING | Yes | None | None. |
| password | The password for the PostgreSQL database service. | STRING | Yes | None | None. |
| database-name | The database name. | STRING | Yes | None | None. |
| schema-name | The PostgreSQL schema name. | STRING | Yes | None | The schema name supports regular expressions to read data from multiple schemas. |
| table-name | The PostgreSQL table name. | STRING | Yes | None | The table name supports regular expressions to read data from multiple tables. |
| port | The port number of the PostgreSQL database service. | INTEGER | No | 5432 | None. |
| decoding.plugin.name | The name of the PostgreSQL logical decoding plugin. | STRING | No | decoderbufs | Determined by the plugin installed on the PostgreSQL server. Supported plugins: decoderbufs (default; supported in PostgreSQL 9.6 and later, and must be installed separately) and pgoutput (recommended; built into PostgreSQL 10 and later). |
| slot.name | The name of the logical decoding slot. | STRING | Optional in versions earlier than VVR 8.0.1; required in VVR 8.0.1 and later. | flink in versions earlier than VVR 8.0.1; no default in VVR 8.0.1 and later. | Set a distinct slot.name for each table to avoid the error PSQLException: ERROR: replication slot "debezium" is active for PID 974. |
| debezium.* | Debezium properties. | STRING | No | None | Provides fine-grained control over the Debezium client behavior. For example, 'debezium.snapshot.mode' = 'never'. For more information, see Connector properties. |
| scan.incremental.snapshot.enabled | Specifies whether to enable incremental snapshot. | BOOLEAN | No | false | Valid values: true enables incremental snapshot; false (default) disables it. Note: This is an experimental feature that is supported only in Realtime Compute for Apache Flink that uses VVR 8.0.6 or later. For more information about its advantages, prerequisites, and limits, see Features, Prerequisites, and Limits. |
| scan.startup.mode | The startup mode for data consumption. | STRING | No | initial | Valid values: initial (default) scans the full historical data during the first startup and then reads the latest WAL data; latest-offset skips the full scan and starts reading from the end of the WAL, so only changes made after the connector starts are read; snapshot scans the full historical data, reads the WAL data generated during the full snapshot phase, and then stops the job. |
| changelog-mode | The changelog mode for encoding stream changes. | STRING | No | all | Valid values: all supports all change types (INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER); upsert supports only upsert-style changes (INSERT, DELETE, and UPDATE_AFTER). |
| heartbeat.interval.ms | The interval for sending heartbeat packets. | DURATION | No | 30s | The PostgreSQL CDC connector sends heartbeat packets to the database to advance the slot offset. When table changes are infrequent, a short interval allows WAL files to be reclaimed promptly. |
| scan.incremental.snapshot.chunk.key-column | The column used to split the table into chunks in the snapshot phase. | STRING | No | None | By default, the first column of the primary key is used. |
| scan.incremental.close-idle-reader.enabled | Specifies whether to close idle readers after the snapshot phase ends. | BOOLEAN | No | false | Takes effect only if execution.checkpointing.checkpoints-after-tasks-finish.enabled is set to true. |
| scan.incremental.snapshot.backfill.skip | Specifies whether to skip log reading (backfill) in the full snapshot phase. | BOOLEAN | No | false | Valid values: true skips the backfill, and the incremental phase starts reading logs from the low watermark; this reduces the number of WAL slots but provides only at-least-once semantics, so use it only if the downstream operator or storage supports idempotence. false (default) reads the logs between the low and high watermarks during the full snapshot phase to ensure consistency; keep this value if your SQL performs operations such as aggregations or joins. |

Data type mappings

The following table describes the mappings between PostgreSQL and Flink data types.

| PostgreSQL data type | Flink data type |
| --- | --- |
| SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INTEGER, SERIAL | INT |
| BIGINT, BIGSERIAL | BIGINT |
| REAL, FLOAT4 | FLOAT |
| FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
| BYTEA | BYTES |

Sample code

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

Data ingestion

In Realtime Compute for Apache Flink V11.4 and later, you can use the PostgreSQL connector as a data source in a Data Ingestion YAML job.

Syntax

source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  tables: db.scm.tbl
  slot.name: test_slot
  scan.startup.mode: initial
  server-time-zone: UTC
  connect.timeout: 120s
  decoding.plugin.name: decoderbufs

sink:
  type: ...

Parameters

| Parameter | Description | Required | Data type | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| type | The type of the data source. | Yes | STRING | (none) | The value must be postgres. |
| name | The name of the data source. | No | STRING | (none) | None. |
| hostname | The domain name or IP address of the PostgreSQL database server. | Yes | STRING | (none) | None. |
| port | The port exposed by the PostgreSQL database server. | No | INTEGER | 5432 | None. |
| username | The username for the PostgreSQL database. | Yes | STRING | (none) | None. |
| password | The password for the PostgreSQL database. | Yes | STRING | (none) | None. |
| tables | The names of the PostgreSQL tables to capture. You can use a regular expression to monitor multiple tables that match the expression. | Yes | STRING | (none) | Important: Currently, you can capture only tables in the same database. A period (.) is used as the separator between database, schema, and table names. To use a period (.) to match any character in a regular expression, you must escape it with a backslash (\). For example: bdb.schema_\.*.order_\.*. |
| slot.name | The name of the PostgreSQL replication slot. | Yes | STRING | (none) | The name must follow PostgreSQL replication slot naming conventions and can contain only lowercase letters, digits, and underscores. |
| decoding.plugin.name | The name of the logical decoding plug-in installed on the PostgreSQL server. | No | STRING | pgoutput | Valid values include decoderbufs and pgoutput. |
| tables.exclude | The names of the PostgreSQL tables to exclude. This parameter is applied after the tables parameter. | No | STRING | (none) | You can use a regular expression to exclude multiple tables that match the expression. The usage is the same as that of the tables parameter. |
| server-time-zone | The session time zone of the database server, such as "Asia/Shanghai". | No | STRING | (none) | If this parameter is not set, ZoneId.systemDefault() is used to determine the server time zone. |
| scan.incremental.snapshot.chunk.size | The size of each chunk in the incremental snapshot framework, specified as a number of rows. | No | INTEGER | 8096 | When incremental snapshot reading is enabled, the table is split into chunks, and the data of each chunk is cached in memory until the chunk is fully read. A smaller chunk size produces more chunks, which improves the granularity of failure recovery but can cause out-of-memory (OOM) errors and lower overall throughput. Set a reasonable chunk size to balance these factors. |
| scan.snapshot.fetch.size | The maximum number of records to fetch at a time when reading the full data of a table. | No | INTEGER | 1024 | None. |
| scan.startup.mode | The startup mode for data consumption. | No | STRING | initial | Valid values: initial (default) scans the full historical data during the first startup and then reads the latest WAL data; latest-offset skips the full scan and starts reading from the end of the WAL, so only changes made after the connector starts are read; committed-offset skips the full scan and starts consuming incremental data from the committed offset of the replication slot; snapshot consumes only the full historical data and does not consume incremental data. |
| scan.incremental.close-idle-reader.enabled | Specifies whether to close idle readers after the snapshot phase ends. | No | BOOLEAN | false | Takes effect only if execution.checkpointing.checkpoints-after-tasks-finish.enabled is set to true. |
| scan.lsn-commit.checkpoints-num-delay | The number of checkpoints by which the commit of the LSN offset is delayed. | No | INTEGER | 3 | The LSN offset is committed in a rolling manner with this delay so that the WAL needed to restore from a recent checkpoint is retained. This prevents failures when the job recovers from state. |
| connect.timeout | The maximum time that the connector waits to connect to the PostgreSQL database server before a timeout occurs. | No | DURATION | 30s | The value cannot be less than 250 ms. |
| connect.max-retries | The maximum number of retries for the connector to establish a connection to the PostgreSQL database server. | No | INTEGER | 3 | None. |
| connection.pool.size | The size of the connection pool. | No | INTEGER | 20 | None. |
| jdbc.properties.* | Custom JDBC URL properties. | No | STRING | (none) | You can pass custom properties, such as 'jdbc.properties.useSSL' = 'false'. |
| heartbeat.interval | The interval for sending heartbeat events, which are used to track the latest available WAL offset. | No | DURATION | 30s | None. |
| debezium.* | Passes Debezium properties to the Debezium Embedded Engine, which is used to capture data changes from the PostgreSQL server. | No | STRING | (none) | For more information about Debezium PostgreSQL connector properties, see the related documentation. |
| chunk-meta.group.size | The group size of chunk metadata. | No | INTEGER | 1000 | If the amount of chunk metadata exceeds this size, the metadata is passed to downstream operators in multiple groups. |
| metadata.list | A list of readable metadata columns that are passed to the downstream and can be used in the transform module. | No | STRING | (none) | Separate multiple values with commas (,). The currently available metadata is op_ts. |
| scan.incremental.snapshot.unbounded-chunk-first.enabled | Specifies whether to distribute the unbounded chunk first during the snapshot reading phase. | No | BOOLEAN | false | Valid values: true distributes the unbounded chunk first; false (default) does not. Important: This is an experimental feature. Enabling it reduces the risk of out-of-memory (OOM) errors when a TaskManager processes the last chunk in the snapshot phase. We recommend that you set this configuration before the job starts for the first time. |
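
The following is a fuller sketch that captures multiple tables with a regular expression, excludes temporary tables, and exposes the op_ts metadata column for use in the transform module. The connection values, table patterns, and slot name are hypothetical:

source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  # Capture every table whose name starts with tbl_ in schema scm of database db.
  tables: db.scm.tbl_\.*
  # Applied after tables; skips matching tables.
  tables.exclude: db.scm.tbl_tmp_\.*
  slot.name: pipeline_slot
  # Expose the change timestamp as readable metadata.
  metadata.list: op_ts

sink:
  type: ...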
