This topic provides the DDL syntax that is used to create a PostgreSQL Change Data Capture (CDC) source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

Notice PostgreSQL CDC source tables are in public preview. If your jobs require high stability, we recommend that you do not use PostgreSQL CDC source tables.

What is a PostgreSQL CDC source table?

A PostgreSQL CDC source table is a streaming source table for PostgreSQL. It first reads the full snapshot data and then reads the change data from a PostgreSQL database. Exactly-once processing semantics ensure data accuracy even if a failure occurs.
Note If you want to use the PostgreSQL CDC connector to read data from an open source PostgreSQL database, make sure that the database runs PostgreSQL 9.6 or later.

Prerequisites

Limits

  • Only Flink jobs that use Ververica Runtime (VVR) 2.1.2 or later support the PostgreSQL CDC connector.
  • During a full table scan, the PostgreSQL CDC connector reads all data of the table in a single pass and records no offset from which the read can be resumed after a failure. Therefore, checkpoints cannot be performed during a full table scan.

    To prevent a checkpoint from being performed during the scan, the PostgreSQL CDC source table keeps any in-progress checkpoint waiting. If the table is very large, the scan takes a long time and the waiting checkpoint times out. A checkpoint that times out is considered failed, and with the default Flink configuration, a failed checkpoint triggers a job failover.

    If the table is large, we recommend that you configure the following parameters in the Additional Configuration section on the Advanced tab of the Draft Editor page to prevent job failovers caused by checkpoint timeouts:
    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647
    | Parameter | Description |
    |---|---|
    | execution.checkpointing.interval | The interval at which checkpoints are triggered. The value is a duration, such as 10min or 30s. |
    | execution.checkpointing.tolerable-failed-checkpoints | The maximum number of consecutive checkpoint failures that are tolerated. The product of this value and the value of execution.checkpointing.interval is the approximate time available for reading the snapshot. If the table to be scanned is very large, set this parameter to a large value. |
    | restart-strategy | The restart strategy. Valid values: fixed-delay (the job is restarted after a fixed delay), failure-rate (the job is restarted based on the failure rate), and exponential-delay (the job is restarted with an exponentially increasing delay). For more information, see Restart Strategies. |
    | restart-strategy.fixed-delay.attempts | The maximum number of restart attempts when the fixed-delay restart strategy is used. The sample value 2147483647 is the maximum value of a 32-bit signed integer, which effectively allows an unlimited number of restarts. |
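With the sample values above, the time available for reading the snapshot can be estimated as follows. This is a rough estimate that assumes every checkpoint triggered during the scan times out; adjust both values to the expected scan duration of your table.

```latex
T_{\text{snapshot}} \approx \text{interval} \times \text{tolerable-failed-checkpoints}
  = 10\,\text{min} \times 100 = 1000\,\text{min} \approx 16.7\,\text{h}
```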

DDL syntax

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);
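A minimal sketch of how the source table defined above might be consumed. The sink table name print_sink is hypothetical, and the sketch assumes that the open source Flink print connector is available in your environment. The print connector writes every row it receives to the standard output of the TaskManagers, so the snapshot rows appear first and the change rows follow.

```sql
-- Hypothetical sink used only for inspection. The print connector accepts
-- changelog input, so inserts, updates, and deletes are all printed.
CREATE TABLE print_sink (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'print'
);

-- The source first emits the full snapshot of the table and then
-- continuously emits the changes captured from the PostgreSQL WAL.
INSERT INTO print_sink
SELECT shipment_id, order_id, origin, destination, is_arrived
FROM postgrescdc_source;
```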

Parameters in the WITH clause

| Parameter | Description | Required | Data type | Remarks |
|---|---|---|---|---|
| connector | The type of the source table. | Yes | STRING | Set the value to postgres-cdc. |
| hostname | The IP address or hostname of the PostgreSQL database. | Yes | STRING | N/A |
| username | The username that is used to access the PostgreSQL database service. | Yes | STRING | N/A |
| password | The password that is used to access the PostgreSQL database service. | Yes | STRING | N/A |
| database-name | The name of the database. | Yes | STRING | You can set this parameter to a regular expression to read data from multiple databases. |
| schema-name | The schema name of the PostgreSQL database. | Yes | STRING | You can set this parameter to a regular expression to read data from multiple schemas. |
| table-name | The name of the PostgreSQL table. | Yes | STRING | You can set this parameter to a regular expression to read data from multiple tables. |
| port | The port that is used to access the PostgreSQL database service. | No | INTEGER | Default value: 5432. |
| decoding.plugin.name | The name of the PostgreSQL logical decoding plug-in. | No | STRING | Set the value based on the plug-in that is installed on the PostgreSQL database service. Valid values: decoderbufs (default), wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming, and pgoutput. If you use ApsaraDB RDS for PostgreSQL, you must enable the logical decoding feature that is supported by the wal2json plug-in. For more information, see Use the wal2json plug-in. |
| debezium.* | The Debezium property parameters. | No | STRING | These parameters give fine-grained control over the behavior of the Debezium client. For example, you can set debezium.snapshot.mode to never so that the connector never reads snapshots. For more information, see Configure properties. We recommend that you configure the debezium.slot.name parameter for each table to avoid the following error: PSQLException: ERROR: replication slot "debezium" is active for PID 974. |
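The following sketch shows how two PostgreSQL CDC source tables in the same job might each be given a dedicated replication slot so that they do not both try to use the default slot named debezium. The table names, column lists, and slot names are hypothetical. The choice of pgoutput is also an assumption: pgoutput is built into PostgreSQL 10 and later, and on ApsaraDB RDS for PostgreSQL the wal2json plug-in described above applies instead. PostgreSQL allows only lowercase letters, digits, and underscores in replication slot names.

```sql
-- Hypothetical source table for public.orders with its own replication slot.
CREATE TABLE orders_source (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 2)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'public',
  'table-name' = 'orders',
  -- Assumption: pgoutput is available on the database (PostgreSQL 10 or later).
  'decoding.plugin.name' = 'pgoutput',
  -- Dedicated slot: lowercase letters, digits, and underscores only.
  'debezium.slot.name' = 'flink_orders_slot'
);

-- A second hypothetical source table in the same job uses a different slot,
-- so the two readers do not contend for the same replication slot.
CREATE TABLE shipments_source (
  shipment_id INT,
  order_id INT,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.slot.name' = 'flink_shipments_slot'
);
```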

Data type mappings

The following table describes the data type mappings between PostgreSQL CDC and Flink fields.

| PostgreSQL data type | Flink data type |
|---|---|
| SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INTEGER, SERIAL | INT |
| BIGINT, BIGSERIAL | BIGINT |
| REAL, FLOAT4 | FLOAT |
| FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME [(p)] [WITHOUT TIME ZONE] | TIME [(p)] [WITHOUT TIME ZONE] |
| TIMESTAMP [(p)] [WITHOUT TIME ZONE] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
| BYTEA | BYTES |
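For illustration, the following sketch pairs a hypothetical PostgreSQL table with a Flink DDL statement whose column types follow the mappings in the preceding table. The table and column names are assumptions; the connection options reuse the placeholders from the DDL syntax section.

```sql
-- PostgreSQL side (hypothetical table).
CREATE TABLE public.shipments (
  shipment_id SERIAL PRIMARY KEY,
  order_id    BIGINT NOT NULL,
  weight      REAL,
  price       NUMERIC(10, 2),
  is_arrived  BOOLEAN,
  ship_date   DATE,
  updated_at  TIMESTAMP(3),
  origin      VARCHAR(255),
  note        TEXT,
  label       BYTEA
);

-- Flink side: each column uses the Flink type mapped from its PostgreSQL type.
CREATE TABLE shipments_source (
  shipment_id INT,             -- SERIAL         -> INT
  order_id    BIGINT,          -- BIGINT         -> BIGINT
  weight      FLOAT,           -- REAL           -> FLOAT
  price       DECIMAL(10, 2),  -- NUMERIC(10, 2) -> DECIMAL(10, 2)
  is_arrived  BOOLEAN,         -- BOOLEAN        -> BOOLEAN
  ship_date   DATE,            -- DATE           -> DATE
  updated_at  TIMESTAMP(3),    -- TIMESTAMP(3)   -> TIMESTAMP(3)
  origin      STRING,          -- VARCHAR(255)   -> STRING
  note        STRING,          -- TEXT           -> STRING
  label       BYTES            -- BYTEA          -> BYTES
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);
```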