
PolarDBO Flink CDC

Last Updated: Oct 28, 2024

You can use the PolarDBO Change Data Capture (CDC) connector to sequentially read the full snapshot data and then the incremental data changes of PolarDB for PostgreSQL (Compatible with Oracle) databases. For more information about the features and usage, see the community Postgres CDC documentation.

PolarDB for PostgreSQL (Compatible with Oracle) differs from community PostgreSQL only in a few data types and built-in objects. This topic describes how to package a PolarDBO Flink CDC connector that supports PolarDB for PostgreSQL (Compatible with Oracle) by applying minimal code changes to the community Postgres CDC connector.

Note

PolarDB for PostgreSQL (Compatible with Oracle) uses a 64-bit DATE type, whereas community PostgreSQL uses a 32-bit DATE type. The PolarDBO Flink CDC connector is compatible with data of the 64-bit DATE type.

Package the PolarDBO Flink CDC connector

Important

The PolarDBO Flink CDC connector is developed based on the community Postgres CDC connector. No service-level agreement (SLA) guarantees are provided for a connector that you package yourself or for the JAR package provided in this document.

Prerequisites

  • Confirm the Flink CDC version

    If you use Realtime Compute for Apache Flink, confirm the compatible community Flink CDC version for the corresponding Ververica Runtime (VVR) version. For more information, see CDC and VVR Version Correspondence.

    Note

    For the Flink CDC code repository, visit Flink CDC on GitHub.

  • Confirm the Debezium version

    In the pom.xml file of your Flink CDC repository, search for the debezium.version keyword to determine the Debezium version.

    Note

    For the Debezium code repository, visit Debezium on GitHub.

  • Confirm the PgJDBC version

    In the pom.xml file of your Postgres CDC connector, search for the org.postgresql keyword to determine the PgJDBC version. A quick check for both the Debezium and PgJDBC versions is sketched after this list.

    Note
    • For versions earlier than release-3.0, the file path is: flink-connector-postgres-cdc/pom.xml.

    • For versions release-3.0 and later, the file path is: flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml.

    • For the PgJDBC code repository, visit PgJDBC on GitHub.
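For example, after you clone the repositories as described in the Procedure section, you can run a check similar to the following sketch. The paths assume release-3.0 or later; adjust them for earlier releases as noted above.

    # Print the Debezium version that is pinned by Flink CDC.
    grep -n 'debezium.version' flink-cdc/pom.xml

    # Print the PgJDBC dependency that is declared by the Postgres CDC connector.
    grep -n -A 2 'org.postgresql' flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml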

Procedure

For release-3.1

The community Flink CDC release-3.1 version is compatible with Realtime Compute for Apache Flink vvr-8.0.x-flink-1.17.

To package the PolarDBO Flink CDC connector of the corresponding version, perform the following steps:

  1. Clone the code repositories for Flink CDC, Debezium, and PgJDBC at the versions that you confirmed in the prerequisites.

    git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
  2. Copy necessary files from Debezium and PgJDBC to Flink CDC.

    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. Apply the patch file for PolarDB for PostgreSQL (Compatible with Oracle).

    git apply release-3.1_support_polardbo.patch
    Note

    The following PolarDBO Flink CDC compatible patch file is used in the preceding example: release-3.1_support_polardbo.patch.

  4. Package the PolarDBO Flink CDC connector by using Maven.

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # You can find the jar package in the target directory of flink-sql-connector-postgres-cdc

The following JAR file of the PolarDBO Flink CDC connector is generated based on JDK8: flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar.
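To confirm the output location, you can list the generated JAR file. The following sketch assumes the release-3.1 repository layout described above.

    # List the packaged connector JAR file.
    ls flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-postgres-cdc/target/flink-sql-connector-postgres-cdc-*.jar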

For release-2.3

The community Flink CDC release-2.3 version is compatible with Realtime Compute for Apache Flink from vvr-4.0.15-flink-1.13 to vvr-6.0.2-flink-1.15.

To package the PolarDBO Flink CDC connector of the corresponding version, perform the following steps:

  1. Clone the code repositories for Flink CDC, Debezium, and PgJDBC at the versions that you confirmed in the prerequisites.

    git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.git
  2. Copy necessary files from Debezium and PgJDBC to Flink CDC.

    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. Apply the patch file for PolarDB for PostgreSQL (Compatible with Oracle).

    git apply release-2.3_support_polardbo.patch
    Note

    The following PolarDBO Flink CDC compatible patch file is used in the preceding example: release-2.3_support_polardbo.patch.

  4. Package the PolarDBO Flink CDC connector by using Maven.

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # You can find the jar package in the target directory of flink-sql-connector-postgres-cdc

The following JAR file of the PolarDBO Flink CDC connector is generated based on JDK8: flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar.
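As with release-3.1, you can list the generated JAR file to confirm the output location. The following sketch assumes the release-2.3 repository layout, in which the connector modules reside in the repository root.

    # List the packaged connector JAR file.
    ls flink-cdc/flink-sql-connector-postgres-cdc/target/flink-sql-connector-postgres-cdc-*.jar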

Usage

Before you use the PolarDBO Flink CDC connector to read data changes by using the logical replication feature of PolarDB for PostgreSQL (Compatible with Oracle) databases, make sure that the following requirements are met:

  • The value of the wal_level parameter is set to logical, which specifies that the information required for logical replication is written to write-ahead logging (WAL) logs.

    Note

    You can configure the wal_level parameter in the PolarDB console. For more information, see Procedure of the Configure cluster parameters topic. The cluster restarts after you modify this parameter. Proceed with caution.

  • The REPLICA IDENTITY parameter is set to FULL for a subscribed table by executing the ALTER TABLE schema.table REPLICA IDENTITY FULL; statement to ensure data consistency for the table during replication. A value of FULL specifies that events for INSERT and UPDATE operations contain the previous values of all columns in the table.

    Note
    • REPLICA IDENTITY is a PostgreSQL-specific table-level setting that determines whether the previous values of the involved table columns are available to the logical decoding plug-in when UPDATE and DELETE events occur. For more information about the values of the REPLICA IDENTITY parameter, see the Debezium documentation on REPLICA IDENTITY.

    • To set the REPLICA IDENTITY parameter to FULL for the subscribed table, you may need to lock the table, which may affect your business. Proceed with caution. You can execute the following statement to check whether the parameter is set to FULL:

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • The values of the max_wal_senders and max_replication_slots parameters are greater than the sum of the replication slots that are already occupied in the database and the replication slots that are required by your Realtime Compute for Apache Flink drafts. You can verify the current values by using the pre-flight check sketched after this list.

  • A privileged account is used, or a database account that has the LOGIN and REPLICATION permissions and the SELECT permission on the subscribed tables, which is required for full snapshot queries.

  • The connector is connected to the primary endpoint of the PolarDB cluster. Cluster endpoints do not support logical replication.
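To verify these requirements, you can connect to the primary endpoint with psql and run a pre-flight check similar to the following sketch. The table name is a placeholder; substitute the name of your subscribed table.

    -- Must return 'logical'.
    SHOW wal_level;

    -- Compare the configured limits against the replication slots that are already in use.
    SHOW max_wal_senders;
    SHOW max_replication_slots;
    SELECT count(*) AS used_slots FROM pg_replication_slots;

    -- Must return 'f' (FULL) for each subscribed table.
    SELECT relname, relreplident FROM pg_class WHERE relname = '<yourTable>';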

Differences between the PolarDBO Flink CDC connector and Postgres CDC

The PolarDBO Flink CDC connector is developed and packaged based on the Postgres CDC. For information about the syntax and parameters of the Postgres CDC connector, see Postgres CDC documentation. The following section describes the key differences between the two connectors:

  • The connector parameter in the WITH clause of the PolarDBO Flink CDC connector must be set to polardbo-cdc, as illustrated in the sketch after this list.

  • PolarDBO Flink CDC is compatible with PolarDB for PostgreSQL, PolarDB for PostgreSQL (Compatible with Oracle) 1.0, and PolarDB for PostgreSQL (Compatible with Oracle) 2.0.

    Note

    We recommend that you use the Postgres CDC connector for PolarDB for PostgreSQL.

  • Columns of the DATE type in PolarDB for PostgreSQL (Compatible with Oracle) 1.0 and 2.0 must be mapped to the TIMESTAMP type in both the source and sink tables of a Flink draft.

  • We recommend that you set the decoding.plugin.name parameter to pgoutput. Otherwise, garbled characters may occur during incremental parsing for databases that are not UTF-8 encoded. For more information, see the community documentation.
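For reference, a minimal source table declaration that reflects these differences might look like the following sketch. The table schema and connection values are placeholders; a complete end-to-end example is provided in the Examples section.

    CREATE TEMPORARY TABLE orders_source (
       id INT,
       gmt_create TIMESTAMP,  -- a DATE column in PolarDB for PostgreSQL (Compatible with Oracle) maps to TIMESTAMP
       PRIMARY KEY (id) NOT ENFORCED
     ) WITH (
       'connector' = 'polardbo-cdc',  -- not 'postgres-cdc'
       'hostname' = '<yourHostname>',
       'port' = '<yourPort>',
       'username' = '<yourUserName>',
       'password' = '<yourPassWord>',
       'database-name' = '<yourDatabase>',
       'schema-name' = 'public',
       'table-name' = 'orders',
       'decoding.plugin.name' = 'pgoutput',  -- recommended; avoids garbled characters on non-UTF-8 databases
       'slot.name' = '<yourSlotName>'
     );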

Data type mapping

The following table describes the data type mappings between PolarDB for PostgreSQL and Flink. The mappings are the same as those between community PostgreSQL and Flink data types, except for the DATE type.

 Data type supported by PolarDB for PostgreSQL                 | Data type supported by Flink
---------------------------------------------------------------+------------------------------------------------------------
 SMALLINT, INT2, SMALLSERIAL, SERIAL2                          | SMALLINT
 INTEGER, SERIAL                                               | INT
 BIGINT, BIGSERIAL                                             | BIGINT
 REAL, FLOAT4                                                  | FLOAT
 FLOAT8, DOUBLE PRECISION                                      | DOUBLE
 NUMERIC(p, s), DECIMAL(p, s)                                  | DECIMAL(p, s)
 BOOLEAN                                                       | BOOLEAN
 DATE                                                          | PolarDB for PostgreSQL (Compatible with Oracle) 1.0 and 2.0: TIMESTAMP; PolarDB for PostgreSQL: DATE
 TIME [(p)] [WITHOUT TIMEZONE]                                 | TIME [(p)] [WITHOUT TIMEZONE]
 TIMESTAMP [(p)] [WITHOUT TIMEZONE]                            | TIMESTAMP [(p)] [WITHOUT TIMEZONE]
 CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING
 BYTEA                                                         | BYTES

Examples

The following example describes how to use the PolarDBO Flink CDC connector to synchronize data from the shipments table of the flink_source database to the shipments_sink table of the flink_sink database in the same PolarDB for PostgreSQL (Compatible with Oracle) 2.0 cluster.

Note

The following example serves only as a basic test to ensure that the packaged PolarDBO Flink CDC connector works on a PolarDB for PostgreSQL (Compatible with Oracle) cluster. For production environments, refer to the official Postgres CDC documentation and configure the parameters based on your business requirements.

  1. Make preparations.

    • Make preparations for a PolarDB for PostgreSQL (Compatible with Oracle) cluster.

      1. On the PolarDB buy page, purchase a PolarDB for PostgreSQL (Compatible with Oracle) 2.0 cluster.

      2. Create a privileged account. For more information, see the Create an account section of the Create a database account topic.

      3. Apply for a primary endpoint for the cluster. For more information, see View or apply for an endpoint. If the PolarDB cluster and the Realtime Compute for Apache Flink workspace reside in the same region and VPC, you can use the private primary endpoint of the cluster. Otherwise, you must apply for a public endpoint. Add the IP addresses of the Realtime Compute for Apache Flink workspace to an IP address whitelist of the PolarDB cluster. For more information, see Configure a whitelist for a cluster.

      4. Create a source database named flink_source and a destination database named flink_sink in the PolarDB console. For more information, see Database management.

      5. Execute the following statements to create a table named shipments in the flink_source database and write data to the table:

        CREATE TABLE public.shipments (
          shipment_id INT,
          order_id INT,
          origin TEXT,
          destination TEXT,
          is_arrived BOOLEAN,
          order_time DATE,
          PRIMARY KEY (shipment_id) 
        );
        ALTER TABLE public.shipments REPLICA IDENTITY FULL;
        INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();
      6. Execute the following statement to create a table named shipments_sink in the flink_sink database:

        CREATE TABLE public.shipments_sink (
           shipment_id INT,
           order_id INT,
           origin TEXT,
           destination TEXT,
           is_arrived BOOLEAN,
           order_time TIMESTAMP,
           PRIMARY KEY (shipment_id)
         );
    • Make preparations for a Realtime Compute for Apache Flink workspace.

      1. Log on to the Realtime Compute for Apache Flink console to purchase a Realtime Compute for Apache Flink workspace. For more information, see Activate Realtime Compute for Apache Flink.

        Note

        We recommend that you configure the same region and virtual private cloud (VPC) for the Realtime Compute for Apache Flink workspace as the PolarDB cluster. In this case, you can use the primary private endpoint of the PolarDB cluster as the endpoint of the Realtime Compute for Apache Flink workspace.

      2. Click Create Custom Connector. In the Create custom connector dialog box, upload the PolarDBO Flink CDC package in the Provide JAR step. In the View Connector step, select debezium-json from the Formats drop-down list. For more information, see Manage custom connectors.


  2. Create a Flink draft.

    1. Log on to the Realtime Compute for Apache Flink console and create an SQL draft. For more information, see Develop an SQL draft. Copy the following Flink SQL statements into the draft and replace the placeholders with the primary endpoint, port number, database account name, and database account password of your PolarDB cluster:

      Note

      PolarDB for PostgreSQL (Compatible with Oracle) uses a 64-bit DATE type, whereas Flink SQL and most other databases use a 32-bit DATE type. Therefore, columns of the DATE type in the source table must be mapped to columns of the TIMESTAMP type in both the source and sink tables of Flink SQL. Otherwise, drafts may fail with errors similar to the following one: java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471): 1720891573000.

      CREATE TEMPORARY TABLE shipments (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
         'connector' = 'polardbo-cdc',
         'hostname' = '<yourHostname>',
         'port' = '<yourPort>',
         'username' = '<yourUserName>',
         'password' = '<yourPassWord>',
         'database-name' = 'flink_source',
         'schema-name' = 'public',
         'table-name' = 'shipments',
         'decoding.plugin.name' = 'pgoutput',
         'slot.name' = 'flink'
       );
      
      CREATE TEMPORARY TABLE shipments_sink (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink',
        'table-name' = 'shipments_sink',
        'username' = '<yourUserName>',
        'password' = '<yourPassWord>'
      );
      
      INSERT INTO shipments_sink SELECT * FROM shipments;
    2. Deploy and start the draft.


    3. Test and verify the results.

      • After the status of the draft changes to RUNNING, execute the following statement to verify that the data in the shipments table of the flink_source database is synchronized to the shipments_sink table of the flink_sink database:

        SELECT * FROM public.shipments_sink;

        Sample result:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | f          | 2024-09-18 05:45:08
        (1 row)
      • Execute the following DML statements on the shipments table of the flink_source database. The data changes are expected to be synchronized in real time.

        INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1;
        DELETE FROM public.shipments WHERE shipment_id = 2;
        INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;

        Execute the following statement to verify that the data changes in the shipments table are synchronized to the shipments_sink table of the flink_sink database:

        SELECT * FROM public.shipments_sink;

        Sample result:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | t          | 2024-09-18 05:45:08
                   3 |        3 | test3  | test3       | t          | 2024-09-18 07:33:23
        (2 rows)