This topic provides the DDL syntax that is used to create a PostgreSQL Change Data Capture (CDC) source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.
What is a PostgreSQL CDC source table?
- A PostgreSQL database or an ApsaraDB RDS for PostgreSQL database is created, and tables are created in the database. For more information about how to create an ApsaraDB RDS for PostgreSQL database and create a table in the database, see Create a database and an account on an ApsaraDB RDS for PostgreSQL instance.
- The configuration on the ApsaraDB RDS for PostgreSQL database, Amazon RDS for PostgreSQL database, or self-managed PostgreSQL database is complete. For more information, see Configure a PostgreSQL database.
- Only Flink that uses Ververica Runtime (VVR) 2.1.2 or later supports the PostgreSQL CDC connector.
- During a full table scan, the PostgreSQL CDC connector reads all of the table data in a single pass. No offset exists from which the read could be restored, so checkpoints cannot be triggered during a full table scan.
While the scan is in progress, the PostgreSQL CDC source table keeps any running checkpoint waiting. If the scanned table is very large, the scan takes a long time and the waiting checkpoint times out. A checkpoint that times out is considered a failed checkpoint, and with the default Flink configuration a failed checkpoint triggers a job failover.
To prevent a job failover caused by a checkpoint timeout when you scan a large table, we recommend that you configure the following parameters in the Additional Configuration section on the Advanced tab of the Draft Editor page.
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
|Parameter||Description|
|execution.checkpointing.interval||The interval at which checkpoints are triggered. The value of this parameter is a duration, such as 10min or 30s.|
|execution.checkpointing.tolerable-failed-checkpoints||The maximum number of checkpoint failures that are allowed. The product of the value of this parameter and the value of the execution.checkpointing.interval parameter is the duration during which snapshots can be read. If the table that needs to be scanned is too large, you can set this parameter to a large value.|
|restart-strategy||The restart policy. Valid values: fixed-delay (the job is restarted at a fixed delay), failure-rate (the job is restarted based on the failure rate), and exponential-delay (the job is restarted with an exponential delay). For more information, see Restart Strategies.|
|restart-strategy.fixed-delay.attempts||The maximum number of restart attempts when a fixed delay restart policy is used.|
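As a worked example of the product described for execution.checkpointing.tolerable-failed-checkpoints, assume the sample values from the recommended configuration: an interval of 10min and 100 tolerable failed checkpoints. Under that description, the time available for reading the snapshot works out roughly as follows:

```latex
\[
T_{\text{snapshot}} = 10\ \text{min} \times 100 = 1000\ \text{min} \approx 16.7\ \text{h}
\]
```

If a full scan of the table is expected to take longer than this, a larger number of tolerable failed checkpoints or a longer checkpoint interval would be needed.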
CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);
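To check that the source table delivers change events, one option is to route it into a sink that only prints rows. The sketch below assumes the postgrescdc_source table from the DDL statement above and Flink's built-in print connector. The sink name print_sink is hypothetical, and whether the print connector is available in your VVR version is an assumption to verify.

```sql
-- Hypothetical debugging sink: the 'print' connector writes each row to the task logs.
CREATE TEMPORARY TABLE print_sink (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'print'
);

-- Forward the initial snapshot and all later changes from the CDC source to the sink.
INSERT INTO print_sink
SELECT shipment_id, order_id, origin, destination, is_arrived
FROM postgrescdc_source;
```

The sink schema mirrors the source schema so that the SELECT list maps column for column.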
Parameters in the WITH clause
|Parameter||Description||Required||Data type||Remarks|
|connector||The type of the source table.||Yes||STRING||Set the value to postgres-cdc.|
|hostname||The IP address or hostname of the PostgreSQL database.||Yes||STRING||N/A.|
|username||The username that is used to access the PostgreSQL database service.||Yes||STRING||N/A.|
|password||The password that is used to access the PostgreSQL database service.||Yes||STRING||N/A.|
|database-name||The name of the database.||Yes||STRING||You can set this parameter to a regular expression to read data from multiple databases.|
|schema-name||The schema name of the PostgreSQL database.||Yes||STRING||You can set this parameter to a regular expression to read data from multiple schemas.|
|table-name||The name of the PostgreSQL table.||Yes||STRING||You can set this parameter to a regular expression to read data from multiple tables.|
|port||The port that is used to access the PostgreSQL database service.||No||INTEGER||Default value: 5432.|
|decoding.plugin.name||The name of the PostgreSQL logical decoding plug-in.||No||STRING||The plug-in name is determined based on the plug-in that is installed in the PostgreSQL database service. Valid values:
Note If you use ApsaraDB RDS for PostgreSQL, you must enable the logical decoding feature that is supported by the wal2json plug-in. For more information, see Use the wal2json plug-in.
|debezium.*||The Debezium property parameters.||No||STRING||The parameters are used to impose fine-grained control over the behavior of Debezium clients. For example, you can set
Note We recommend that you configure the
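As an illustration of how the optional parameters in the table combine with the required ones, the following sketch reads several tables through a regular expression in table-name and sets a logical decoding plug-in and one Debezium property. The table name shipments_cdc, the schema public, the pattern shipments_[0-9]+, and the slot name shipments_slot are hypothetical. The values wal2json for decoding.plugin.name and slot.name as the Debezium property are assumptions that must match what is installed and allowed on your PostgreSQL database service.

```sql
CREATE TABLE shipments_cdc (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'public',
  -- Hypothetical regular expression: matches shipments_2023, shipments_2024, and so on.
  'table-name' = 'shipments_[0-9]+',
  -- Assumed plug-in name; use the plug-in that is installed on your database service.
  'decoding.plugin.name' = 'wal2json',
  -- Properties with the debezium. prefix are passed to the Debezium client.
  'debezium.slot.name' = 'shipments_slot'
);
```

All tables matched by the pattern need to share the column layout declared in the DDL statement, because they are read through a single source table.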
Data type mappings
|Data type of PostgreSQL CDC||Data type of Flink|
|NUMERIC(p, s)||DECIMAL(p, s)|
|TIME [(p)] [WITHOUT TIME ZONE]||TIME [(p)] [WITHOUT TIME ZONE]|
|TIMESTAMP [(p)] [WITHOUT TIME ZONE]||TIMESTAMP [(p)] [WITHOUT TIME ZONE]|
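The mappings above can be read as a rule for declaring columns in the Flink DDL statement. The sketch below uses a hypothetical PostgreSQL table named payments, shown in comments, and declares a matching Flink source table. The table and column names are illustrative only, and the INT type for payment_id follows the sample DDL in this topic rather than a row in the mapping table.

```sql
-- Hypothetical PostgreSQL table (created in PostgreSQL, shown here for reference):
--   CREATE TABLE public.payments (
--     payment_id INTEGER PRIMARY KEY,
--     amount     NUMERIC(10, 2),
--     paid_time  TIME(0),
--     created_at TIMESTAMP(6)
--   );

-- Matching Flink source table: precision and scale are carried over unchanged.
CREATE TABLE payments_source (
  payment_id INT,
  amount     DECIMAL(10, 2),  -- NUMERIC(p, s) -> DECIMAL(p, s)
  paid_time  TIME(0),         -- TIME(p)       -> TIME(p)
  created_at TIMESTAMP(6)     -- TIMESTAMP(p)  -> TIMESTAMP(p)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'public',
  'table-name' = 'payments'
);
```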