This topic provides the DDL syntax that is used to create a PostgreSQL Change Data Capture (CDC) source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

Notice: PostgreSQL CDC source tables are in public preview. If your jobs require high stability, we recommend that you do not use PostgreSQL CDC source tables.

What is a PostgreSQL CDC source table?

A PostgreSQL CDC source table is a streaming source table for PostgreSQL. It first reads a full snapshot of the existing data in a PostgreSQL database and then reads the subsequent change data, in that order. Exactly-once processing semantics ensure data accuracy even if a failure occurs.
Note: If you want to use a PostgreSQL CDC connector to read data from an open source PostgreSQL database, make sure that the database runs PostgreSQL 9.6 or later.

Prerequisites

A self-managed PostgreSQL database or an ApsaraDB RDS for PostgreSQL database is created, and tables are created in the database. For more information about how to create an ApsaraDB RDS for PostgreSQL database and create a table in the database, see Create a database and an account on an ApsaraDB RDS for PostgreSQL instance.
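
As a rough illustration of this prerequisite, the following statements sketch a table on the PostgreSQL side that matches the columns used in the DDL syntax in this topic. The schema name public, the table name shipments, the column lengths, and the sample row are assumptions made for this example only. The statements run in PostgreSQL, not in Flink.

```sql
-- Run in PostgreSQL, not in Flink.
-- Hypothetical table: the name, column lengths, and constraints are illustrative.
CREATE TABLE public.shipments (
  shipment_id SERIAL PRIMARY KEY,
  order_id    INTEGER NOT NULL,
  origin      VARCHAR(255) NOT NULL,
  destination VARCHAR(255) NOT NULL,
  is_arrived  BOOLEAN NOT NULL
);

-- Illustrative sample row so that the initial snapshot is not empty.
INSERT INTO public.shipments (order_id, origin, destination, is_arrived)
VALUES (10001, 'Beijing', 'Shanghai', false);

-- Logical decoding requires wal_level to be set to logical.
-- This statement only displays the current setting.
SHOW wal_level;
```

How wal_level is changed depends on the deployment. On a managed service, the setting is typically changed in the instance parameters rather than in a configuration file.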

Limits

PostgreSQL CDC connectors are supported only by Flink that uses Ververica Runtime (VVR) 2.1.2 or later.

DDL syntax

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);
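
After the source table is declared, a job can read from it like from any other table. The following statements are a minimal sketch that assumes a print connector is available in your runtime. The sink name print_sink is hypothetical and is not part of this topic.

```sql
-- Hypothetical sink for inspection only. Assumes that the 'print' connector
-- is available in the runtime that you use.
CREATE TABLE print_sink (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'print'
);

-- Reads the full snapshot first and then the change data from the
-- postgrescdc_source table that is defined in the DDL syntax section.
INSERT INTO print_sink
SELECT shipment_id, order_id, origin, destination, is_arrived
FROM postgrescdc_source;
```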

Parameters in the WITH clause

| Parameter | Description | Required | Data type | Remarks |
| --- | --- | --- | --- | --- |
| connector | The type of the source table. | Yes | STRING | Set the value to postgres-cdc. |
| hostname | The IP address or hostname of the PostgreSQL database. | Yes | STRING | N/A. |
| username | The username that is used to access the PostgreSQL database service. | Yes | STRING | N/A. |
| password | The password that is used to access the PostgreSQL database service. | Yes | STRING | N/A. |
| database-name | The name of the PostgreSQL database. | Yes | STRING | You can set this parameter to a regular expression to read data from multiple databases. |
| schema-name | The schema name of the PostgreSQL database. | Yes | STRING | You can set this parameter to a regular expression to read data from multiple schemas. |
| table-name | The name of the PostgreSQL table. | Yes | STRING | You can set this parameter to a regular expression to read data from multiple tables. |
| port | The port that is used to access the PostgreSQL database service. | No | INTEGER | Default value: 5432. |
| decoding.plugin.name | The name of the PostgreSQL logical decoding plug-in. | No | STRING | The plug-in name is determined based on the plug-in that is installed in the PostgreSQL database service. Valid values: decoderbufs (default value), wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming, and pgoutput. Note: If you use ApsaraDB RDS for PostgreSQL, you must enable the logical decoding feature that is supported by the wal2json plug-in. For more information, see Use the wal2json plug-in. |
| debezium.* | The Debezium property parameters. | No | STRING | The parameters are used to impose fine-grained control over the behavior of Debezium clients. For example, you can set debezium.snapshot.mode to never. For more information, see Configure properties. Note: We recommend that you set the debezium.slot.name parameter for each table to prevent the following error: PSQLException: ERROR: replication slot "debezium" is active for PID 974. |
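
The optional parameters can be combined as in the following sketch, which declares two source tables and gives each table its own replication slot, as recommended in the description of the debezium.* parameters. The table names, the column lists, the slot names shipments_slot and orders_slot, and the choice of the wal2json plug-in are assumptions made for this example. Use the plug-in that is actually installed in your PostgreSQL database service.

```sql
-- Sketch only: table names, columns, slot names, and the plug-in choice
-- are illustrative assumptions.
CREATE TABLE shipments_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = 'shipments',
  -- Must match a plug-in that is installed in the database service.
  'decoding.plugin.name' = 'wal2json',
  -- A dedicated slot for this table instead of the shared default "debezium".
  'debezium.slot.name' = 'shipments_slot'
);

CREATE TABLE orders_source (
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = 'orders',
  'decoding.plugin.name' = 'wal2json',
  -- A different slot name than the one that shipments_source uses.
  'debezium.slot.name' = 'orders_slot'
);
```

PostgreSQL replication slot names can contain only lowercase letters, digits, and underscores, so the slot names in the sketch follow that pattern.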

Data type mapping

The following table describes the mappings between the data types of PostgreSQL CDC and the data types of Flink.

| Data type of PostgreSQL CDC | Data type of Flink |
| --- | --- |
| SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INTEGER, SERIAL | INT |
| BIGINT, BIGSERIAL | BIGINT |
| REAL, FLOAT4 | FLOAT |
| FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME [(p)] [WITHOUT TIME ZONE] | TIME [(p)] [WITHOUT TIME ZONE] |
| TIMESTAMP [(p)] [WITHOUT TIME ZONE] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
| BYTEA | BYTES |
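
To show how these mappings apply when you write a DDL statement, the following sketch declares a Flink source table for a hypothetical PostgreSQL table. The table public.orders and all of its columns are assumptions made for this example. The PostgreSQL definition appears as a comment so that each Flink column type can be checked against it.

```sql
-- Hypothetical PostgreSQL table, shown for comparison only:
--   CREATE TABLE public.orders (
--     order_id      BIGSERIAL PRIMARY KEY,
--     customer_name VARCHAR(64),
--     amount        NUMERIC(10, 2),
--     paid          BOOLEAN,
--     order_date    DATE,
--     created_at    TIMESTAMP(3),
--     payload       BYTEA
--   );

-- Flink source table with column types chosen from the mapping table.
CREATE TABLE orders_source (
  order_id BIGINT,          -- BIGSERIAL      maps to BIGINT
  customer_name STRING,     -- VARCHAR(64)    maps to STRING
  amount DECIMAL(10, 2),    -- NUMERIC(10, 2) maps to DECIMAL(10, 2)
  paid BOOLEAN,             -- BOOLEAN        maps to BOOLEAN
  order_date DATE,          -- DATE           maps to DATE
  created_at TIMESTAMP(3),  -- TIMESTAMP(3)   maps to TIMESTAMP(3)
  payload BYTES             -- BYTEA          maps to BYTES
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'public',
  'table-name' = 'orders'
);
```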