Realtime Compute for Apache Flink:PostgreSQL CDC connector (public preview)

Last Updated:Feb 19, 2024

The PostgreSQL Change Data Capture (CDC) connector reads full snapshot data and then change data, in order, from a PostgreSQL database. Exactly-once processing semantics ensure data accuracy even if a failure occurs. This topic describes how to use the PostgreSQL CDC connector.

Background information

The following list describes the capabilities supported by the PostgreSQL CDC connector.

  • Table type: source table.

    Note: You can use the Java Database Connectivity (JDBC) connector for a result table or a dimension table.

  • Running mode: streaming mode.

  • Data format: N/A.

  • Metrics:

    • currentFetchEventTimeLag: the interval from the time when data is generated to the time when the data is pulled to the source operator. The value of this metric is greater than 0 only during incremental data reading and is fixed to 0 during full data reading.

    • currentEmitEventTimeLag: the interval from the time when data is generated to the time when the data leaves the source operator. The value of this metric is greater than 0 only during incremental data reading and is fixed to 0 during full data reading.

    • sourceIdleTime: the interval from the time when the latest data is generated in the source to the current time.

    Note: For more information about these metrics and how to view them, see Report metrics of fully managed Flink to other platforms.

  • API type: SQL API.

  • Data update or deletion in a result table: N/A.

Prerequisites

When you use the Postgres CDC connector to read CDC change streams based on the logical replication feature of PostgreSQL databases, make sure that the following conditions are met:

  • The wal_level parameter is set to logical. This setting adds the information required for logical decoding to the write-ahead log (WAL).

  • The REPLICA IDENTITY of each table to be captured is set to FULL by running the ALTER TABLE schema.table REPLICA IDENTITY FULL; command. This ensures data consistency.

  • The value of the max_wal_senders parameter is greater than the number of replication slots used in the database and the value of the max_replication_slots parameter is greater than the number of slots required by your fully managed Flink deployment.

  • The database account is a superuser or is granted the LOGIN and REPLICATION permissions. The account must also have the SELECT permission on the captured tables for full data query.

Note

The configurations related to the preceding conditions in the ApsaraDB RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL databases may be different. For more information, see Configure a PostgreSQL database.
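As a sketch, the prerequisites above can be checked and applied with standard PostgreSQL statements such as the following. The schema and table names are placeholders; substitute the tables that you want to capture.

```sql
-- Verify that logical decoding is enabled (must return "logical").
SHOW wal_level;

-- Verify the replication-related limits against your slot usage.
SHOW max_wal_senders;
SHOW max_replication_slots;

-- Set REPLICA IDENTITY to FULL for each table to be captured.
ALTER TABLE my_schema.my_table REPLICA IDENTITY FULL;

-- List the replication slots that are currently in use.
SELECT slot_name, active FROM pg_replication_slots;
```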

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0 or later supports the PostgreSQL CDC connector.

  • The PostgreSQL CDC connector does not support checkpoints during a full table scan.

    If checkpoints are triggered in your deployment during a full table scan, a deployment failover may occur due to a checkpoint timeout. To prevent this issue, we recommend that you add the following configurations to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see How do I configure parameters for deployment running?

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    The following list describes the parameters in the preceding configurations.

    • execution.checkpointing.interval: the interval at which checkpoints are triggered. The value is a duration, such as 10min or 30s.

    • execution.checkpointing.tolerable-failed-checkpoints: the maximum number of checkpoint failures that are allowed. The product of this value and the value of the execution.checkpointing.interval parameter is the period of time during which the full snapshot can be read.

      Note: If the table that needs to be scanned is excessively large, we recommend that you set this parameter to a large value.

    • restart-strategy: the restart policy. Valid values:

      • fixed-delay: The deployment is restarted at a fixed delay.

      • failure-rate: The deployment is restarted based on the failure rate.

      • exponential-delay: The deployment is restarted with an exponentially increasing delay.

      For more information, see Restart Strategies.

    • restart-strategy.fixed-delay.attempts: the maximum number of restart attempts when the restart-strategy parameter is set to fixed-delay.

Syntax

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

Parameters in the WITH clause

  • connector (STRING, required, no default value): the type of the table. Set the value to postgres-cdc.

  • hostname (STRING, required, no default value): the IP address or hostname of the PostgreSQL database.

  • username (STRING, required, no default value): the username that is used to access the PostgreSQL database service.

  • password (STRING, required, no default value): the password that is used to access the PostgreSQL database service.

  • database-name (STRING, required, no default value): the name of the database. To read data from multiple databases, set this parameter to a regular expression.

  • schema-name (STRING, required, no default value): the schema name of the PostgreSQL database. To read data from multiple schemas, set this parameter to a regular expression.

  • table-name (STRING, required, no default value): the name of the PostgreSQL table. To read data from multiple tables, set this parameter to a regular expression.

  • port (INTEGER, optional, default value 5432): the port that is used to access the PostgreSQL database service.

  • decoding.plugin.name (STRING, optional, default value decoderbufs): the name of the PostgreSQL logical decoding plug-in, which is determined by the plug-in that is installed in the PostgreSQL database service. Valid values: decoderbufs (default), wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming, and pgoutput.

    Note: If you use ApsaraDB RDS for PostgreSQL, you must enable the logical decoding feature that is supported by the wal2json plug-in. For more information, see Use the wal2json extension.

  • debezium.* (STRING, optional, no default value): Debezium property parameters, which impose fine-grained control over the behavior of Debezium clients. For example, you can set the debezium.snapshot.mode parameter to never to ensure that the connector never reads snapshots. For more information, see Configure properties.

  • slot.name (STRING): the name of the logical decoding slot. For Realtime Compute for Apache Flink that uses VVR of a version earlier than 8.0.1, this parameter is optional and defaults to flink. For Realtime Compute for Apache Flink that uses VVR 8.0.1 or later, this parameter is required and has no default value. We recommend that you configure a distinct slot.name for each table to avoid the following error: PSQLException: ERROR: replication slot "debezium" is active for PID 974.
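Putting the optional parameters together, a source table that sets an explicit replication slot and passes through a Debezium property might look like the following sketch. The host, credentials, object names, and the slot name flink_pg_orders_slot are placeholders.

```sql
CREATE TABLE pg_orders_source (
  order_id INT,
  status STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>',
  -- A unique slot per table avoids the "replication slot ... is active" error.
  'slot.name' = 'flink_pg_orders_slot',
  -- Example of a pass-through Debezium property: skip the full snapshot.
  'debezium.snapshot.mode' = 'never'
);
```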

Data type mappings

The following list describes the data type mappings between PostgreSQL and Flink fields, in the form "PostgreSQL data type → Flink data type".

  • SMALLINT, INT2, SMALLSERIAL, SERIAL2 → SMALLINT

  • INTEGER, SERIAL → INT

  • BIGINT, BIGSERIAL → BIGINT

  • REAL, FLOAT4 → FLOAT

  • FLOAT8, DOUBLE PRECISION → DOUBLE

  • NUMERIC(p, s), DECIMAL(p, s) → DECIMAL(p, s)

  • BOOLEAN → BOOLEAN

  • DATE → DATE

  • TIME [(p)] [WITHOUT TIMEZONE] → TIME [(p)] [WITHOUT TIMEZONE]

  • TIMESTAMP [(p)] [WITHOUT TIMEZONE] → TIMESTAMP [(p)] [WITHOUT TIMEZONE]

  • CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT → STRING

  • BYTEA → BYTES
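As an illustration of these mappings, a PostgreSQL table that uses SERIAL, NUMERIC, TEXT, and BYTEA columns would be declared in the Flink source table with INT, DECIMAL, STRING, and BYTES. The table and column names below are hypothetical.

```sql
-- Hypothetical PostgreSQL table:
--   CREATE TABLE inventory.items (
--     item_id  SERIAL PRIMARY KEY,
--     price    NUMERIC(10, 2),
--     label    TEXT,
--     payload  BYTEA
--   );

-- Corresponding Flink source table:
CREATE TABLE items_source (
  item_id INT,           -- SERIAL maps to INT
  price DECIMAL(10, 2),  -- NUMERIC(p, s) maps to DECIMAL(p, s)
  label STRING,          -- TEXT maps to STRING
  payload BYTES          -- BYTEA maps to BYTES
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'inventory',
  'table-name' = 'items'
);
```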

Sample code

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;
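To inspect the change stream during development, one option is to write the source into Flink's built-in print connector instead of querying it directly. The sink table name below is hypothetical; its schema mirrors the source table above.

```sql
CREATE TEMPORARY TABLE print_sink (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink SELECT * FROM source;
```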

References

  • For more information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.

  • For more information about how to write data to a PolarDB for Oracle 1.0 result table, see PolarDB for Oracle 1.0 connector.

  • You can use the MySQL connector to read data from or write data to ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases. For more information, see MySQL connector.