The Debezium connector specifically designed for PolarDB for PostgreSQL (Compatible with Oracle) is known as the Debezium PolarDBO connector. The Debezium PolarDBO connector captures row-level changes within a PolarDB for PostgreSQL (Compatible with Oracle) database, generates data change event records, and then streams them to Kafka topics. For information about the specific features and usage instructions, see Debezium connector for PostgreSQL.
PolarDB for PostgreSQL (Compatible with Oracle) and PostgreSQL Community Edition have minor differences in data types and built-in object handling. This topic describes how to build the Debezium PolarDBO connector based on the community edition of the Debezium PostgreSQL connector with minimal code modifications.
Build the Debezium PolarDBO connector
The Debezium PolarDBO connector is developed based on the community edition of the Debezium PostgreSQL connector. The Debezium PolarDBO connector does not provide Service-Level Agreement (SLA) guarantees, regardless of whether you manually build the connector or use the JAR package provided in this topic.
Prerequisites
Configure the Java environment
Debezium 2.x, including the 2.6.2.Final release used in this topic, requires Java 11 or later. Make sure that a Java 11 or later environment is set up before you build or run the Debezium PolarDBO connector.
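As a quick sanity check, the Java requirement can be verified from the command line. The following is a minimal sketch: the helper name java_major_version is hypothetical, and the parsing assumes the usual version "x.y.z" banner that java -version prints.

```shell
# Hypothetical helper: print the Java major version found in `java -version` output.
java_major_version() {
  # Pull the quoted version string, for example 11.0.22 or 1.8.0_392.
  ver=$(printf '%s\n' "$1" | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  case "$ver" in
    1.*) major=$(printf '%s\n' "$ver" | cut -d. -f2) ;;  # legacy scheme: 1.8.0_392 -> 8
    *)   major=$(printf '%s\n' "$ver" | cut -d. -f1) ;;  # current scheme: 11.0.22 -> 11
  esac
  # Drop suffixes such as -ea so that the result is a plain number.
  printf '%s\n' "$major" | sed 's/[^0-9].*//'
}

if command -v java >/dev/null 2>&1; then
  major=$(java_major_version "$(java -version 2>&1)")
  if [ "${major:-0}" -ge 11 ]; then
    echo "Java ${major} detected: OK"
  else
    echo "Java ${major:-unknown} detected: Java 11 or later is required"
  fi
else
  echo "java was not found on PATH"
fi
```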
Determine the Debezium version
Determine the Debezium version based on the versions of Kafka or Kafka Connect and PolarDB for PostgreSQL (Compatible with Oracle) that you want to use. For information about version compatibility, see Debezium Releases Overview.
Note: For information about the source code and documentation related to Debezium, go to Debezium.
The following items describe the version compatibility between PolarDB for PostgreSQL (Compatible with Oracle) and PostgreSQL Community Edition:
PolarDB for PostgreSQL (Compatible with Oracle) 2.0 is compatible with PostgreSQL 14.
PolarDB for PostgreSQL (Compatible with Oracle) 1.0 is compatible with PostgreSQL 11.
Determine the pgJDBC version
To determine the pgJDBC version, search for the version.postgresql.driver keyword in the pom.xml file of the corresponding Debezium version.
Note: For information about the source code and documentation related to pgJDBC, go to pgJDBC.
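The keyword search can be scripted. The following sketch assumes that the Debezium sources are already cloned into a local directory named debezium. The helper name pgjdbc_version_from_pom is hypothetical, and because the property may be declared in a parent or BOM POM rather than the root one, every pom.xml file is searched.

```shell
# Hypothetical helper: print the value of the version.postgresql.driver property in a POM file.
pgjdbc_version_from_pom() {
  sed -n 's:.*<version\.postgresql\.driver>\([^<]*\)</version\.postgresql\.driver>.*:\1:p' "$1" | head -n 1
}

# Assumes that the Debezium sources were cloned into ./debezium.
if [ -d debezium ]; then
  grep -rl --include=pom.xml '<version.postgresql.driver>' debezium | while read -r pom; do
    echo "${pom}: $(pgjdbc_version_from_pom "$pom")"
  done
fi
```

The printed version maps to the pgJDBC release tag to clone. For example, version 42.6.1 corresponds to the REL42.6.1 tag used in the procedure in this topic.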
Procedure
Debezium Community Edition 2.6.2.Final supports Kafka Connect 2.x and 3.x and PostgreSQL versions 10 to 16.
The following procedure describes how to build the Debezium PolarDBO connector based on Debezium Community Edition 2.6.2.Final:
1. Clone the code files of Debezium and pgJDBC of the corresponding versions.
    git clone -b v2.6.2.Final --depth=1 https://github.com/debezium/debezium.git
    git clone -b REL42.6.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
2. Copy the required pgJDBC files to Debezium.
    mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3
    mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
3. Apply a patch file to the Debezium source code to add support for PolarDB for PostgreSQL (Compatible with Oracle).
    git apply v2.6.2.Final-support-polardbo-v1.patch
   Note: You can download the patch file from v2.6.2.Final-support-polardbo-v1.patch.
   By default, the patch file adds dependencies to the JAR package, including debezium-api, debezium-core, pgJDBC, and protobuf-java. If you do not require these dependencies, you can remove the dependencies from the pom.xml file.
4. Use Maven to build and package the Debezium PolarDBO connector.
    mvn clean package -pl :debezium-connector-postgres -DskipITs -Dquick
   After you complete the packaging process, you can obtain the JAR package from the corresponding directory within the debezium-connector-postgres/ directory.
You can perform the preceding steps to build the JAR package of the Debezium PolarDBO connector based on JDK 11. You can also directly download the package from debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar.
Usage notes
The Debezium PolarDBO connector captures incremental changes of a PolarDB for PostgreSQL (Compatible with Oracle) database by using logical replication. The following conditions must be met:
Set the wal_level parameter to logical to ensure that the required information for logical replication is written to the write-ahead logging (WAL) records.
Note: You can configure the wal_level parameter in the PolarDB console. For more information, see the Procedure section of the "Configure cluster parameters" topic. The cluster restarts after you modify the parameter. Proceed with caution.
Execute the ALTER TABLE schema.table REPLICA IDENTITY FULL; statement to set the REPLICA IDENTITY parameter to FULL for each table to which you want to subscribe. This setting ensures that the previous values of the involved table columns are available to the logical decoding plug-in for update and delete operations.
Note: REPLICA IDENTITY is a PostgreSQL-specific and table-level parameter that determines whether the previous values of the involved table columns are available to the logical decoding plug-in for update and delete operations. For more information about REPLICA IDENTITY values, see Replica identity.
If you set the REPLICA IDENTITY parameter to FULL for a table to which you want to subscribe, the table may be locked, which can affect your business. To minimize disruption, plan this change and adjust business processes based on your business requirements.
To check whether the REPLICA IDENTITY parameter for a table is set to FULL, execute the following statement:
    SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
Make sure that the max_wal_senders and max_replication_slots parameter values are large enough to cover both the replication slots that are already in use and the slots required by your Kafka jobs.
Make sure that you use a privileged account or a standard account that has LOGIN and REPLICATION permissions. The account must have SELECT permissions on the tables to which you want to subscribe for full data queries.
Connect to a PolarDB cluster by using the primary endpoint of the cluster. If you connect to the cluster by using the cluster endpoint, logical replication is not supported.
Set the connector.class parameter to io.debezium.connector.postgresql.PolarDBOConnector.
We recommend that you set the plugin.name parameter to pgoutput. This prevents data corruption or garbled text during the incremental parsing of non-UTF-8 encoded databases. For more information, see community documentation.
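Several of the preceding conditions can be checked from a client before you register the connector. The following is a sketch under stated assumptions: psql is installed, connection settings are supplied through the standard libpq environment variables, and the standard PostgreSQL views pg_replication_slots and pg_stat_replication are readable by the account. All function names are hypothetical.

```shell
# Hypothetical helper: succeed when a limit ($1) covers what is in use ($2) plus what is needed ($3).
has_slot_headroom() {
  [ "$1" -ge $(( $2 + $3 )) ]
}

# Hypothetical helper: print t if REPLICA IDENTITY is FULL for a table such as public.t1, else f.
replica_identity_is_full() {
  psql -XAtc "SELECT relreplident = 'f' FROM pg_class WHERE oid = '$1'::regclass"
}

# Connection settings come from PGHOST, PGPORT, PGUSER, PGPASSWORD, and PGDATABASE.
check_replication_settings() {
  needed=${1:-1}   # slots required by the planned connectors; one per connector is assumed
  wal_level=$(psql -XAtc "SHOW wal_level")
  max_slots=$(psql -XAtc "SHOW max_replication_slots")
  max_senders=$(psql -XAtc "SHOW max_wal_senders")
  used_slots=$(psql -XAtc "SELECT count(*) FROM pg_replication_slots")
  used_senders=$(psql -XAtc "SELECT count(*) FROM pg_stat_replication")
  [ "$wal_level" = "logical" ] || echo "wal_level is ${wal_level}, expected logical"
  has_slot_headroom "$max_slots" "$used_slots" "$needed" || echo "max_replication_slots is too low"
  has_slot_headroom "$max_senders" "$used_senders" "$needed" || echo "max_wal_senders is too low"
}
```

Calling check_replication_settings 1 prints a line for each condition that is not met and prints nothing when all checks pass. The endpoint, account permissions, and connector parameters still need to be reviewed manually.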
Examples
The following examples describe how to use the Debezium PolarDBO connector to synchronize the tables named t1 and t2 in the dbz_db database of a PolarDB for PostgreSQL (Compatible with Oracle) 2.0 cluster to a Kafka message queue.
Preparations
Set up Kafka.
Deploy a Kafka instance and make sure that the instance can be accessed from the Kafka Connect host. You can also use ApsaraMQ for Kafka. For more information, see Quick Start.
Create a topic named pg_dbz_event in the Kafka instance to receive messages.
Note: To make messages easier to read in a test scenario, we recommend that you create a single-partition topic. In actual business scenarios, we recommend that you create a multi-partition topic.
Start Kafka Connect locally in distributed mode on port 8083.
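Copy the JAR package of the Debezium PolarDBO connector to the plugin.path directory of Kafka Connect. For more information about how to obtain the package, see the fourth step in the "Procedure" section in this topic. Kafka Connect scans the plugin.path directory when a worker starts, so restart the worker if it is already running.
    # Replace /path/to/plugin.path with the directory that is configured as plugin.path.
    PLUGIN_PATH=/path/to/plugin.path
    mkdir -p "${PLUGIN_PATH}/debezium-connector-polardbo"
    cp debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar "${PLUGIN_PATH}/debezium-connector-polardbo"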
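The Kafka-side preparations can be sketched with the scripts that ship with Apache Kafka. The installation path /opt/kafka, the broker address localhost:9092, and the function names are assumptions for a local test setup. The worker properties file is expected to set plugin.path, and the REST listener of Kafka Connect defaults to port 8083.

```shell
# Assumed locations for a local test setup. Adjust both values to your environment.
KAFKA_HOME=${KAFKA_HOME:-/opt/kafka}
BOOTSTRAP=${BOOTSTRAP:-localhost:9092}

# Print the command instead of running it when DRY_RUN is set.
run() {
  if [ -n "${DRY_RUN:-}" ]; then echo "$*"; else "$@"; fi
}

# Create the single-partition test topic that receives the change events.
create_event_topic() {
  run "$KAFKA_HOME/bin/kafka-topics.sh" --create --topic pg_dbz_event \
    --partitions 1 --replication-factor 1 --bootstrap-server "$BOOTSTRAP"
}

# Start a Kafka Connect worker in distributed mode by using the bundled sample configuration.
start_connect() {
  run "$KAFKA_HOME/bin/connect-distributed.sh" "$KAFKA_HOME/config/connect-distributed.properties"
}
```

Run create_event_topic and then start_connect. To review the exact commands without executing them, set DRY_RUN=1 first.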
Configure PolarDB for PostgreSQL (Compatible with Oracle).
Purchase a PolarDB for PostgreSQL (Compatible with Oracle) 2.0 cluster on the PolarDB cluster purchase page.
Configure the PolarDB cluster. Make sure that the cluster meets the prerequisites for using the Debezium PolarDBO connector. For more information, see Usage notes.
Create a privileged account. For more information, see Create an account.
Obtain the primary endpoint of the cluster. For more information, see View endpoints and ports. If the PolarDB cluster and Kafka Connect instance are in the same zone, use the private endpoint. Otherwise, apply for a public endpoint and add the IP address of the Kafka Connect instance to a whitelist of the PolarDB cluster. For more information, see Configure a whitelist.
Create a database named dbz_db in the PolarDB console. For more information, see Create a database.
Execute the following SQL statements to create t1 and t2 tables in the dbz_db database and populate the tables with data.
    CREATE TABLE public.t1 (a int PRIMARY KEY, b text, c TIMESTAMP);
    ALTER TABLE public.t1 REPLICA IDENTITY FULL;
    INSERT INTO public.t1(a, b, c) VALUES(1, 'a', now());

    CREATE TABLE public.t2 (a int PRIMARY KEY, b text, c DATE);
    ALTER TABLE public.t2 REPLICA IDENTITY FULL;
    INSERT INTO public.t2(a, b, c) VALUES(1, 'a', now());
Testing
Create a configuration file named config/postgresql-connector.json. For more information, see Connector configuration example.
    {
      "name": "dbz-polardb",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PolarDBOConnector",
        "database.hostname": "<yourHostname>",
        "database.port": "<yourPort>",
        "database.user": "<yourUserName>",
        "database.password": "<yourPassWord>",
        "database.dbname": "dbz_db",
        "plugin.name": "pgoutput",
        "slot.name": "dbz_polardb",
        "table.include.list": "public.t1,public.t2",
        "topic.prefix": "polardb",
        "transforms": "Combine",
        "transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Combine.topic.regex": "(.*)",
        "transforms.Combine.topic.replacement": "pg_dbz_event"
      }
    }
Note: By default, a separate topic is created for each table. The preceding configurations route the events of all tables to the pg_dbz_event topic.
Add the connector.
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 'http://localhost:8083/connectors' -d @config/postgresql-connector.json
After you add the connector, you can obtain full data from the pg_dbz_event topic in Kafka.
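Before you read from the topic, it can help to confirm that the connector and its task started. The following sketch queries the status endpoint of the Kafka Connect REST API for the dbz-polardb connector. The function names are hypothetical, and the state check is a simple text match on the returned JSON instead of a full JSON parse.

```shell
# Hypothetical helper: read a Kafka Connect status document on stdin and succeed only
# when the connector and all of its tasks report the RUNNING state.
all_running() {
  states=$(grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/')
  [ -n "$states" ] && ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Query the status of the connector that was registered with the name dbz-polardb.
check_connector() {
  if curl -s 'http://localhost:8083/connectors/dbz-polardb/status' | all_running; then
    echo "dbz-polardb is running"
  else
    echo "dbz-polardb is not fully running; inspect the status output and the worker log"
  fi
}
```

If check_connector reports a problem, run the same curl command on its own to view the full status document, which includes an error trace for failed tasks.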
Execute the following DML statements in the dbz_db database of the PolarDB cluster:
    INSERT INTO public.t1(a, b, c) VALUES(2, 'b', now());
    UPDATE public.t1 SET b = 'c' WHERE a = 1;
    DELETE FROM public.t1 WHERE a = 2;
    INSERT INTO public.t1(a, b, c) VALUES(4, 'd', now());

    INSERT INTO public.t2(a, b, c) VALUES(2, 'b', now());
    UPDATE public.t2 SET b = 'c' WHERE a = 1;
    DELETE FROM public.t2 WHERE a = 2;
    INSERT INTO public.t2(a, b, c) VALUES(4, 'd', now());
Then, you can query the incremental data in the Kafka topic.
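One way to inspect the events is the console consumer that ships with Apache Kafka. In the following sketch, the installation path, the broker address, and the function names are assumptions. The op field of a Debezium change event identifies the operation: r for a snapshot read, c for an insert, u for an update, and d for a delete.

```shell
# Map the "op" field of a Debezium change event to a readable name.
op_name() {
  case "$1" in
    r) echo "snapshot read" ;;
    c) echo "insert" ;;
    u) echo "update" ;;
    d) echo "delete" ;;
    *) echo "other" ;;
  esac
}

# Hypothetical helper: print the operation of each event read from stdin, one JSON event per line.
list_ops() {
  grep -o '"op" *: *"[a-z]"' | sed 's/.*"\([a-z]\)"$/\1/' | while read -r op; do
    op_name "$op"
  done
}

# Read the topic from the beginning. KAFKA_HOME and BOOTSTRAP are assumed settings.
consume_events() {
  "${KAFKA_HOME:-/opt/kafka}/bin/kafka-console-consumer.sh" \
    --bootstrap-server "${BOOTSTRAP:-localhost:9092}" \
    --topic pg_dbz_event --from-beginning
}
```

Running consume_events prints the raw events. Piping the output into list_ops (consume_events | list_ops) gives a compact view of the operations in the order in which they were received.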