The Postgres CDC connector reads a full snapshot of a PostgreSQL database and then captures change data, with exactly-once processing semantics.
Overview
The Postgres CDC connector supports the following capabilities:
| Category | Details |
| Supported types | SQL source, Flink CDC source. Note: Use the JDBC connector for sink tables and lookup (dimension) tables. |
| Execution mode | Streaming |
| Data format | Not applicable |
| Metrics | |
| API types | SQL and Flink CDC |
| Sink update/delete | Not applicable |
Features
Starting from VVR 8.0.6, the Postgres CDC connector integrates with the incremental snapshot framework. It reads full historical data, then automatically switches to reading change logs from the WAL, with exactly-once semantics.
Key features:
- Unified stream and batch processing. Reads both full and incremental data in a single job.
- Concurrent snapshot reads. Scales horizontally for faster performance.
- Seamless full-to-incremental switching. Automatically scales in to reduce resource usage.
- Resumable reads. Resumes from breakpoints during the snapshot phase for improved stability.
- Lock-free reading. No locks required, avoiding impact on online operations.
Prerequisites
The Postgres CDC connector reads CDC streams through PostgreSQL logical replication. It supports ApsaraDB RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL.
Configuration differs by deployment type. For details, see Configure Postgres.
After configuration, verify the following:
- wal_level is set to logical to enable logical decoding.
- REPLICA IDENTITY of each subscribed table is set to FULL, so that UPDATE and DELETE events include previous column values for data consistency.
  Note: REPLICA IDENTITY is a PostgreSQL table-level setting that controls whether UPDATE and DELETE events include previous column values. For details, see REPLICA IDENTITY.
- max_wal_senders and max_replication_slots values exceed the sum of slots in use and slots required by the Flink job.
- The account has SUPERUSER privileges, or both LOGIN and REPLICATION permissions, plus SELECT on the subscribed tables.
- If your Postgres table contains generated columns, set the publish_generated_columns parameter to stored when creating the publication. Otherwise, the snapshot and incremental phase schemas may differ.
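The checks above can be run directly against PostgreSQL. The following is a minimal sketch, not an official procedure: the table public.orders, the role flink_cdc, and the password placeholder are hypothetical names chosen for illustration, and managed services may grant the replication privilege through their own mechanism instead of CREATE ROLE.

```sql
-- 1. Logical decoding must be enabled (expected value: logical).
SHOW wal_level;

-- 2. Include previous column values for a subscribed table.
--    public.orders is a hypothetical table name.
ALTER TABLE public.orders REPLICA IDENTITY FULL;

-- Confirm the setting: relreplident = 'f' means FULL.
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.orders'::regclass;

-- 3. Compare the configured limits with the slots already in use.
SELECT current_setting('max_wal_senders')          AS max_wal_senders,
       current_setting('max_replication_slots')    AS max_replication_slots,
       (SELECT count(*) FROM pg_replication_slots) AS slots_in_use;

-- 4. A dedicated account with LOGIN and REPLICATION, plus SELECT
--    on the subscribed table. Role name and password are placeholders.
CREATE ROLE flink_cdc WITH LOGIN REPLICATION PASSWORD '<password>';
GRANT USAGE ON SCHEMA public TO flink_cdc;
GRANT SELECT ON public.orders TO flink_cdc;
```

Changing wal_level requires a server restart, so it is worth checking that value first.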
Usage notes
The incremental snapshot feature requires VVR 8.0.6 or later.
Replication slots
Flink PostgreSQL CDC jobs use replication slots to prevent premature WAL purging and ensure data consistency. Poorly managed slots can cause excessive disk usage or read delays. Best practices:
- Clean up unused slots promptly
  - Flink does not automatically delete replication slots after a job stops or restarts statelessly, to prevent WAL data loss.
  - If a job will not restart, manually delete its replication slot to free disk space.
  Note: Lifecycle management: Treat replication slots as job-level resources and manage them alongside job starts and stops.
- Avoid reusing old slots
  - Always use a new slot name. Reusing an old slot forces the job to read accumulated historical WAL data at startup, delaying new data processing.
  - PostgreSQL requires one slot per connection. Each job must use a unique slot name.
  Note: Naming convention: When you customize slot.name, avoid names with numeric suffixes such as my_slot_1, to prevent conflicts with temporary slots.
- Slot behavior with incremental snapshots enabled
  - Prerequisites: Checkpoints must be enabled, and the source table must have a primary key defined.
  - Slot creation rules:
    - Incremental snapshot disabled: Only a parallelism of 1 is supported. One global slot is used.
    - Incremental snapshot enabled:
      - Snapshot phase: Each concurrent source subtask creates a temporary slot. The naming format is ${slot.name}_${task_id}.
      - Incremental phase: All temporary slots are automatically reclaimed. Only one global slot is retained.
  - Maximum number of slots: Source parallelism + 1 (during the snapshot phase)
- Resources and performance
  - If available slots or disk space is limited, reduce snapshot parallelism to use fewer temporary slots. This reduces snapshot read speed.
  - If the downstream sink supports idempotent writes, set scan.incremental.snapshot.backfill.skip = true to skip WAL backfill during the snapshot phase and accelerate startup. This provides only at-least-once semantics and is not suitable for stateful computations (aggregations or lookup joins) because required historical changes may be lost.
- When incremental snapshots are disabled, checkpoints are not supported during the snapshot phase.
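The slot hygiene practices above can be supported with a few catalog queries. This is a sketch under stated assumptions: it targets PostgreSQL 10 or later (for pg_current_wal_lsn and pg_wal_lsn_diff), must run on the primary server, and old_flink_slot is a hypothetical slot name left behind by a retired job.

```sql
-- List every slot with its state and the amount of WAL it retains.
SELECT slot_name,
       active,
       temporary,
       restart_lsn,
       pg_size_pretty(
         pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
       ) AS retained_wal
FROM pg_replication_slots
ORDER BY slot_name;

-- Slots still available. A job with source parallelism 4 needs up to
-- 4 + 1 = 5 slots during the snapshot phase.
SELECT current_setting('max_replication_slots')::int - count(*) AS free_slots
FROM pg_replication_slots;

-- Drop the slot of a job that will not restart.
-- The call fails while the slot is still active.
SELECT pg_drop_replication_slot('old_flink_slot');
```

A slot that is inactive and shows a steadily growing retained_wal value is a likely candidate for cleanup.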
Reuse Postgres subscription
The Postgres CDC connector relies on a publication to determine which tables' changes are pushed to a slot. If multiple jobs share the same publication, their configurations will be overwritten.
Cause
The default publication.autocreate.mode is filtered, which includes only tables in the connector configuration. This modifies the publication on job startup, which can affect other jobs.
Solution
- Create a publication in PostgreSQL that includes all monitored tables, or create a separate publication per job.
  -- Create a publication named my_flink_pub that includes all tables (or specified tables, creating one publication per job)
  CREATE PUBLICATION my_flink_pub FOR TABLE table_a, table_b;
  -- Or more simply, include all tables in the database
  CREATE PUBLICATION my_flink_pub FOR ALL TABLES;
  Note: Subscribing to all tables is not recommended for large databases due to excessive bandwidth and CPU usage on the Flink cluster.
- Add the following Flink configurations:
  - debezium.publication.name = 'my_flink_pub' (specifies the publication name)
  - debezium.publication.autocreate.mode = 'disabled' (prevents Flink from trying to create or modify the publication on startup)
This provides complete isolation and prevents new jobs from affecting existing ones.
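Because Flink no longer manages the publication in this setup, it helps to confirm its contents on the PostgreSQL side before starting a job. A minimal sketch, assuming the publication my_flink_pub from the solution above and a hypothetical additional table table_c:

```sql
-- Show which tables the publication currently covers.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'my_flink_pub';

-- Add a table later without recreating the publication.
-- Not applicable to publications created FOR ALL TABLES.
ALTER PUBLICATION my_flink_pub ADD TABLE table_c;
```

If a table referenced by the Flink job is missing from the query result, its changes are not pushed to the slot.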
SQL
Syntax
CREATE TABLE postgrescdc_source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>',
'decoding.plugin.name'= 'pgoutput',
'scan.incremental.snapshot.enabled' = 'true',
-- Skipping backfill can speed up reads and reduce resource usage, but it may cause data duplication. Enable this if the downstream sink is idempotent.
'scan.incremental.snapshot.backfill.skip' = 'false',
-- In a production environment, set this to 'filtered' or 'disabled' and manage the publication manually instead of through Flink.
  'debezium.publication.autocreate.mode' = 'disabled'
-- If you have multiple sources, configure a different publication for each source.
--'debezium.publication.name' = 'my_flink_pub'
);
Connector options
|
Option |
Description |
Data type |
Required |
Default |
Remarks |
|
connector |
The connector name. |
STRING |
Yes |
– |
The value must be postgres-cdc. |
|
hostname |
The IP address or hostname of the PostgreSQL database. |
STRING |
Yes |
– |
– |
|
username |
The username for the PostgreSQL database service. |
STRING |
Yes |
– |
– |
|
password |
The password for the PostgreSQL database service. |
STRING |
Yes |
– |
– |
|
database-name |
The PostgreSQL database name. |
STRING |
Yes |
– |
- |
|
schema-name |
The PostgreSQL schema name. Regex supported. |
STRING |
Yes |
– |
- |
|
table-name |
The PostgreSQL table name. Regex supported. |
STRING |
Yes |
– |
The table name supports regular expressions to read data from multiple tables. |
|
port |
The port number. |
INTEGER |
No |
5432 |
– |
|
decoding.plugin.name |
The name of the PostgreSQL logical decoding plugin. |
STRING |
No |
decoderbufs |
This is determined by the plugin installed on the PostgreSQL service. The supported plugins are:
|
|
slot.name |
The logical decoding slot name. |
STRING |
Required for VVR 8.0.1 and later. Optional for earlier versions. |
No default value for VVR 8.0.1+. |
Set a unique slot name for each job. |
|
debezium.* |
Debezium properties and parameters |
STRING |
No |
– |
Provides finer-grained control over the Debezium client's behavior. For example, |
|
scan.incremental.snapshot.enabled |
Specifies whether to enable incremental snapshots. |
BOOLEAN |
No |
false |
Note
|
|
scan.startup.mode |
The startup mode for data consumption. |
STRING |
No |
initial |
Valid values:
|
|
changelog-mode |
The changelog mode for encoding stream changes. |
STRING |
No |
all |
Supported changelog modes:
|
|
heartbeat.interval.ms |
The interval for sending heartbeat packets. |
DURATION |
No |
30s |
The unit is milliseconds. The Postgres CDC connector actively sends heartbeats to the database to advance the slot offset. When table changes are infrequent, setting this value ensures timely reclamation of WAL logs. |
|
scan.incremental.snapshot.chunk.key-column |
Specifies the column to use as the chunk key for splitting the table into chunks during the snapshot phase. |
STRING |
No |
– |
By default, the first column of the primary key is selected. |
|
scan.incremental.close-idle-reader.enabled |
Specifies whether to close idle readers after the snapshot is complete. |
BOOLEAN |
No |
false |
To enable this configuration, set |
|
scan.incremental.snapshot.backfill.skip |
Specifies whether to skip reading logs during the snapshot phase. |
BOOLEAN |
No |
false |
Enabled: The incremental phase starts reading logs from the low watermark. This reduces the number of WAL slots but provides only at-least-once semantics. Recommended when the downstream supports idempotent writes. Disabled (default): Logs between the low and high watermarks are read during the snapshot phase to ensure consistency. Required for SQL with aggregations or joins. |
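Several of the options above are typically combined in one table definition. The following sketch is illustrative only: the table name orders_source, the slot name orders_job_slot, and the publication my_flink_pub are hypothetical, and the angle-bracket values are placeholders. A primary key is declared because incremental snapshots require one.

```sql
CREATE TABLE orders_source (
  id INT NOT NULL,
  name STRING,
  weight DECIMAL(10,3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '5432',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>',
  -- A slot name used by this job only (hypothetical name).
  'slot.name' = 'orders_job_slot',
  'decoding.plugin.name' = 'pgoutput',
  -- Parallel, resumable snapshot reads.
  'scan.incremental.snapshot.enabled' = 'true',
  -- Split the snapshot into chunks by the id column.
  'scan.incremental.snapshot.chunk.key-column' = 'id',
  'scan.startup.mode' = 'initial',
  -- Heartbeats advance the slot offset on quiet tables.
  'heartbeat.interval.ms' = '30s',
  -- Use a manually managed publication (hypothetical name).
  'debezium.publication.name' = 'my_flink_pub',
  'debezium.publication.autocreate.mode' = 'disabled'
);
```

The backfill option is left at its default here, so the source keeps exactly-once semantics for downstream aggregations or joins.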
Type mappings
PostgreSQL to Flink type mappings:
| PostgreSQL CDC | Flink |
| SMALLINT | SMALLINT |
| INT2 | SMALLINT |
| SMALLSERIAL | SMALLINT |
| SERIAL2 | SMALLINT |
| INTEGER | INT |
| SERIAL | INT |
| BIGINT | BIGINT |
| BIGSERIAL | BIGINT |
| REAL | FLOAT |
| FLOAT4 | FLOAT |
| FLOAT8 | DOUBLE |
| DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s) | DECIMAL(p, s) |
| DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n) | STRING |
| CHARACTER(n) | STRING |
| VARCHAR(n) | STRING |
| CHARACTER VARYING(n) | STRING |
| TEXT | STRING |
| BYTEA | BYTES |
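To see the mappings applied, the sketch below pairs a PostgreSQL table with a Flink table that reads it. The table public.products, its columns, and the slot name products_job_slot are hypothetical, and the connection values are placeholders.

```sql
-- PostgreSQL side (hypothetical table).
CREATE TABLE public.products (
  id         SERIAL PRIMARY KEY,
  sku        BIGINT,
  rating     REAL,
  score      DOUBLE PRECISION,
  price      NUMERIC(10,3),
  in_stock   BOOLEAN,
  created_at TIMESTAMP(6),
  name       VARCHAR(255),
  payload    BYTEA
);

-- Flink side: each column uses the Flink type from the table above.
CREATE TABLE products_source (
  id         INT NOT NULL,    -- SERIAL
  sku        BIGINT,          -- BIGINT
  rating     FLOAT,           -- REAL
  score      DOUBLE,          -- DOUBLE PRECISION
  price      DECIMAL(10,3),   -- NUMERIC(10,3)
  in_stock   BOOLEAN,         -- BOOLEAN
  created_at TIMESTAMP(6),    -- TIMESTAMP(6)
  name       STRING,          -- VARCHAR(255)
  payload    BYTES,           -- BYTEA
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '5432',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = 'public',
  'table-name' = 'products',
  'slot.name' = 'products_job_slot'
);
```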
Example
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;
Flink CDC
VVR V11.4+ supports the PostgreSQL connector as a Flink CDC source.
Syntax
source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  tables: db.scm.tbl
  slot.name: test_slot
  scan.startup.mode: initial
  server-time-zone: UTC
  connect.timeout: 120s
  decoding.plugin.name: decoderbufs
sink:
  type: ...
Connector options
|
Option |
Description |
Required |
Data type |
Default |
Remarks |
|
type |
The connector name. |
Yes |
STRING |
– |
Must be postgres. |
|
name |
The data source name. |
No |
STRING |
– |
– |
|
hostname |
The domain name or IP address of the PostgreSQL database server. |
Yes |
STRING |
– |
– |
|
port |
The PostgreSQL database port. |
No |
INTEGER |
5432 |
– |
|
username |
The PostgreSQL username. |
Yes |
STRING |
– |
– |
|
password |
The PostgreSQL password. |
Yes |
STRING |
– |
– |
|
tables |
The table names to capture. Regex supported. |
Yes |
STRING |
– |
Important: Currently, only tables in the same database can be captured. A period (.) is treated as a separator for a fully qualified name. To use a period (.) in a regular expression to match any character, escape it with a backslash. For example: |
|
slot.name |
The PostgreSQL replication slot name. |
Yes |
STRING |
– |
The name must comply with PostgreSQL replication slot naming rules and can contain lowercase letters, numbers, and underscores. |
|
decoding.plugin.name |
The name of the PostgreSQL logical decoding plugin installed on the server. |
No |
STRING |
|
Valid values: |
|
tables.exclude |
The tables to exclude. This option takes effect after the |
No |
STRING |
– |
See |
|
server-time-zone |
The session time zone of the database server, such as "Asia/Shanghai". |
No |
STRING |
– |
If not set, the system default time zone ( |
|
scan.incremental.snapshot.chunk.size |
The size (number of rows) of each chunk in the incremental snapshot framework. |
No |
INTEGER |
8096 |
When incremental snapshot is enabled, the table is split into multiple chunks for reading. The data of a chunk is cached in memory until it is fully consumed. A smaller chunk size produces more chunks for the table. This makes fault recovery finer-grained, but it may lead to out-of-memory (OOM) errors and lower overall throughput. Choose a chunk size that balances recovery granularity against memory usage and throughput. |
|
scan.snapshot.fetch.size |
The maximum number of records to fetch at a time when reading the full data of a table. |
No |
INTEGER |
1024 |
– |
|
scan.startup.mode |
The startup mode for data consumption. |
No |
STRING |
initial |
Valid values:
|
|
scan.incremental.close-idle-reader.enabled |
Specifies whether to close idle readers after the snapshot is complete. |
No |
BOOLEAN |
false |
To enable this configuration, set |
|
scan.lsn-commit.checkpoints-num-delay |
The number of checkpoints to delay before starting to commit LSN offsets. |
No |
INTEGER |
3 |
Checkpoint LSN offsets are committed on a rolling basis to prevent the inability to recover from state. |
|
connect.timeout |
The maximum time the connector waits to connect to the PostgreSQL database server before timing out. |
No |
DURATION |
30s |
This value cannot be less than 250 milliseconds. |
|
connect.max-retries |
Max retry attempts for the connector to establish a connection. |
No |
INTEGER |
3 |
– |
|
connection.pool.size |
The connection pool size. |
No |
INTEGER |
20 |
– |
|
jdbc.properties.* |
Allows users to pass custom JDBC URL properties. |
No |
STRING |
– |
Users can pass custom properties, such as |
|
heartbeat.interval |
The interval for sending heartbeat events to track the latest available WAL log offset. |
No |
DURATION |
30s |
– |
|
debezium.* |
Passes Debezium properties to the Debezium Embedded Engine, which is used to capture data changes from the PostgreSQL server. |
No |
STRING |
– |
Debezium PostgreSQL connector properties: Debezium documentation. |
|
chunk-meta.group.size |
The size of the chunk metadata. |
No |
INTEGER |
1000 |
If the metadata is larger than this value, it is passed in multiple parts. |
|
metadata.list |
A list of readable metadata passed to the downstream, which can be used in the transform module. |
No |
STRING |
– |
Use commas (,) as separators. Currently, the available metadata is: |
|
scan.incremental.snapshot.unbounded-chunk-first.enabled |
Dispatches the unbounded chunk first during the snapshot reading phase. |
No |
BOOLEAN |
false |
This is an experimental feature. Enabling it can reduce the risk of OOM errors when the TaskManager synchronizes the last chunk during the snapshot phase. We recommend adding this before the job's first startup. |