All Products
Search
Document Center

PolarDB:Debezium connector compatible with PolarDB for PostgreSQL (Compatible with Oracle)

Last Updated:Apr 16, 2025

The Debezium connector specifically designed for PolarDB for PostgreSQL (Compatible with Oracle) is known as the Debezium PolarDBO connector. The Debezium PolarDBO connector captures row-level changes within a PolarDB for PostgreSQL (Compatible with Oracle) database, generates data change event records, and then streams them to Kafka topics. For information about the specific features and usage instructions, see Debezium connector for PostgreSQL.

PolarDB for PostgreSQL (Compatible with Oracle) and PostgreSQL Community Edition have minor differences in data types and built-in object handling. This topic describes how to build the Debezium PolarDBO connector based on the community edition of the Debezium PostgreSQL connector with minimal code modifications.

Build the Debezium PolarDBO connector

Important

The Debezium PolarDBO connector is developed based on the community edition of the Debezium PostgreSQL connector. The Debezium PolarDBO connector does not provide Service-Level Agreement (SLA) guarantees, regardless of whether you manually build the connector or use the JAR package provided in this topic.

Prerequisites

  • Configure the Java environment

    All versions of Debezium must run in Java 11 or later. Make sure that the Java 11 environment is set up before you build or run the Debezium PolarDBO connector.

  • Determine the Debezium version

    Determine the Debezium version based on the versions of Kafka or Kafka Connect and PolarDB for PostgreSQL (Compatible with Oracle) that you want to use. For information about version compatibilities, see Debezium Releases Overview.

    Note
    • For information about the source code and documentation related to Debezium, go to Debezium.

    • The following items describe the version compatibility between PolarDB for PostgreSQL (Compatible with Oracle) and PostgreSQL Community Edition:

      • PolarDB for PostgreSQL (Compatible with Oracle) 2.0 is compatible with PostgreSQL 14.

      • PolarDB for PostgreSQL (Compatible with Oracle) 1.0 is compatible with PostgreSQL 11.

  • Determine the pgJDBC version

    To determine the PgJDBC version, search for the version.postgresql.driver keyword in the pom.xml file of the corresponding Debezium version.

    Note

    For information about the source code and documentation related to pgJDBC, go to pgJDBC.

Procedure

Debezium Community Edition 2.6.2.Final supports Kafka Connect 2.x and 3.x and PostgreSQL versions 10 to 16.

The following procedure describes how to build the Debezium PolarDBO connector based on Debezium Community Edition 2.6.2.Final:

  1. Clone the code files of Debezium and pgJDBC of the corresponding versions.

    git clone -b v2.6.2.Final --depth=1 https://github.com/debezium/debezium.git
    git clone -b REL42.6.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
  2. Copy the required pgJDBC files to Debezium.

    mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3       
    mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
  3. Apply a patch file to the Debezium source code to add support for PolarDB for PostgreSQL (Compatible with Oracle).

    git apply v2.6.2.Final-support-polardbo-v1.patch
    Note
    • You can download the patch file from v2.6.2.Final-support-polardbo-v1.patch.

    • By default, the patch file adds dependencies to the JAR package, including debezium-api, debezium-core, pgJDBC, and protobuf-java. If you do not require these dependencies, you can remove the dependencies from the pom.xml file.

  4. Use Maven to build and package the Debezium PolarDBO connector.

    mvn clean package -pl :debezium-connector-postgres -DskipITs -Dquick
    # After you complete the packaging process, you can obtain the JAR package from the corresponding directory within the debezium-connector-postgres/ directory.

    You can perform the preceding steps to build the JAR package of the Debezium PolarDBO connector based on JDK 11. You can also directly download the package from debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar.

Usage notes

The Debezium PolarDBO connector captures incremental changes of a PolarDB for PostgreSQL (Compatible with Oracle) database by using logical replication. The following conditions must be met:

  • Set the wal_level parameter to logical to ensure that the required information for logical replication is written to the write-ahead logging (WAL) records.

    Note

    You can configure the wal_level parameter in the PolarDB console. For more information, see the Procedure section of the "Configure cluster parameters" topic. The cluster restarts after you modify the parameter. Proceed with caution.

  • Execute the ALTER TABLE schema.table REPLICA IDENTITY FULL; statement to set the REPLICA IDENTITY parameter to FULL for each table to which you want to subscribe. This setting ensures that the previous values of the involved table columns are available to the logical decoding plug-in for update and delete operations.

    Note
    • REPLICA IDENTITY is a PostgreSQL-specific and table-level parameter that determines whether the previous values of the involved table columns are available to the logical decoding plug-in for update and delete operations. For more information about REPLICA IDENTITY values, see Replica identity.

    • If you set the REPLICA IDENTITY parameter to FULL for a table to which you want to subscribe, the table may be locked. This affects your business. To minimize disruption, plan this change and adjust business processes based on your business requirements. To check whether the REPLICA IDENTITY parameter for a table is set to FULL, execute the following statement:

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • Make sure that the max_wal_senders and max_replication_slots parameter values are greater than the number of used replication slots and the number of slots required by Kafka jobs.

  • Make sure that you use a privileged account or a standard account that has LOGIN and REPLICATION permissions. The account must have SELECT permissions on the tables to which you want to subscribe for full data queries.

  • Connect to a PolarDB cluster by using the primary endpoint of the cluster. If you connect to the cluster by using the cluster endpoint, logical replication is not supported.

  • Set the connector.class parameter to io.debezium.connector.postgresql.PolarDBOConnector.

  • We recommend that you set the plugin.name parameter to pgoutput. This prevents data corruption or garbled text during the incremental parsing of non-UTF-8 encoded databases. For more information, see community documentation.

Examples

The following examples describe how to use the Debezium PolarDBO connector to synchronize the tables named t1 and t2 in the dbz_db database of the PolarDB for PostgreSQL Oracle syntax compatibility 2.0 cluster to a Kafka message queue.

Preparations

  1. Set up Kafka.

    1. Deploy a Kafka instance and make sure that the instance can be accessed from the Kafka Connect host. You can also use ApsaraMQ for Kafka. For more information, see Quick Start.

    2. Create a topic named pg_dbz_event in the Kafka instance to receive messages.

      Note

      To ensure reading convenience in a testing scenario, we recommend that you create a single-partition topic. In actual business scenarios, we recommend that you create a multi-partition topic.

  2. Start Kafka Connect locally in distributed mode on port 8083.

    • Copy the JAR package of the Debezium PolarDBO connector to the plugin.path directory of Kafka Connect. For more information, see the fourth step in the "Procedure" section in this topic.

      # ${plugin.path} Replace this with the specific path.
      mkdir ${plugin.path}/debezium-connector-polardbo
      cp debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar ${plugin.path}/debezium-connector-polardbo
  3. Configure PolarDB for PostgreSQL (Compatible with Oracle).

    1. Purchase a PolarDB for PostgreSQL (Compatible with Oracle) 2.0 cluster on the PolarDB cluster purchase page.

    2. Configure the PolarDB cluster. Make sure that the cluster meets the prerequisites for using the Debezium PolarDBO connector. For more information, see Usage notes.

    3. Create a privileged account. For more information, see Create an account.

    4. Obtain the primary endpoint of the cluster. For more information, see View endpoints and ports. If the PolarDB cluster and Kafka Connect instance are in the same zone, use the private endpoint. Otherwise, apply for a public endpoint and add the endpoints of the Kafka Connect instance to a whitelist of the PolarDB cluster. For more information, see Configure a whitelist.

    5. Create a database named dbz_db in the PolarDB console. For more information, see Create a database.

    6. Execute the following SQL statements to create t1 and t2 tables in the dbz_db database and populate the tables with data.

      CREATE TABLE public.t1 (a int PRIMARY KEY, b text, c TIMESTAMP);
      ALTER TABLE public.t1 REPLICA IDENTITY FULL;
      INSERT INTO public.t1(a, b, c) VALUES(1, 'a', now());
      CREATE TABLE public.t2 (a int PRIMARY KEY, b text, c DATE);
      ALTER TABLE public.t2 REPLICA IDENTITY FULL;
      INSERT INTO public.t2(a, b, c) VALUES(1, 'a', now());

Testing

  1. Create a configuration file named config/postgresql-connector.json. For more information, see Connector configuration example.

    {
      "name": "dbz-polardb",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PolarDBOConnector",
        "database.hostname": "<yourHostname>", 
        "database.port": "<yourPort>", 
        "database.user": "<yourUserName>", 
        "database.password": "<yourPassWord>", 
        "database.dbname" : "dbz_db",
        "plugin.name": "pgoutput",
        "slot.name": "dbz_polardb",
        "table.include.list": "public.t1,public.t2",
        "topic.prefix": "polardb"
        "transforms": "Combine",
        "transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Combine.topic.regex": "(.*)",
        "transforms.Combine.topic.replacement": "pg_dbz_event"
      }
    }
    Note

    By default, a separate topic is created for each table. The preceding configurations aggregate the topics.

  2. Add the connector.

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 'http://localhost:8083/connectors' -d @config/postgresql-connector.json

    After you add the connector, you can obtain full data from the pg_dbz_event topic in Kafka.

  3. Execute the following DML statements in the dbz_db database of the PolarDB cluster:

    INSERT INTO public.t1(a, b, c) VALUES(2, 'b', now());
    UPDATE public.t1 SET b = 'c' WHERE a = 1;
    DELETE FROM public.t1 WHERE a = 2;
    INSERT INTO public.t1(a, b, c) VALUES(4, 'd', now());
    
    INSERT INTO public.t2(a, b, c) VALUES(2, 'b', now());
    UPDATE public.t2 SET b = 'c' WHERE a = 1;
    DELETE FROM public.t2 WHERE a = 2;
    INSERT INTO public.t2(a, b, c) VALUES(4, 'd', now());

    Then, you can query the incremental data in the Kafka topic.