The PostgreSQL change data capture (CDC) connector reads full snapshots and change data from a PostgreSQL database. The connector ensures that each data record is read exactly once and maintains exactly-once semantics during failure recovery. This topic describes how to use the PostgreSQL CDC connector.
Background information
The Postgres CDC connector has the following capabilities.
Item | Description |
Supported types | Source table. Note: You can use the JDBC connector to create a sink or dimension table. |
Running mode | Streaming mode only |
Data format | Not applicable |
Specific monitoring metrics | |
API type | SQL and Data Ingestion YAML |
Update or delete data in sink tables | Not applicable |
Features
The PostgreSQL CDC connector uses the incremental snapshot framework, which is available in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.6 or later. The connector first reads the full historical data and then automatically switches to reading write-ahead logging (WAL) change logs. This process ensures that no data is missed or duplicated. Even if a failure occurs, data is processed with exactly-once semantics. The PostgreSQL CDC source table supports concurrent reading of full data, lock-free reading, and resumable data transfer.
As a source table, it has the following features and advantages:
Unifies stream and batch processing. It supports reading full and incremental data, which eliminates the need to maintain two separate processes.
Supports concurrent reading of full data for horizontal performance scaling.
Seamlessly switches from reading full data to reading incremental data and automatically scales in to save compute resources.
Supports resumable data transfer during the full data reading phase for improved stability.
Reads full data without locks to avoid affecting online business operations.
Prerequisites
The PostgreSQL CDC connector reads CDC data streams using the logical replication feature of a PostgreSQL database. The connector supports Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL.
The configurations for Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL differ. Before you begin, complete the required configurations as described in the Configure PostgreSQL document.
After you complete the configurations, ensure that the following conditions are met:
The value of the wal_level parameter is set to logical. This adds the information required for logical decoding to the write-ahead log (WAL).
The REPLICA IDENTITY of the subscribed table is set to FULL. This ensures that INSERT and UPDATE events include the previous values of all columns in the table, which guarantees data synchronization consistency.
Note: REPLICA IDENTITY is a table-level setting specific to PostgreSQL. It determines whether the logical decoding plugin includes the previous values of the involved table columns during INSERT and UPDATE events. For more information about the values of REPLICA IDENTITY, see REPLICA IDENTITY.
The values of the max_wal_senders and max_replication_slots parameters are greater than the sum of the replication slots currently in use by the database and the number of slots required by the Flink job.
The account has SUPERUSER system permissions or both LOGIN and REPLICATION permissions. The account must also have SELECT permission on the subscribed tables to query full data.
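The following SQL sketch shows one way to verify and apply these settings on the PostgreSQL side. The table name my_schema.shipments and the account flink_user are placeholders for your own objects.

```sql
-- Check that logical decoding is enabled (the expected value is logical).
SHOW wal_level;

-- Check the slot and WAL sender limits.
SHOW max_replication_slots;
SHOW max_wal_senders;

-- Set REPLICA IDENTITY to FULL for each subscribed table
-- (my_schema.shipments is a placeholder name).
ALTER TABLE my_schema.shipments REPLICA IDENTITY FULL;

-- Verify the setting: 'f' means FULL.
SELECT relreplident FROM pg_class WHERE oid = 'my_schema.shipments'::regclass;

-- Grant the required permissions to the account used by the Flink job
-- (flink_user is a placeholder name).
ALTER ROLE flink_user WITH LOGIN REPLICATION;
GRANT SELECT ON ALL TABLES IN SCHEMA my_schema TO flink_user;
```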
Precautions
The incremental snapshot feature of PostgreSQL CDC is supported only in Realtime Compute for Apache Flink V8.0.6 and later.
A Flink PostgreSQL CDC job relies on a Replication Slot to ensure that the write-ahead log (WAL) is not purged prematurely, which guarantees data consistency. However, improper management can lead to issues such as wasted disk space or data read latency. Follow these recommendations:
Promptly purge slots that are no longer in use
Flink does not automatically delete a replication slot, even after a job has stopped, especially in stateless restart scenarios. This behavior prevents data loss that could occur if the WAL is purged.
If you confirm that a job will not be restarted, you must manually delete its associated replication slot to free up disk space.
Important: Lifecycle management: Treat a replication slot as part of the job's resources. Manage it in sync with job start and stop operations.
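A minimal sketch of how to inspect and clean up slots, assuming the job's slot is named flink_slot (a placeholder):

```sql
-- List replication slots and check whether they are still active.
SELECT slot_name, plugin, active, restart_lsn
FROM pg_replication_slots;

-- Drop the slot of a job that will not be restarted
-- (flink_slot is a placeholder name; the slot must be inactive).
SELECT pg_drop_replication_slot('flink_slot');
```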
Avoid reusing old slots
A new job must use a new slot name instead of reusing an old one. Reusing a slot can cause the job to read a large volume of historical WAL data upon startup, which delays reading the latest data.
PostgreSQL logical replication requires that a slot can be used by only one connection. Therefore, different jobs must use different slot names.
Important: Naming convention: When you customize `slot.name`, avoid using names with numeric suffixes, such as `my_slot_1`, to prevent conflicts with temporary slots.
Slot behavior when incremental snapshot is enabled
Prerequisites: You must enable checkpointing, and the source table must have a primary key.
Slot creation rules:
Incremental snapshot disabled: Only a single concurrency is supported, which uses one global slot.
Incremental snapshot enabled:
Full phase: Each concurrent source subtask creates a temporary slot with a name in the format `${slot.name}_${task_id}`.
Incremental phase: All temporary slots are automatically reclaimed, and only one global slot is retained.
Maximum number of slots: Source concurrency + 1 (during the full snapshot phase)
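For example, assuming slot.name is set to flink_slot (a placeholder) and the source parallelism is 4, you would expect to see the global slot plus temporary slots such as flink_slot_0 through flink_slot_3 during the full phase. A query such as the following can be used to observe them:

```sql
-- Observe the global slot and the temporary per-subtask slots
-- created during the full snapshot phase (flink_slot is a placeholder).
SELECT slot_name, active
FROM pg_replication_slots
WHERE slot_name LIKE 'flink_slot%';
```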
Resources and performance
If the number of slots or the amount of disk space in PostgreSQL is limited, you can reduce the concurrency of the full snapshot phase to decrease the number of temporary slots. This will slow down the speed at which full data is read.
If the downstream system supports idempotent writes, you can set `scan.incremental.snapshot.backfill.skip` to `true` to skip the WAL backfill during the full phase and accelerate startup. This configuration provides only at-least-once semantics. It is not suitable for jobs with stateful computations such as aggregations or dimension table joins, because historical changes that are required for intermediate states might be lost.
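A minimal sketch of a source table with this option enabled; the column and connection values are placeholders, and the downstream sink is assumed to be idempotent:

```sql
CREATE TABLE orders_source (
  order_id INT,
  status STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>',
  -- Incremental snapshot must be enabled for the backfill option to apply.
  'scan.incremental.snapshot.enabled' = 'true',
  -- Skips the WAL backfill in the full phase; at-least-once semantics only.
  'scan.incremental.snapshot.backfill.skip' = 'true'
);
```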
When the incremental snapshot feature is not enabled, the PostgreSQL CDC connector does not support performing checkpoints during the full table scan phase.
If incremental snapshot is not enabled, a job might fail over due to a checkpoint timeout if a checkpoint is triggered during the full table scan phase. Therefore, you can configure the following parameters in the Other Configurations section to prevent failovers due to checkpoint timeouts during the full synchronization phase. For more information, see How do I configure custom runtime parameters for a job?.
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

The following table describes the parameters.
Parameter | Description | Notes |
execution.checkpointing.interval | The interval at which checkpoints are triggered. | Data type: Duration. Example: 10min or 30s. |
execution.checkpointing.tolerable-failed-checkpoints | The number of checkpoint failures that can be tolerated. | The product of this value and the checkpoint scheduling interval determines the time allowed for reading the snapshot. Note: If the table is very large, set this parameter to a larger value. |
restart-strategy | The restart policy. | Valid values: fixed-delay (the fixed-delay restart policy), failure-rate (the failure-rate restart policy), and exponential-delay (the exponential-delay restart policy). For more information, see Restart Strategies. |
restart-strategy.fixed-delay.attempts | The maximum number of restart attempts for the fixed-delay restart policy. | None. |
SQL
Syntax
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);
Parameters in the WITH clause
Parameter | Description | Data type | Required | Default value | Notes |
connector | The type of the connector. | STRING | Yes | None | The value must be postgres-cdc. |
hostname | The IP address or hostname of the PostgreSQL database. | STRING | Yes | None | None. |
username | The username for the PostgreSQL database service. | STRING | Yes | None | None. |
password | The password for the PostgreSQL database service. | STRING | Yes | None | None. |
database-name | The database name. | STRING | Yes | None | The database name. |
schema-name | The PostgreSQL schema name. | STRING | Yes | None | The schema name supports regular expressions to read data from multiple schemas. |
table-name | The PostgreSQL table name. | STRING | Yes | None | The table name supports regular expressions to read data from multiple tables. |
port | The port number of the PostgreSQL database service. | INTEGER | No | 5432 | None. |
decoding.plugin.name | The name of the PostgreSQL logical decoding plugin. | STRING | No | decoderbufs | This is determined by the plugin installed on the PostgreSQL service. Supported plugins include decoderbufs and pgoutput. |
slot.name | The name of the logical decoding slot. | STRING | Optional for versions earlier than 8.0.1. Required for versions 8.0.1 and later. | The default value is flink for versions earlier than 8.0.1. No default value for versions 8.0.1 and later. | Set a separate slot name for each job, because a logical replication slot can be used by only one connection. |
debezium.* | The Debezium property parameter. | STRING | No | None | Provides more granular control over the Debezium client behavior. For example, 'debezium.snapshot.mode' = 'never' specifies that the connector never reads snapshot data. |
scan.incremental.snapshot.enabled | Specifies whether to enable incremental snapshot. | BOOLEAN | No | false | Valid values: true (incremental snapshot is enabled) and false (default, incremental snapshot is disabled). Note: Incremental snapshot requires VVR 8.0.6 or later, checkpointing must be enabled, and the source table must have a primary key. |
scan.startup.mode | The startup mode for data consumption. | STRING | No | initial | Valid values include: initial (default), which reads the full historical data first and then reads WAL change data, and latest-offset, which reads change data from the end of the WAL only. |
changelog-mode | The changelog mode for encoding stream changes. | STRING | No | all | Valid values: all (default), which encodes all change types, including INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE, and upsert, which encodes changes as an upsert stream that contains only INSERT, UPDATE_AFTER, and DELETE events. |
heartbeat.interval.ms | The interval for sending heartbeat packets. | Duration | No | 30s | The PostgreSQL CDC connector actively sends heartbeat packets to the database to advance the slot offset. When table changes are infrequent, this setting ensures that the slot offset still advances so that old WAL logs can be purged promptly. |
scan.incremental.snapshot.chunk.key-column | Specifies a column as the splitting column for sharding in the snapshot phase. | STRING | No | None | By default, the first column of the primary key is selected. |
scan.incremental.close-idle-reader.enabled | Specifies whether to close idle readers after the snapshot ends. | BOOLEAN | No | false | This parameter takes effect only if you set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true. |
scan.incremental.snapshot.backfill.skip | Specifies whether to skip log reading (backfill) in the full snapshot phase. | BOOLEAN | No | false | Valid values: true, which skips the WAL backfill and provides only at-least-once semantics, and false (default), which performs the WAL backfill and provides exactly-once semantics. |
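Building on the syntax above, the following sketch shows a source table that enables incremental snapshot with a job-specific slot name, an explicit chunk key column, and a heartbeat interval. All connection values, the table schema, and the slot name are placeholders.

```sql
CREATE TABLE shipments_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>',
  -- Requires checkpointing and a primary key on the source table.
  'scan.incremental.snapshot.enabled' = 'true',
  -- Use a distinct slot name per job; avoid numeric suffixes.
  'slot.name' = 'flink_shipments_slot',
  -- Column used to split chunks in the snapshot phase
  -- (defaults to the first primary key column).
  'scan.incremental.snapshot.chunk.key-column' = 'shipment_id',
  -- Advance the slot offset even when the table changes infrequently.
  'heartbeat.interval.ms' = '30s'
);
```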
Data type mappings
The following table describes the mappings between PostgreSQL and Flink data types.
PostgreSQL data type | Flink data type |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
Sample code
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;
Data ingestion
In Realtime Compute for Apache Flink V11.4 and later, you can use the PostgreSQL connector as a data source in a Data Ingestion YAML job.
Syntax
source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  tables: db.scm.tbl
  slot.name: test_slot
  scan.startup.mode: initial
  server-time-zone: UTC
  connect.timeout: 120s
  decoding.plugin.name: decoderbufs

sink:
  type: ...
Parameters
Parameter | Description | Required | Data type | Default value | Notes |
type | The type of the data source. | Yes | STRING | None | The value must be postgres. |
name | The name of the data source. | No | STRING | None | None. |
hostname | The domain name or IP address of the PostgreSQL database server. | Yes | STRING | (none) | None. |
port | The port exposed by the PostgreSQL database server. | No | INTEGER | 5432 | None. |
username | The username for the PostgreSQL database. | Yes | STRING | (none) | None. |
password | The password for the PostgreSQL database. | Yes | STRING | (none) | None. |
tables | The names of the PostgreSQL database tables to capture. You can use a regular expression to monitor multiple tables that match the expression. | Yes | STRING | (none) | Important: Currently, you can capture only tables in the same database. A period (.) is used as the separator between database, schema, and table names. To use a period (.) to match any character in a regular expression, you must escape it with a backslash (\). |
slot.name | The name of the PostgreSQL replication slot. | Yes | STRING | (none) | The name must follow PostgreSQL replication slot naming conventions and can contain only lowercase letters, numbers, and underscores. |
decoding.plugin.name | The name of the logical decoding plug-in installed on the PostgreSQL server. | No | STRING | decoderbufs | Valid values include decoderbufs and pgoutput. |
tables.exclude | The names of the PostgreSQL database tables to exclude. This parameter takes effect after the tables parameter. | No | STRING | (none) | You can also use a regular expression to exclude multiple tables that match the expression. The usage is the same as the tables parameter. |
server-time-zone | The session time zone of the database server, such as "Asia/Shanghai". | No | STRING | (none) | If this parameter is not set, the system default time zone (ZoneId.systemDefault()) is used to determine the server time zone. |
scan.incremental.snapshot.chunk.size | The size of each chunk in the incremental snapshot framework, specified as the number of rows. | No | INTEGER | 8096 | When you enable incremental snapshot reading, the table is split into chunks. Data from each chunk is cached in memory before it is fully read. A smaller chunk size results in more chunks. This improves the granularity of fault recovery but can cause out-of-memory (OOM) errors and lower overall throughput. Set a reasonable chunk size to balance these factors. |
scan.snapshot.fetch.size | The maximum number of records to fetch at a time when reading the full data of a table. | No | INTEGER | 1024 | None. |
scan.startup.mode | The startup mode for data consumption. | No | STRING | initial | Valid values include: initial (default), which reads the full snapshot first and then reads WAL change data, and latest-offset, which reads change data from the end of the WAL only. |
scan.incremental.close-idle-reader.enabled | Specifies whether to close idle readers after the snapshot phase ends. | No | BOOLEAN | false | This configuration takes effect only if you set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true. |
scan.lsn-commit.checkpoints-num-delay | The number of checkpoints to delay before the connector starts to commit the LSN offset. | No | INTEGER | 3 | The LSN offsets are committed on a rolling basis with this delay so that the WAL required to restore the job from a recent checkpoint state is not reclaimed too early. |
connect.timeout | The maximum time that the connector waits to connect to the PostgreSQL database server before a timeout occurs. | No | DURATION | 30s | The value cannot be less than 250 ms. |
connect.max-retries | The maximum number of retries for the connector to establish a connection to the PostgreSQL database server. | No | INTEGER | 3 | None. |
connection.pool.size | The size of the connection pool. | No | INTEGER | 20 | None. |
jdbc.properties.* | Allows you to pass custom JDBC URL properties. | No | STRING | (none) | You can pass custom properties, for example, jdbc.properties.ssl: true to enable SSL for the JDBC connection. |
heartbeat.interval | The interval for sending heartbeat events to track the latest available WAL offset. | No | DURATION | 30s | None. |
debezium.* | Passes Debezium properties to the Debezium Embedded Engine, which is used to capture data changes from the PostgreSQL server. | No | STRING | (none) | For more information about Debezium PostgreSQL connector properties, see the related documentation. |
chunk-meta.group.size | The group size of the chunk metadata. | No | INTEGER | 1000 | If the amount of chunk metadata exceeds this value, the metadata is passed to the downstream in multiple groups. |
metadata.list | A list of readable metadata that is passed to the downstream and can be used in the transform module. | No | STRING | (none) | Separate multiple metadata names with commas (,). |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Specifies whether to distribute the unbounded chunk first during the snapshot reading phase. | No | BOOLEAN | false | Valid values: true, which distributes the unbounded (last) chunk first, and false (default), which distributes it last.
Important: This is an experimental feature. If you enable this feature, you can reduce the risk of out-of-memory (OOM) errors when a TaskManager synchronizes the last chunk in the snapshot phase. We recommend that you add this configuration before the job starts for the first time. |
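A slightly fuller sketch of a Data Ingestion YAML source that combines several of these options. The host, credentials, table patterns, slot name, and sink are placeholders.

```yaml
source:
  type: postgres
  name: PostgreSQL Source
  hostname: <yourHostname>
  port: 5432
  username: <yourUserName>
  password: <yourPassWord>
  # Capture every table in the scm schema of database db, except audit tables
  # (placeholder patterns; the period separates database, schema, and table).
  tables: db.scm.\.*
  tables.exclude: db.scm.audit_\.*
  slot.name: flink_ingestion_slot
  decoding.plugin.name: pgoutput
  scan.startup.mode: initial
  server-time-zone: UTC
  # Takes effect only when execution.checkpointing.checkpoints-after-tasks-finish.enabled is true.
  scan.incremental.close-idle-reader.enabled: true

sink:
  type: ...
```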
References
For a list of connectors supported by Realtime Compute for Apache Flink, see Supported connectors.
To write data to a PolarDB for PostgreSQL (Oracle Compatible) 1.0 sink table, see PolarDB for PostgreSQL (Oracle Compatible) 1.0.
To read from or write to an RDS for MySQL, PolarDB for MySQL, or self-managed MySQL database, use the MySQL connector.