The PostgreSQL Change Data Capture (CDC) connector is used to read full snapshot data and then change data in sequence from a PostgreSQL database. Exactly-once processing semantics ensures data accuracy even if a failure occurs. This topic describes how to use the PostgreSQL CDC connector.
Background information
The following table describes the capabilities supported by the PostgreSQL CDC connector.
Item | Description |
Table type | Source table. Note: You can use the Java Database Connectivity (JDBC) connector for a result table or a dimension table. |
Running mode | Streaming mode. |
Data format | N/A. |
Metric | Note: For more information about the metrics and how to view them, see Report metrics of fully managed Flink to other platforms. |
API type | SQL API. |
Data update or deletion in a result table | N/A. |
Prerequisites
When you use the PostgreSQL CDC connector to read CDC change streams based on the logical replication feature of PostgreSQL databases, make sure that the following conditions are met:
The wal_level parameter is set to logical. This setting adds the information required for logical decoding to write-ahead logging (WAL) logs.
The REPLICA IDENTITY parameter of the change tracking table is set to FULL to ensure data consistency. You can run the following command to change the setting:
ALTER TABLE schema.table REPLICA IDENTITY FULL;
The value of the max_wal_senders parameter is greater than the number of replication slots that are in use in the database, and the value of the max_replication_slots parameter is greater than the number of slots required by your fully managed Flink deployment.
Your Alibaba Cloud account is a superuser or is granted the LOGIN and REPLICATION permissions. Your Alibaba Cloud account is also granted the SELECT permission on the subscription table for full data query.
The configurations related to the preceding conditions in the ApsaraDB RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL databases may be different. For more information, see Configure a PostgreSQL database.
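The preceding checks and grants can be sketched in psql as follows. The table name public.shipments, the user name flink_cdc, and the password placeholder are hypothetical; replace them with your own values.

```sql
-- Verify the logical replication settings (run as a superuser).
SHOW wal_level;              -- must return: logical
SHOW max_wal_senders;        -- must exceed the number of replication slots in use
SHOW max_replication_slots;  -- must exceed the number of slots your deployment needs

-- Make the change tracking table emit full row images.
ALTER TABLE public.shipments REPLICA IDENTITY FULL;

-- Grant the minimum permissions to a dedicated account (hypothetical name).
CREATE USER flink_cdc WITH LOGIN REPLICATION PASSWORD '<yourPassword>';
GRANT SELECT ON public.shipments TO flink_cdc;
```

If you cannot use a superuser account, the LOGIN and REPLICATION attributes together with SELECT on the subscribed tables are sufficient for the connector to read both the full snapshot and the change stream.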
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0 or later supports the PostgreSQL CDC connector.
The PostgreSQL CDC connector does not support checkpoints during a full table scan.
If checkpoints are triggered in your deployment during a full table scan, a deployment failover may occur due to a checkpoint timeout. To prevent this issue, we recommend that you add the following configurations to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see How do I configure parameters for deployment running?
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
The following table describes the parameters in the preceding configurations.
Parameter | Description | Remarks |
execution.checkpointing.interval | The interval at which checkpoints are triggered. | The value of this parameter is a duration, such as 10min or 30s. |
execution.checkpointing.tolerable-failed-checkpoints | The maximum number of checkpoint failures that are allowed. | The product of the value of this parameter and the value of the execution.checkpointing.interval parameter is the period of time during which snapshot data can be read. For example, the preceding settings tolerate 100 × 10 minutes, or about 16.7 hours, of failed checkpoints during the full table scan. Note: If the table that needs to be scanned is excessively large, we recommend that you set this parameter to a large value. |
restart-strategy | The restart policy. | Valid values: fixed-delay (the deployment is restarted at a fixed delay), failure-rate (the deployment is restarted based on the failure rate), and exponential-delay (the deployment is restarted with an exponential delay). For more information, see Restart Strategies. |
restart-strategy.fixed-delay.attempts | The maximum number of restart attempts when the restart-strategy parameter is set to fixed-delay. | N/A. |
Syntax
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);
Parameters in the WITH clause
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | STRING | Yes | No default value | Set the value to postgres-cdc. |
hostname | The IP address or hostname of the PostgreSQL database. | STRING | Yes | No default value | N/A. |
username | The username that is used to access the PostgreSQL database service. | STRING | Yes | No default value | N/A. |
password | The password that is used to access the PostgreSQL database service. | STRING | Yes | No default value | N/A. |
database-name | The name of the database. | STRING | Yes | No default value | If you want to read data from multiple databases, you can set this parameter to a regular expression. |
schema-name | The schema name of the PostgreSQL database. | STRING | Yes | No default value | If you want to read data from multiple schemas, you can set this parameter to a regular expression. |
table-name | The name of the PostgreSQL table. | STRING | Yes | No default value | If you want to read data from multiple tables, you can set this parameter to a regular expression. |
port | The port that is used to access the PostgreSQL database service. | INTEGER | No | 5432 | N/A. |
decoding.plugin.name | The name of the PostgreSQL logical decoding plug-in. | STRING | No | decoderbufs | The plug-in name is determined based on the plug-in that is installed in the PostgreSQL database service. Valid values include decoderbufs, wal2json, and pgoutput. Note: If you use ApsaraDB RDS for PostgreSQL, you must enable the logical decoding feature that is supported by the wal2json plug-in. For more information, see Use the wal2json extension. |
debezium.* | The Debezium property parameters. | STRING | No | No default value | The parameters are used to impose fine-grained control over the behavior of Debezium clients. For example, you can set 'debezium.snapshot.mode' = 'never' to specify that snapshot data is not read when the deployment starts. |
slot.name | The name of the logical decoding slot. | STRING | For Realtime Compute for Apache Flink that uses VVR of a version earlier than 8.0.1, this parameter is not required. For Realtime Compute for Apache Flink that uses VVR 8.0.1 or later, this parameter is required. | For Realtime Compute for Apache Flink that uses VVR of a version earlier than 8.0.1, the default value is flink. For Realtime Compute for Apache Flink that uses VVR 8.0.1 or later, this parameter does not have a default value. | We recommend that you configure the slot.name parameter for each table to avoid replication slot name conflicts between deployments. |
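As noted above, database-name, schema-name, and table-name accept regular expressions. The following DDL sketch uses hypothetical schema and table names (shard_N schemas containing orders_N tables) to show how one source table can capture many physical tables; the connection options follow the syntax shown earlier.

```sql
-- Hypothetical example: capture all shard tables with regular expressions.
CREATE TABLE sharded_orders (
    order_id INT,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '<yourHostname>',
    'port' = '5432',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = '<yourDatabaseName>',
    'schema-name' = 'shard_[0-9]+',   -- matches shard_0, shard_1, ...
    'table-name' = 'orders_[0-9]+'    -- matches orders_0, orders_1, ...
);
```

All matched tables must share a schema that is compatible with the declared columns; otherwise the deserialized change records cannot be mapped to the source table.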
Data type mappings
The following table describes the data type mappings between PostgreSQL CDC and Flink fields.
Data type of Postgres CDC | Data type of Flink |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIME ZONE] | TIME [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] [WITHOUT TIME ZONE] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
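As an illustration of the mappings above, here is a hypothetical PostgreSQL table and the Flink DDL that matches it column by column; all table and column names are invented for this sketch.

```sql
-- PostgreSQL side (hypothetical table):
--   CREATE TABLE public.products (
--       id      SERIAL PRIMARY KEY,   -- SERIAL maps to Flink INT
--       name    VARCHAR(64),          -- VARCHAR(n) maps to Flink STRING
--       price   NUMERIC(10, 2),       -- NUMERIC(p, s) maps to Flink DECIMAL(p, s)
--       updated TIMESTAMP             -- maps to Flink TIMESTAMP
--   );

-- Flink side, following the data type mappings in the preceding table:
CREATE TABLE products_cdc (
    id INT,
    name STRING,
    price DECIMAL(10, 2),
    updated TIMESTAMP
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '<yourHostname>',
    'port' = '5432',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = '<yourDatabaseName>',
    'schema-name' = 'public',
    'table-name' = 'products'
);
```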
Sample code
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;
References
For more information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.
For more information about how to write data to a PolarDB for Oracle 1.0 result table, see PolarDB for Oracle 1.0 connector.
You can use the MySQL connector to read data from or write data to ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases. For more information, see MySQL connector.