The Debezium PolarDBO connector captures row-level changes in a PolarDB for PostgreSQL (Compatible with Oracle) database and streams them as data change events to Kafka topics. It is built on the community edition of the Debezium PostgreSQL connector, with targeted modifications to handle differences in data types and built-in object handling between PolarDB for PostgreSQL (Compatible with Oracle) and PostgreSQL Community Edition.
For general Debezium PostgreSQL connector features and configuration, see Debezium connector for PostgreSQL.
The Debezium PolarDBO connector is not covered by a Service-Level Agreement (SLA), whether you build it manually or use the pre-built JAR package provided in this topic.
How it works
The connector reads row-level changes from the PolarDB write-ahead log (WAL) using logical replication.
Each change is serialized as a data change event.
Events are streamed to the configured Kafka topic.
All examples in this topic use Debezium Community Edition 2.6.2.Final, which supports Kafka Connect 2.x and 3.x and PostgreSQL versions 10 to 16.
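For illustration, a simplified `UPDATE` event payload in Debezium's standard envelope might look like the following. The field values are hypothetical; a real event also carries a schema section and additional source metadata such as the LSN and transaction ID.

```
{
  "payload": {
    "before": { "a": 1, "b": "a" },
    "after":  { "a": 1, "b": "c" },
    "source": { "connector": "postgresql", "db": "dbz_db", "table": "t1" },
    "op": "u",
    "ts_ms": 1717000000000
  }
}
```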
Build the Debezium PolarDBO connector
You can build the connector from source or download the pre-built JAR package.
Prerequisites
Before you begin, make sure you have:
Java 11 or later (required by all Debezium versions)
A Debezium version compatible with your Kafka Connect and PolarDB versions. For version compatibility, see Debezium Releases Overview.
PolarDB to PostgreSQL version mapping:
| PolarDB for PostgreSQL (Compatible with Oracle) | Compatible PostgreSQL version |
|---|---|
| 2.0 | 14 |
| 1.0 | 11 |
Finding the pgJDBC version:
Search for the version.postgresql.driver keyword in the pom.xml file of your target Debezium version. For pgJDBC source code and documentation, see pgJDBC on GitHub.
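As a sketch, the lookup is a single grep over the checkout's `pom.xml`. A real run would grep the cloned Debezium repository; a minimal stand-in `pom.xml` is created below so the command is runnable as-is (the version value `42.6.1` is an example).

```shell
# Stand-in for a real Debezium checkout; in practice, clone the repository first.
mkdir -p debezium
cat > debezium/pom.xml <<'EOF'
<properties>
  <version.postgresql.driver>42.6.1</version.postgresql.driver>
</properties>
EOF

# Locate the pgJDBC version pinned by this Debezium version.
grep 'version.postgresql.driver' debezium/pom.xml
```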
Build from source
The following steps build the connector based on Debezium Community Edition 2.6.2.Final.
Clone the Debezium and pgJDBC repositories at the required versions.
```
git clone -b v2.6.2.Final --depth=1 https://github.com/debezium/debezium.git
git clone -b REL42.6.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
```
For Debezium source code and documentation, see Debezium on GitHub.
Copy the required pgJDBC source files into the Debezium connector directory.
```
mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3
mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java \
  debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3
cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java \
  debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3
cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java \
  debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
```
Apply the PolarDB patch to the Debezium source code.
Note: The patch adds the following dependencies to the JAR package by default: debezium-api, debezium-core, pgJDBC, and protobuf-java. To exclude any of these, remove them from pom.xml before building.
Download the patch file from v2.6.2.Final-support-polardbo-v1.patch, then apply it:
```
git apply v2.6.2.Final-support-polardbo-v1.patch
```
Build and package the connector with Maven.
```
mvn clean package -pl :debezium-connector-postgres -DskipITs -Dquick
```
The JAR package is saved to the debezium-connector-postgres/ directory after packaging completes.
Alternatively, download the pre-built JAR package directly: debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar.
Prerequisites for using the connector
The connector captures incremental changes through logical replication. All of the following conditions must be met before you deploy the connector.
PolarDB-specific requirements: If you are already familiar with the community Debezium PostgreSQL connector, the following sections highlight the requirements that are specific to PolarDB for PostgreSQL (Compatible with Oracle). Pay particular attention to the primary endpoint requirement and the connector class name.
Set wal_level to logical
Changing wal_level requires a cluster restart. All active connections are dropped and must reconnect. Plan this change during a maintenance window.
Set the wal_level parameter to logical in the PolarDB console so that WAL records include the information required for logical replication. For steps, see the Procedure section of "Configure cluster parameters."
After the cluster restarts, verify the setting:
```
SHOW wal_level;
```
Expected output:
```
 wal_level
-----------
 logical
```
Set REPLICA IDENTITY to FULL for each subscribed table
REPLICA IDENTITY is a PostgreSQL table-level parameter that controls how much column data is included in WAL records for UPDATE and DELETE operations:
| Value | Effect on UPDATE/DELETE events |
|---|---|
| DEFAULT | Includes previous values for primary key columns only. If the table has no primary key, UPDATE and DELETE events are not emitted. |
| NOTHING | No previous column values are included. |
| FULL | All previous column values are included. Required for the Debezium PolarDBO connector. |
| INDEX index-name | Includes previous values for columns in the specified index. |
Setting REPLICA IDENTITY to FULL may briefly lock the table. Plan this change based on your business requirements.
Run the following statement for each table you want to subscribe to:
```
ALTER TABLE schema.table REPLICA IDENTITY FULL;
```
To verify the setting:
```
SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
```
Expected output: true.
Configure replication capacity parameters
Make sure the values of max_wal_senders and max_replication_slots exceed the total number of replication slots in use plus the number of slots required by your Kafka jobs.
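To compare the configured capacity against current usage, you can run the following statements on the cluster. These use standard PostgreSQL views and are a sketch, not PolarDB-specific commands:

```
SHOW max_wal_senders;
SHOW max_replication_slots;
-- Number of replication slots currently in use:
SELECT count(*) FROM pg_replication_slots;
```

Each registered connector consumes one replication slot, so leave headroom for every Kafka job you plan to run.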
Use a privileged account or a replication account
The database account used by the connector must be either:
A privileged account, or
A standard account with LOGIN and REPLICATION permissions, plus SELECT permission on each subscribed table.
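If you use a standard account, the required permissions can be granted as in the following sketch. The account name dbz_user and the table names are examples; adjust them to your environment:

```
CREATE USER dbz_user WITH LOGIN REPLICATION PASSWORD '<yourPassword>';
GRANT SELECT ON public.t1, public.t2 TO dbz_user;
```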
Connect using the primary endpoint
Always connect to the PolarDB cluster through the primary endpoint, not the cluster endpoint. Logical replication is not supported on the cluster endpoint.
To find the primary endpoint, see View endpoints and ports.
Configure connector-specific parameters
The following two parameters differ from the community Debezium PostgreSQL connector and must be configured correctly:
| Parameter | Value | Notes |
|---|---|---|
| connector.class | io.debezium.connector.postgresql.PolarDBOConnector (required) | Replaces the community connector class io.debezium.connector.postgresql.PostgresConnector |
| plugin.name | pgoutput (recommended) | Prevents data corruption or garbled text when replicating non-UTF-8 encoded databases. See community documentation. |
Example: sync tables to Kafka
This example synchronizes the t1 and t2 tables from the dbz_db database in a PolarDB for PostgreSQL (Compatible with Oracle) 2.0 cluster to a Kafka topic named pg_dbz_event.
Set up Kafka and Kafka Connect
Deploy a Kafka instance accessible from the Kafka Connect host, or use ApsaraMQ for Kafka. For setup instructions, see Quick start.
Create a topic named pg_dbz_event in the Kafka instance.
Note: Use a single-partition topic for testing. Use a multi-partition topic for production.
Start Kafka Connect locally in distributed mode on port 8083.
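A minimal connect-distributed.properties for this setup might look like the following. The values are examples, not required settings; adjust bootstrap.servers and plugin.path to your environment:

```
bootstrap.servers=localhost:9092
group.id=dbz-connect-cluster
rest.port=8083
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
plugin.path=/opt/kafka/plugins
```

Then start the worker with bin/connect-distributed.sh config/connect-distributed.properties from the Kafka installation directory.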
Copy the connector JAR package to the plugin.path directory of Kafka Connect.
```
# Replace ${plugin.path} with the actual path.
mkdir ${plugin.path}/debezium-connector-polardbo
cp debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar ${plugin.path}/debezium-connector-polardbo
```
Set up PolarDB
Purchase a PolarDB for PostgreSQL (Compatible with Oracle) 2.0 cluster on the PolarDB cluster purchase page.
Configure the cluster to meet all prerequisites listed in the Prerequisites for using the connector section.
Create a privileged account. For steps, see Create an account.
Obtain the primary endpoint. For steps, see View endpoints and ports. If the PolarDB cluster and Kafka Connect instance are in the same zone, use the private endpoint. Otherwise, apply for a public endpoint and add the Kafka Connect host IP addresses to the cluster whitelist. For steps, see Configure a whitelist.
Create a database named dbz_db in the PolarDB console. For steps, see Create a database.
Create the test tables and insert initial data.
```
CREATE TABLE public.t1 (a int PRIMARY KEY, b text, c TIMESTAMP);
ALTER TABLE public.t1 REPLICA IDENTITY FULL;
INSERT INTO public.t1(a, b, c) VALUES(1, 'a', now());

CREATE TABLE public.t2 (a int PRIMARY KEY, b text, c DATE);
ALTER TABLE public.t2 REPLICA IDENTITY FULL;
INSERT INTO public.t2(a, b, c) VALUES(1, 'a', now());
```
Register the connector
Create the connector configuration file config/postgresql-connector.json. The ByLogicalTableRouter transform in this example aggregates all per-table topics into a single pg_dbz_event topic. Without this transform, Debezium creates a separate topic for each table.
```
{
  "name": "dbz-polardb",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PolarDBOConnector",
    "database.hostname": "<yourHostname>",
    "database.port": "<yourPort>",
    "database.user": "<yourUserName>",
    "database.password": "<yourPassWord>",
    "database.dbname": "dbz_db",
    "plugin.name": "pgoutput",
    "slot.name": "dbz_polardb",
    "table.include.list": "public.t1,public.t2",
    "topic.prefix": "polardb",
    "transforms": "Combine",
    "transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Combine.topic.regex": "(.*)",
    "transforms.Combine.topic.replacement": "pg_dbz_event"
  }
}
```
Replace the following placeholders:

| Placeholder | Description |
|---|---|
| <yourHostname> | Primary endpoint of the PolarDB cluster |
| <yourPort> | Port of the PolarDB cluster |
| <yourUserName> | Database account with replication permissions |
| <yourPassWord> | Password for the database account |

For connector configuration reference, see Connector configuration example.
Register the connector with Kafka Connect.
```
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  'http://localhost:8083/connectors' \
  -d @config/postgresql-connector.json
```
After the connector is registered, it performs an initial snapshot. The full data from t1 and t2 becomes available in the pg_dbz_event topic.
To verify that the connector is registered and running:
```
curl -s http://localhost:8083/connectors/dbz-polardb/status
```
Expected output includes "state":"RUNNING" in the connector and task fields.
Verify incremental change capture
Run the following DML statements in the dbz_db database:
```
INSERT INTO public.t1(a, b, c) VALUES(2, 'b', now());
UPDATE public.t1 SET b = 'c' WHERE a = 1;
DELETE FROM public.t1 WHERE a = 2;
INSERT INTO public.t1(a, b, c) VALUES(4, 'd', now());

INSERT INTO public.t2(a, b, c) VALUES(2, 'b', now());
UPDATE public.t2 SET b = 'c' WHERE a = 1;
DELETE FROM public.t2 WHERE a = 2;
INSERT INTO public.t2(a, b, c) VALUES(4, 'd', now());
```
Query the pg_dbz_event topic in Kafka to confirm that the incremental change events appear.
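One way to inspect the events is the console consumer shipped with Kafka. The script path and bootstrap address below are examples for a local installation:

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic pg_dbz_event --from-beginning
```

Each DML statement above should produce one change event, with op set to c, u, or d for insert, update, and delete respectively.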