Realtime Compute for Apache Flink: PostgreSQL CDC

Last Updated: Jun 02, 2026

The Postgres CDC connector reads a full snapshot of a PostgreSQL database and then captures change data, with exactly-once processing semantics.

Overview

The Postgres CDC connector supports the following capabilities:

Category

Details

Supported types

SQL source, Flink CDC source

Note

Use the JDBC connector for sink tables and lookup (dimension) tables.

Execution mode

Streaming

Data format

Not applicable

Metrics

Monitoring metrics

  • currentFetchEventTimeLag: The interval from when data is generated to when it is pulled by the source operator.

  • currentEmitEventTimeLag: The interval from when data is generated to when it leaves the source operator.

  • sourceIdleTime: The duration for which the source has not produced new data.

Note
  • The currentFetchEventTimeLag and currentEmitEventTimeLag metrics are valid only in the incremental phase. In the snapshot phase, their value is always 0.

  • For more information about the metrics, see Metric description.

API types

SQL and Flink CDC

Sink update/delete

Not applicable

Features

Starting from VVR 8.0.6, the Postgres CDC connector integrates with the incremental snapshot framework. It reads full historical data, then automatically switches to reading change logs from the WAL, with exactly-once semantics.

Key features:

  • Unified stream and batch processing. Reads both full and incremental data in a single job.

  • Concurrent snapshot reads. Scales horizontally for faster performance.

  • Seamless full-to-incremental switching. Automatically scales in to reduce resource usage.

  • Resumable reads. Resumes from breakpoints during the snapshot phase for improved stability.

  • Lock-free reading. No locks required, avoiding impact on online operations.

Prerequisites

The Postgres CDC connector reads CDC streams through PostgreSQL logical replication. It supports ApsaraDB RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL.

Important

Configuration differs by deployment type. For details, see Configure Postgres.

After configuration, verify the following:

  • wal_level is set to logical to enable logical decoding.

  • REPLICA IDENTITY of each subscribed table is set to FULL, so that UPDATE and DELETE events include previous column values for data consistency.

    Note

    REPLICA IDENTITY is a PostgreSQL table-level setting that controls whether UPDATE and DELETE events include previous column values. For more information, see REPLICA IDENTITY.

  • max_wal_senders and max_replication_slots values exceed the sum of slots in use and slots required by the Flink job.

  • The account has SUPERUSER privileges, or both LOGIN and REPLICATION permissions, plus SELECT on the subscribed tables.

  • If your Postgres table contains generated columns, set the publish_generated_columns parameter to stored when creating the publication. Otherwise, the snapshot and incremental phase schemas may differ.
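
The checks above can be run from any PostgreSQL client. The following is a minimal sketch, not a complete setup script: the table public.orders and the role flink_cdc are hypothetical names chosen for illustration, and managed services may require some of these settings to be changed in their console instead.

```sql
-- Confirm that logical decoding is enabled (expected value: logical).
SHOW wal_level;

-- Compare the configured limits with the slots already in use.
SHOW max_wal_senders;
SHOW max_replication_slots;
SELECT count(*) AS slots_in_use FROM pg_replication_slots;

-- Set REPLICA IDENTITY to FULL on a subscribed table (hypothetical table name).
ALTER TABLE public.orders REPLICA IDENTITY FULL;

-- Verify the setting: relreplident = 'f' means FULL.
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.orders'::regclass;

-- Create an account with LOGIN and REPLICATION (hypothetical role name),
-- then grant read access to the subscribed table.
CREATE ROLE flink_cdc WITH LOGIN REPLICATION PASSWORD '<password>';
GRANT USAGE ON SCHEMA public TO flink_cdc;
GRANT SELECT ON public.orders TO flink_cdc;
```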

Usage notes

The incremental snapshot feature requires VVR 8.0.6 or later.

Replication slots

Flink PostgreSQL CDC jobs use replication slots to prevent premature WAL purging and ensure data consistency. Poorly managed slots can cause excessive disk usage or read delays. Best practices:

  • Clean up unused slots promptly

    • Flink does not automatically delete replication slots after a job stops or restarts statelessly, to prevent WAL data loss.

    • If a job will not restart, manually delete its replication slot to free disk space.

      Note

      Lifecycle management: Treat replication slots as job-level resources and manage them alongside job starts and stops.

  • Avoid reusing old slots

    • Always use a new slot name. Reusing an old slot forces the job to read accumulated historical WAL data at startup, delaying new data processing.

    • PostgreSQL requires one slot per connection. Each job must use a unique slot name.

      Note

      Naming convention: When you customize slot.name, avoid names with numeric suffixes such as my_slot_1, to prevent conflicts with temporary slots.

  • Slot behavior with incremental snapshots enabled

    • Prerequisites: Checkpoints must be enabled, and the source table must have a primary key defined.

    • Slot creation rules:

      • Incremental snapshot disabled: Only a parallelism of 1 is supported. One global slot is used.

      • Incremental snapshot enabled:

        • Snapshot phase: Each concurrent source subtask creates a temporary slot. The naming format is ${slot.name}_${task_id}.

        • Incremental phase: All temporary slots are automatically reclaimed. Only one global slot is retained.

    • Maximum number of slots: Source parallelism + 1 (during the snapshot phase)

  • Resources and performance

    • If available slots or disk space is limited, reduce snapshot parallelism to use fewer temporary slots. This reduces snapshot read speed.

    • If the downstream sink supports idempotent writes, set scan.incremental.snapshot.backfill.skip = true to skip WAL backfill during the snapshot phase and accelerate startup.

      This provides only at-least-once semantics and is not suitable for stateful computations (aggregations or lookup joins) because required historical changes may be lost.

  • When incremental snapshots are disabled, checkpoints are not supported during the snapshot phase.

    Configuration to avoid timeouts during the snapshot phase

    When incremental snapshots are disabled, checkpoints during the snapshot phase may cause timeout failovers. Configure these parameters in Other Configuration (Configure custom running parameters):

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    Parameters:

    Parameter

    Description

    Remarks

    execution.checkpointing.interval

    The interval between checkpoints.

    The value is a duration, such as 10min or 30s.

    execution.checkpointing.tolerable-failed-checkpoints

    The number of checkpoint failures that can be tolerated before the job fails.

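    The product of this parameter and the checkpoint scheduling interval is the allowed snapshot read time. For example, an interval of 10min with a value of 100 allows roughly 1,000 minutes for the snapshot phase.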

    Note

    If the table is very large, set this parameter to a larger value.

    restart-strategy

    The job restart strategy.

    Valid values:

    • fixed-delay: Fixed-delay restart strategy.

    • failure-rate: Failure-rate restart strategy.

    • exponential-delay: Exponential-delay restart strategy.

    For more information, see Restart Strategies.

    restart-strategy.fixed-delay.attempts

    Max restart attempts for the fixed-delay restart strategy.
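
For the slot cleanup practices described under Replication slots, the statements below sketch how slots could be inspected and removed on the PostgreSQL side. They assume PostgreSQL 10 or later (for pg_current_wal_lsn and pg_wal_lsn_diff); old_flink_slot is a hypothetical slot name.

```sql
-- List all replication slots, whether a client is connected to each,
-- and roughly how much WAL each slot is holding back.
SELECT slot_name,
       plugin,
       active,
       restart_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- Drop a slot that belongs to a job that will not be restarted.
-- This fails if the slot is still active, which guards against
-- removing a slot that a running job depends on.
SELECT pg_drop_replication_slot('old_flink_slot');
```

Running the first query before and after a job stops is one way to confirm whether temporary snapshot-phase slots were reclaimed.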

Reuse Postgres subscription

The Postgres CDC connector relies on a publication to determine which tables' changes are pushed to a slot. If multiple jobs share the same publication, each job can overwrite the table list that the other jobs configured.

Cause

The default publication.autocreate.mode is filtered, which includes only tables in the connector configuration. This modifies the publication on job startup, which can affect other jobs.

Solution

  1. Create a publication in PostgreSQL that includes all monitored tables, or create a separate publication per job.

    -- Create a publication named my_flink_pub that includes only the specified tables (you can create one publication per job)
    CREATE PUBLICATION my_flink_pub FOR TABLE table_a, table_b;
    -- Or, more simply, include all tables in the database
    CREATE PUBLICATION my_flink_pub FOR ALL TABLES;

    Note

    Subscribing to all tables is not recommended for large databases due to excessive bandwidth and CPU usage on the Flink cluster.

  2. Add the following Flink configurations:

    • debezium.publication.name = 'my_flink_pub' (Specifies the publication name)

    • debezium.publication.autocreate.mode = 'disabled' (Prevents Flink from trying to create or modify the publication on startup)

This provides complete isolation and prevents new jobs from affecting existing ones.
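
As a sketch of how the two steps above fit together, the first part below checks the publication in PostgreSQL and the second part shows a Flink SQL source that uses it. The names table_c, table_a_source, and flink_table_a_slot are hypothetical, and the column list is only illustrative.

```sql
-- PostgreSQL: confirm which tables the publication currently covers.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'my_flink_pub';

-- PostgreSQL: add a table later without recreating the publication.
-- This applies to publications created FOR TABLE, not FOR ALL TABLES.
ALTER PUBLICATION my_flink_pub ADD TABLE table_c;

-- Flink SQL: a source that reads through the pre-created publication
-- and never creates or modifies it at startup.
CREATE TEMPORARY TABLE table_a_source (
  id INT NOT NULL,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = 'table_a',
  'slot.name' = 'flink_table_a_slot',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.publication.name' = 'my_flink_pub',
  'debezium.publication.autocreate.mode' = 'disabled'
);
```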

SQL

Syntax

CREATE TABLE postgrescdc_source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>',
  -- slot.name is required for VVR 8.0.1 and later.
  'slot.name' = '<slot name>',
  'decoding.plugin.name' = 'pgoutput',
  'scan.incremental.snapshot.enabled' = 'true',
  -- Skipping backfill can speed up reads and reduce resource usage, but it may cause data duplication. Enable this if the downstream sink is idempotent.
  'scan.incremental.snapshot.backfill.skip' = 'false',
  -- In a production environment, set this to 'filtered' or 'disabled' and manage the publication manually instead of through Flink.
  'debezium.publication.autocreate.mode' = 'disabled'
  -- If you have multiple sources, configure a different publication for each source.
  --'debezium.publication.name' = 'my_flink_pub'
);

Connector options

Option

Description

Data type

Required

Default

Remarks

connector

The connector name.

STRING

Yes

The value must be postgres-cdc.

hostname

The IP address or hostname of the PostgreSQL database.

STRING

Yes

username

The username for the PostgreSQL database service.

STRING

Yes

password

The password for the PostgreSQL database service.

STRING

Yes

database-name

The PostgreSQL database name.

STRING

Yes

-

schema-name

The PostgreSQL schema name. Regex supported.

STRING

Yes

-

table-name

The PostgreSQL table name. Regex supported.

STRING

Yes

The table name supports regular expressions to read data from multiple tables.

port

The port number.

INTEGER

No

5432

decoding.plugin.name

The name of the PostgreSQL logical decoding plugin.

STRING

No

decoderbufs

This is determined by the plugin installed on the PostgreSQL service. The supported plugins are:

  • decoderbufs: Supported on PostgreSQL 9.6 and later. This plugin must be installed.

  • pgoutput (recommended): The official built-in plugin for PostgreSQL 10 and later.

slot.name

The logical decoding slot name.

STRING

Required for VVR 8.0.1 and later. Optional for earlier versions.

flink (Earlier than 8.0.1)

Set a unique slot.name for each table to avoid the PSQLException: ERROR: replication slot "debezium" is active for PID 974 error. For more information, see Replication slots.

No default value for VVR 8.0.1+.

debezium.*

Debezium properties and parameters

STRING

No

Provides finer-grained control over the Debezium client's behavior. For example, 'debezium.snapshot.mode' = 'never'. For more information, see Configuration properties.

scan.incremental.snapshot.enabled

Specifies whether to enable incremental snapshots.

BOOLEAN

No

false

Note

scan.startup.mode

The startup mode for data consumption.

STRING

No

initial

Valid values:

  • initial: Scans the full historical data on the first startup, and then reads the latest WAL data.

  • latest-offset: Does not scan the full historical data on the first startup. It starts reading from the end of the WAL, which means it only reads the latest changes made after the connector starts.

  • snapshot: Scans the full historical data, reads the new WAL data generated during the snapshot phase, and then the job stops.

changelog-mode

The changelog mode for encoding stream changes.

STRING

No

all

Supported changelog modes:

  • ALL: Supports all types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.

  • UPSERT: Supports only the upsert type, which includes INSERT, DELETE, and UPDATE_AFTER.

heartbeat.interval.ms

The interval for sending heartbeat packets.

DURATION

No

30s

The unit is milliseconds.

The Postgres CDC connector actively sends heartbeats to the database to advance the slot offset. When table changes are infrequent, setting this value ensures timely reclamation of WAL logs.

scan.incremental.snapshot.chunk.key-column

Specifies the column to use as the chunk key when the table is split into chunks during the snapshot phase.

STRING

No

By default, the first column of the primary key is selected.

scan.incremental.close-idle-reader.enabled

Specifies whether to close idle readers after the snapshot is complete.

BOOLEAN

No

false

To enable this configuration, set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.

scan.incremental.snapshot.backfill.skip

Specifies whether to skip reading logs during the snapshot phase.

BOOLEAN

No

false

Enabled: The incremental phase starts reading logs from the low watermark. This reduces the number of WAL slots but provides only at-least-once semantics. Recommended when the downstream supports idempotent writes.

Disabled (default): Logs between the low and high watermarks are read during the snapshot phase to ensure consistency. Required for SQL with aggregations or joins.
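
To show how several of the optional settings in this table combine, here is a minimal sketch of a source definition. The table orders_cdc, its columns, and the slot name flink_orders_slot are hypothetical, and only options described in the table above are used.

```sql
CREATE TEMPORARY TABLE orders_cdc (
  order_id BIGINT NOT NULL,
  status STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '5432',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = 'public',
  'table-name' = 'orders',
  -- A unique slot name per job, without a numeric suffix.
  'slot.name' = 'flink_orders_slot',
  'decoding.plugin.name' = 'pgoutput',
  -- Parallel snapshot reads; requires checkpoints and a primary key.
  'scan.incremental.snapshot.enabled' = 'true',
  -- Split the table on the primary key column (this is also the default choice).
  'scan.incremental.snapshot.chunk.key-column' = 'order_id',
  -- Read the full snapshot first, then continue from the WAL.
  'scan.startup.mode' = 'initial',
  -- Keep backfill on so that aggregations and joins stay consistent.
  'scan.incremental.snapshot.backfill.skip' = 'false'
);
```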

Type mappings

PostgreSQL to Flink type mappings:

PostgreSQL CDC | Flink

SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT

INTEGER, SERIAL | INT

BIGINT, BIGSERIAL | BIGINT

REAL, FLOAT4 | FLOAT

FLOAT8, DOUBLE PRECISION | DOUBLE

NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s)

BOOLEAN | BOOLEAN

DATE | DATE

TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING

BYTEA | BYTES
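
As an illustration of these mappings, the sketch below pairs a PostgreSQL table with a Flink source table whose column types follow the table above. The table public.products, its columns, and the slot name are hypothetical.

```sql
-- PostgreSQL: the upstream table.
CREATE TABLE public.products (
  id         SERIAL PRIMARY KEY,
  sku        VARCHAR(32),
  price      NUMERIC(10, 3),
  in_stock   BOOLEAN,
  rating     REAL,
  created_at TIMESTAMP(6),
  image      BYTEA
);

-- Flink SQL: the matching source table.
CREATE TEMPORARY TABLE products_source (
  id         INT NOT NULL,     -- SERIAL       -> INT
  sku        STRING,           -- VARCHAR(n)   -> STRING
  price      DECIMAL(10, 3),   -- NUMERIC(p,s) -> DECIMAL(p,s)
  in_stock   BOOLEAN,          -- BOOLEAN      -> BOOLEAN
  rating     FLOAT,            -- REAL         -> FLOAT
  created_at TIMESTAMP(6),     -- TIMESTAMP(p) -> TIMESTAMP(p)
  image      BYTES,            -- BYTEA        -> BYTES
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = 'public',
  'table-name' = 'products',
  'slot.name' = 'flink_products_slot'
);
```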

Example

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>',
  -- slot.name is required for VVR 8.0.1 and later.
  'slot.name' = '<slot name>'
);

SELECT * FROM source;
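
To run the example as a continuous job rather than an ad hoc query, the source can be wired to a sink. The sketch below assumes the built-in print connector is available in your deployment and uses a hypothetical sink table named print_sink; for a real target, the JDBC connector mentioned in the overview would take its place.

```sql
-- A sink that writes every change record to the TaskManager logs.
CREATE TEMPORARY TABLE print_sink (
  id INT,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'print'
);

-- Continuously copy snapshot rows and subsequent changes to the sink.
INSERT INTO print_sink
SELECT id, name, description, weight
FROM source;
```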

Flink CDC

VVR V11.4+ supports the PostgreSQL connector as a Flink CDC source.

Syntax

source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  tables: db.scm.tbl
  slot.name: test_slot
  scan.startup.mode: initial
  server-time-zone: UTC
  connect.timeout: 120s
  decoding.plugin.name: decoderbufs

sink:
  type: ...

Connector options

Option

Description

Required

Data type

Default

Remarks

type

The connector name.

Yes

STRING

Must be postgres.

name

The data source name.

No

STRING

hostname

The domain name or IP address of the PostgreSQL database server.

Yes

STRING

port

The PostgreSQL database port.

No

INTEGER

5432

username

The PostgreSQL username.

Yes

STRING

password

The PostgreSQL password.

Yes

STRING

tables

The table names to capture.

Regex supported.

Yes

STRING

Important

Currently, only tables in the same database can be captured.

A period (.) is treated as a separator for a fully qualified name. To use a period (.) in a regular expression to match any character, escape it with a backslash. For example: bdb.schema_\.*.order_\.*.

slot.name

The PostgreSQL replication slot name.

Yes

STRING

The name must comply with PostgreSQL replication slot naming rules and can contain lowercase letters, numbers, and underscores.

decoding.plugin.name

The name of the PostgreSQL logical decoding plugin installed on the server.

No

STRING

pgoutput

Valid values: decoderbufs and pgoutput.

tables.exclude

The tables to exclude. This option takes effect after the tables option. Regex supported.

No

STRING

See tables option.

server-time-zone

The session time zone of the database server, such as "Asia/Shanghai".

No

STRING

If not set, the system default time zone (ZoneId.systemDefault()) is used.

scan.incremental.snapshot.chunk.size

The size (number of rows) of each chunk in the incremental snapshot framework.

No

INTEGER

8096

When incremental snapshot is enabled, the table is split into multiple chunks for reading. The data of a chunk is cached in memory before it is fully consumed.

A smaller chunk size results in more chunks for the table. This makes fault recovery more fine-grained, but the larger number of chunks may lead to out-of-memory (OOM) errors and lower overall throughput. Choose a chunk size that balances the two.

scan.snapshot.fetch.size

The maximum number of records to fetch at a time when reading the full data of a table.

No

INTEGER

1024

scan.startup.mode

The startup mode for data consumption.

No

STRING

initial

Valid values:

  • initial (default): Scans the snapshot on the first startup, and then switches to the latest WAL data.

  • latest-offset: Skips snapshot reading and starts from the end of the WAL, which means it reads only the changes made after the connector starts.

  • committed-offset: Skips snapshot reading and consumes WAL data from a specified offset.

  • snapshot: Consumes only the snapshot and not the incremental data.

scan.incremental.close-idle-reader.enabled

Specifies whether to close idle readers after the snapshot is complete.

No

BOOLEAN

false

To enable this configuration, set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.

scan.lsn-commit.checkpoints-num-delay

The number of checkpoints to delay before starting to commit LSN offsets.

No

INTEGER

3

Checkpoint LSN offsets are committed on a rolling basis, so that the job can still recover from state.

connect.timeout

The maximum time the connector waits to connect to the PostgreSQL database server before timing out.

No

DURATION

30s

This value cannot be less than 250 milliseconds.

connect.max-retries

Max retry attempts for the connector to establish a connection.

No

INTEGER

3

connection.pool.size

The connection pool size.

No

INTEGER

20

jdbc.properties.*

Allows users to pass custom JDBC URL properties.

No

STRING

20

Users can pass custom properties, such as 'jdbc.properties.useSSL' = 'false'.

heartbeat.interval

The interval for sending heartbeat events to track the latest available WAL log offset.

No

DURATION

30s

debezium.*

Passes Debezium properties to the Debezium Embedded Engine, which is used to capture data changes from the PostgreSQL server.

No

STRING

For the available properties, see the Debezium documentation for the PostgreSQL connector.

chunk-meta.group.size

The size of the chunk metadata.

No

STRING

1000

If the metadata is larger than this value, it is passed in multiple parts.

metadata.list

A list of readable metadata passed to the downstream, which can be used in the transform module.

No

STRING

false

Use commas (,) as separators. Currently, the available metadata is: op_ts.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Dispatches the unbounded chunk first during the snapshot reading phase.

No

STRING

false

This is an experimental feature. Enabling it can reduce the risk of OOM errors when the TaskManager synchronizes the last chunk during the snapshot phase. We recommend adding this before the job's first startup.
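
Before filling in server-time-zone and slot.name, it may help to check the corresponding values on the PostgreSQL side. This is a small sketch; test_slot is the slot name from the syntax example above.

```sql
-- Show the session time zone, which server-time-zone is meant to match.
SHOW TimeZone;

-- Check whether the chosen slot name already exists and which plugin it uses.
SELECT slot_name, plugin, active
FROM pg_replication_slots
WHERE slot_name = 'test_slot';
```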
