Realtime Compute for Apache Flink:PostgreSQL CDC connector (public preview)

Last Updated: Aug 12, 2024

The PostgreSQL change data capture (CDC) connector is used to read existing data and changed data from a PostgreSQL database. Reading is divided into two phases: the full scan phase, which reads the existing data, and the incremental capture phase, which reads the changed data. If a failure occurs, the connector recovers data in an exactly-once manner to ensure that no data is duplicated or lost. This topic describes how to use the PostgreSQL CDC connector.

Background information

The following items describe the capabilities of the PostgreSQL CDC connector.

Table type
  Source table.
  Note: You can use the Java Database Connectivity (JDBC) connector to create a sink or dimension table.

Running mode
  Streaming mode.

Data format
  N/A.

Monitoring metrics
  • currentFetchEventTimeLag: the interval between when data is generated and when it is fetched by the source operator.
  • currentEmitEventTimeLag: the interval between when data is generated and when it leaves the source operator.
  • sourceIdleTime: the duration from the most recent data generation time in the CDC source to the current time.
  Note:
  • The currentFetchEventTimeLag and currentEmitEventTimeLag metrics are valid only in the incremental capture phase. The metric values are 0 in the full scan phase.
  • For more information about the metrics, see Metrics.

API type
  SQL.

Data updates or deletion in a sink table
  N/A (the connector supports only source tables).

Features

The PostgreSQL CDC connector supports the incremental snapshot reading feature in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.6 or later. When a deployment starts, the connector first runs the full scan phase to read a snapshot of the PostgreSQL database. After the full scan is complete, the connector automatically switches to the incremental capture phase to read the write-ahead log (WAL) files of the database. If a failure occurs, the connector recovers data in an exactly-once manner to prevent data duplication or loss.

You can use the connector to create a source table, which provides the following advantages (see the sketch after this list):

  • Supports reading full and incremental data to facilitate unified stream and batch processing.

  • Supports parallel reading in the full scan phase, which improves reading performance.

  • Supports automatic and seamless switching from the full scan phase to the incremental capture phase to reduce resource consumption.

  • Supports data resumption in the full scan phase to enhance stability.

  • Supports lock-free reading in the full scan phase to ensure normal business operation.
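
The following DDL is a minimal sketch of a source table that enables the feature. The table and connection values are placeholders, and checkpointing must be enabled for the deployment, as described in the "Limits" section of this topic.

CREATE TABLE orders_source (
  order_id INT,
  order_status STRING,
  -- A primary key is required when incremental snapshot reading is enabled.
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = 'orders',
  -- Enable parallel, lock-free reading in the full scan phase (VVR 8.0.6 or later).
  'scan.incremental.snapshot.enabled' = 'true'
);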

Prerequisites

The PostgreSQL CDC connector uses the logical replication of PostgreSQL to read data changes from ApsaraDB RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL databases.

Important

Related configurations may be different in ApsaraDB RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL databases. Make sure that you complete the required configurations. For more information, see Configure a PostgreSQL database.

After you complete the related configurations, make sure that the following conditions are met. A verification sketch follows the list.

  • The wal_level parameter is set to logical, which specifies that the information required for logical decoding is included in WAL files.

  • The REPLICA IDENTITY parameter of the subscribed table is set to FULL to ensure data consistency.

    Note

    REPLICA IDENTITY is a PostgreSQL-specific table-level parameter. The parameter specifies whether the UPDATE and DELETE events that are generated by the logical decoding plug-in include the previous values of the involved table columns. For more information, see REPLICA IDENTITY.

  • The max_wal_senders and max_replication_slots parameters are set to values greater than the sum of the number of replication slots that are already occupied in the monitored database and the number of replication slots that are required by the Flink deployment.

  • Your database account is assigned a superuser role or granted the LOGIN and REPLICATION permissions. Your database account is also granted the SELECT permission on the subscribed table for full data query.
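
The following statements are a minimal sketch of how you might verify and complete these conditions in PostgreSQL. The table name orders and the account flink_user are placeholders, and changing wal_level requires a server restart; on managed services such as ApsaraDB RDS, these settings are changed in the console instead.

-- Check the logical decoding and replication slot settings.
SHOW wal_level;               -- expected value: logical
SHOW max_wal_senders;
SHOW max_replication_slots;

-- Include the previous column values in UPDATE and DELETE events.
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Create an account with the required permissions.
CREATE USER flink_user WITH LOGIN REPLICATION PASSWORD '<yourPassWord>';
GRANT SELECT ON TABLE orders TO flink_user;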

Limits

  • The incremental snapshot reading feature of the PostgreSQL CDC connector requires Realtime Compute for Apache Flink that uses VVR 8.0.6 or later.

  • You must manually manage replication slots to avoid disk space waste.

    Replication slots are used to retain WAL segments. If a Flink job is restarted, the job can resume from the most recent checkpoint based on the retained WAL segments, thereby preventing data loss. If you confirm that a Flink job no longer needs to be resumed, we recommend that you manually delete the corresponding replication slot to free up the occupied resources. In addition, if the progress marker of a replication slot remains stagnant for a long period of time, PostgreSQL retains all WAL records after the marked position. The retention of unnecessary WAL records may result in a significant increase in disk space usage. The sketch at the end of this section shows how to inspect and delete replication slots.

  • If you enable the incremental snapshot reading feature of the PostgreSQL CDC connector, you must enable checkpointing for the deployment and define a primary key in the source table. When the feature is enabled, multiple temporary replication slots are created to achieve parallel reading in the full scan phase.

    The number of replication slots that are required in the full scan phase is calculated as Number of sources × Parallelism + 1. For example, one PostgreSQL CDC source that runs with a parallelism of 4 requires 1 × 4 + 1 = 5 replication slots. After the PostgreSQL CDC connector switches from the full scan phase to the incremental capture phase, the system automatically removes the temporary replication slots and retains only one replication slot. If you want to limit the number of replication slots that are used in the full scan phase, you can reduce the parallelism. Take note that this also reduces the reading speed. If the downstream operator or storage supports idempotence, you can set scan.incremental.snapshot.backfill.skip to true to skip log reading in the full scan phase. In this case, the connector provides only at-least-once guarantees. If you use SQL to perform operations such as aggregation and join, we recommend that you do not skip log reading in the full scan phase.

  • If you disable the incremental snapshot reading feature, the PostgreSQL CDC connector cannot perform checkpoints during the full scan phase.

    If the feature is disabled and checkpoints are triggered during the full scan phase, a job failover may occur due to a checkpoint timeout. To prevent this issue, we recommend that you add the following configurations to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see References.

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    The following items describe the parameters in the preceding configurations.

    execution.checkpointing.interval
      Description: The interval at which checkpoints are triggered.
      Notes: Data type: DURATION. Examples: 10min and 30s.

    execution.checkpointing.tolerable-failed-checkpoints
      Description: The maximum number of checkpoint failures that are allowed.
      Notes: The product of the value of this parameter and the checkpoint scheduling interval determines the maximum duration of the full scan phase. If the scanned table is excessively large, we recommend that you specify a sufficiently large value.

    restart-strategy
      Description: The restart policy.
      Notes: Valid values:
      • fixed-delay: The job is restarted with a fixed delay.
      • failure-rate: The job is restarted based on the failure rate.
      • exponential-delay: The job is restarted with an exponential delay.
      For more information, see Restart Strategies.

    restart-strategy.fixed-delay.attempts
      Description: The maximum number of restart attempts when you set the restart-strategy parameter to fixed-delay.
      Notes: N/A.
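
If a deployment is permanently stopped, its replication slot remains in the database and continues to retain WAL files. The following statements are a minimal sketch of how you might inspect and remove a stale replication slot in PostgreSQL. The slot name flink is a placeholder, and a slot can be dropped only while it is not active.

-- List replication slots and check whether they are still in use.
SELECT slot_name, plugin, active, restart_lsn FROM pg_replication_slots;

-- Delete a slot that is no longer needed so that the retained WAL files can be recycled.
SELECT pg_drop_replication_slot('flink');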

Syntax

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

Parameters in the WITH clause

The following items describe the parameters in the WITH clause.

connector
  Description: The type of the connector.
  Data type: STRING
  Required: Yes
  Default value: N/A
  Notes: Set the value to postgres-cdc.

hostname
  Description: The IP address or hostname of the PostgreSQL database.
  Data type: STRING
  Required: Yes
  Default value: N/A
  Notes: N/A

username
  Description: The username that is used to access the PostgreSQL database service.
  Data type: STRING
  Required: Yes
  Default value: N/A
  Notes: N/A

password
  Description: The password that is used to access the PostgreSQL database service.
  Data type: STRING
  Required: Yes
  Default value: N/A
  Notes: N/A

database-name
  Description: The name of the PostgreSQL database.
  Data type: STRING
  Required: Yes
  Default value: N/A
  Notes: If you want to read data from multiple databases, set this parameter to a regular expression.

schema-name
  Description: The schema name of the PostgreSQL database.
  Data type: STRING
  Required: Yes
  Default value: N/A
  Notes: If you want to read data from multiple schemas, set this parameter to a regular expression.

table-name
  Description: The name of the table in the PostgreSQL database.
  Data type: STRING
  Required: Yes
  Default value: N/A
  Notes: If you want to read data from multiple tables, set this parameter to a regular expression.

port
  Description: The port that is used to access the PostgreSQL database service.
  Data type: INTEGER
  Required: No
  Default value: 5432
  Notes: N/A

decoding.plugin.name
  Description: The name of the logical decoding plug-in that is installed in the PostgreSQL database service.
  Data type: STRING
  Required: No
  Default value: decoderbufs
  Notes: Valid values: decoderbufs, pgoutput, wal2json, wal2json_rds, wal2json_streaming, and wal2json_rds_streaming.

slot.name
  Description: The name of the logical decoding slot.
  Data type: STRING
  Required: Yes in VVR 8.0.1 or later; No in earlier VVR versions
  Default value: flink in VVR versions earlier than 8.0.1; N/A in VVR 8.0.1 or later
  Notes: We recommend that you configure the slot.name parameter for each table to prevent the following error: PSQLException: ERROR: replication slot "debezium" is active for PID 974.

debezium.*
  Description: The Debezium properties.
  Data type: STRING
  Required: No
  Default value: N/A
  Notes: This parameter is used to achieve fine-grained control over the Debezium client. Example: 'debezium.snapshot.mode' = 'never', which specifies that the connector does not generate snapshots. For more information, see Connector properties.

scan.incremental.snapshot.enabled
  Description: Specifies whether to enable the incremental snapshot reading feature.
  Data type: BOOLEAN
  Required: No
  Default value: false
  Notes: Valid values:
  • false (default): disables the incremental snapshot reading feature.
  • true: enables the incremental snapshot reading feature.
  Note: This experimental feature is supported only in Realtime Compute for Apache Flink that uses VVR 8.0.6 or later. For more information about the feature, see the "Features", "Prerequisites", and "Limits" sections of this topic.

scan.startup.mode
  Description: The startup mode that is used to consume data.
  Data type: STRING
  Required: No
  Default value: initial
  Notes: Valid values:
  • initial (default): performs a full scan on the monitored database during the first startup and then continues to read the most recent WAL files.
  • latest-offset: skips the full scan on the monitored database during the first startup and reads from the end of the WAL files. In this case, only the changes that occur after the connector starts are captured.
  • snapshot: performs a full scan on the monitored database, reads the WAL entries that are generated during the full scan phase, and then stops reading.

changelog-mode
  Description: The changelog mode that is used to encode streaming changes.
  Data type: STRING
  Required: No
  Default value: all
  Notes: Valid values:
  • all: encodes all types of changes, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER changes.
  • upsert: encodes only upsert-type changes, including INSERT, DELETE, and UPDATE_AFTER changes.

heartbeat.interval.ms
  Description: The interval at which heartbeat packets are sent.
  Data type: DURATION
  Required: No
  Default value: 30s
  Notes: Unit: millisecond. The PostgreSQL CDC connector sends heartbeat packets to the monitored database to synchronize the progress offset of the replication slot. If table changes are not frequent, configure this parameter so that unnecessary WAL files can be removed at the earliest opportunity.

scan.incremental.snapshot.chunk.key-column
  Description: The column that is used to split tables into chunks in the full scan phase.
  Data type: STRING
  Required: No
  Default value: N/A
  Notes: By default, the first column of the primary key is used.

scan.incremental.close-idle-reader.enabled
  Description: Specifies whether to close idle readers at the end of the full scan phase.
  Data type: BOOLEAN
  Required: No
  Default value: false
  Notes: This parameter takes effect only if the execution.checkpointing.checkpoints-after-tasks-finish.enabled parameter is set to true.

scan.incremental.snapshot.backfill.skip
  Description: Specifies whether to skip log reading in the full scan phase.
  Data type: BOOLEAN
  Required: No
  Default value: false
  Notes: Valid values:
  • true: skips log reading. In this case, the connector starts to read logs from the low watermark in the incremental capture phase. If the downstream operator or storage supports idempotence, we recommend that you set this parameter to true. This reduces the number of required replication slots but provides at-least-once instead of exactly-once guarantees.
  • false: does not skip log reading. In this case, the connector reads the logs between the low watermark and the high watermark to ensure consistency. If you use SQL to perform operations such as aggregation and join, we recommend that you do not skip log reading in the full scan phase.
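
The following DDL is a minimal sketch that combines several of the optional parameters described above. The connection values and the slot name flink_shipments_slot are placeholders.

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  -- A primary key is required when incremental snapshot reading is enabled.
  PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>',
  -- Use a dedicated replication slot for this table.
  'slot.name' = 'flink_shipments_slot',
  -- pgoutput is built into PostgreSQL 10 and later.
  'decoding.plugin.name' = 'pgoutput',
  -- Enable parallel, lock-free reading in the full scan phase (VVR 8.0.6 or later).
  'scan.incremental.snapshot.enabled' = 'true',
  -- Advance the replication slot offset even if the table changes infrequently.
  'heartbeat.interval.ms' = '30s'
);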

Data type mappings

The following table describes the mapping between data types in PostgreSQL and Realtime Compute for Apache Flink.

Data type in PostgreSQL                                          Data type in Realtime Compute for Apache Flink
SMALLINT, INT2, SMALLSERIAL, SERIAL2                             SMALLINT
INTEGER, SERIAL                                                  INT
BIGINT, BIGSERIAL                                                BIGINT
REAL, FLOAT4                                                     FLOAT
FLOAT8, DOUBLE PRECISION                                         DOUBLE
NUMERIC(p, s), DECIMAL(p, s)                                     DECIMAL(p, s)
BOOLEAN                                                          BOOLEAN
DATE                                                             DATE
TIME [(p)] [WITHOUT TIMEZONE]                                    TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]                               TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT    STRING
BYTEA                                                            BYTES

Sample code

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;
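
For reference, the Flink source table above could correspond to a hypothetical PostgreSQL table such as the following, based on the data type mappings in the previous section.

CREATE TABLE products (
  id integer NOT NULL PRIMARY KEY,  -- maps to INT
  name text,                        -- maps to STRING
  description text,                 -- maps to STRING
  weight numeric(10,3)              -- maps to DECIMAL(10,3)
);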

References

  • For information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.

  • For information about how to write data to a sink table in PolarDB for PostgreSQL (Compatible with Oracle), see PolarDB for Oracle 1.0 connector.

  • You can use the MySQL connector to read data from or write data to ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases. For more information, see MySQL connector.