全部产品
Search
文档中心

PolarDB:Konektor Debezium yang kompatibel dengan PolarDB for PostgreSQL (Kompatibel dengan Oracle)

更新时间:Jul 06, 2025

Konektor Debezium yang dirancang khusus untuk PolarDB for PostgreSQL (Kompatibel dengan Oracle), dikenal sebagai Debezium PolarDBO connector, menangkap perubahan tingkat baris dalam database PolarDB for PostgreSQL (Kompatibel dengan Oracle). Konektor ini menghasilkan catatan peristiwa perubahan data dan mengalirkannya ke topik Kafka. Untuk informasi lebih lanjut tentang fitur spesifik dan petunjuk penggunaan, lihat Debezium connector for PostgreSQL.

PolarDB for PostgreSQL (Kompatibel dengan Oracle) dan PostgreSQL Community Edition memiliki beberapa perbedaan kecil dalam tipe data dan penanganan objek bawaan. Bagian ini menjelaskan cara membangun Debezium PolarDBO connector berdasarkan edisi komunitas dari Debezium PostgreSQL connector dengan modifikasi kode minimal.

Membangun Debezium PolarDBO connector

Penting

Debezium PolarDBO connector dikembangkan berdasarkan edisi komunitas dari Debezium PostgreSQL connector. Konektor ini tidak menyediakan jaminan Service-Level Agreement (SLA), baik dibangun secara manual maupun menggunakan paket JAR yang disediakan dalam dokumen ini.

Prasyarat

  • Konfigurasikan lingkungan Java

    Semua versi Debezium harus dijalankan pada Java 11 atau lebih baru. Pastikan lingkungan Java 11 telah disiapkan sebelum membangun atau menjalankan Debezium PolarDBO connector.

  • Tentukan versi Debezium

    Tentukan versi Debezium berdasarkan versi Kafka atau Kafka Connect dan PolarDB for PostgreSQL (Kompatibel dengan Oracle) yang ingin digunakan. Untuk informasi tentang kompatibilitas versi, lihat Ikhtisar Rilis Debezium.

    Catatan
    • Untuk informasi tentang kode sumber dan dokumentasi terkait Debezium, kunjungi Debezium.

    • Berikut adalah kompatibilitas versi antara PolarDB for PostgreSQL (Kompatibel dengan Oracle) dan PostgreSQL Community Edition:

      • PolarDB for PostgreSQL (Kompatibel dengan Oracle) 2.0 kompatibel dengan PostgreSQL 14.

      • PolarDB for PostgreSQL (Kompatibel dengan Oracle) 1.0 kompatibel dengan PostgreSQL 11.

  • Tentukan versi pgJDBC

    Untuk menentukan versi PgJDBC, cari kata kunci version.postgresql.driver dalam file pom.xml dari versi Debezium yang sesuai.

    Catatan

    Untuk informasi tentang kode sumber dan dokumentasi terkait pgJDBC, kunjungi pgJDBC.

Prosedur

Debezium Community Edition 2.6.2.Final mendukung Kafka Connect 2.x dan 3.x serta PostgreSQL versi 10 hingga 16.

Berikut adalah langkah-langkah untuk membangun Debezium PolarDBO connector berdasarkan Debezium Community Edition 2.6.2.Final:

  1. Kloning kode sumber Debezium dan pgJDBC dari versi yang sesuai.

    git clone -b v2.6.2.Final --depth=1 https://github.com/debezium/debezium.git
    git clone -b REL42.6.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
  2. Salin file pgJDBC yang diperlukan ke Debezium.

    mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3       
    mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
  3. Terapkan file patch ke kode sumber Debezium untuk menambahkan dukungan untuk PolarDB for PostgreSQL (Kompatibel dengan Oracle).

    git apply v2.6.2.Final-support-polardbo-v1.patch
    Catatan
    • Anda dapat mengunduh file patch dari v2.6.2.Final-support-polardbo-v1.patch.

    • Secara default, file patch menambahkan dependensi ke paket JAR, termasuk debezium-api, debezium-core, pgJDBC, dan protobuf-java. Jika Anda tidak memerlukan dependensi ini, Anda dapat menghapusnya dari file pom.xml.

  4. Gunakan Maven untuk membangun dan mengemas Debezium PolarDBO connector.

    mvn clean package -pl :debezium-connector-postgres -DskipITs -Dquick
    # Setelah Anda menyelesaikan proses pengemasan, Anda dapat memperoleh paket JAR dari direktori yang sesuai dalam direktori debezium-connector-postgres/.

    Anda dapat melakukan langkah-langkah di atas untuk membangun paket JAR dari Debezium PolarDBO connector berdasarkan JDK 11. Anda juga dapat langsung mengunduh paket dari debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar.

Catatan Penggunaan

Debezium PolarDBO connector menangkap perubahan inkremental pada database PolarDB for PostgreSQL (Kompatibel dengan Oracle) menggunakan replikasi logis. Berikut adalah kondisi yang harus dipenuhi:

  • Atur parameter wal_level ke logis untuk memastikan bahwa informasi yang diperlukan untuk replikasi logis ditulis ke catatan write-ahead logging (WAL).

    Catatan

    Anda dapat mengonfigurasi parameter wal_level di konsol PolarDB. Untuk informasi lebih lanjut, lihat bagian Prosedur dari topik "Konfigurasi parameter kluster". Kluster akan restart setelah Anda memodifikasi parameter. Lanjutkan dengan hati-hati.

  • Eksekusi pernyataan ALTER TABLE schema.table REPLICA IDENTITY FULL; untuk mengatur parameter REPLICA IDENTITY ke FULL untuk setiap tabel yang ingin dilanggan. Pengaturan ini memastikan bahwa nilai sebelumnya dari kolom tabel yang terlibat tersedia untuk plug-in penguraian logis untuk operasi pembaruan dan penghapusan.

    Catatan
    • REPLICA IDENTITY adalah parameter spesifik PostgreSQL dan tingkat tabel yang menentukan apakah nilai sebelumnya dari kolom tabel yang terlibat tersedia untuk plug-in penguraian logis untuk operasi pembaruan dan penghapusan. Untuk informasi lebih lanjut tentang nilai REPLICA IDENTITY, lihat Identitas replika.

    • Jika Anda mengatur parameter REPLICA IDENTITY ke FULL untuk tabel yang ingin dilanggan, tabel tersebut mungkin terkunci. Ini memengaruhi bisnis Anda. Untuk meminimalkan gangguan, rencanakan perubahan ini dan sesuaikan proses bisnis berdasarkan persyaratan bisnis Anda. Untuk memeriksa apakah parameter REPLICA IDENTITY untuk tabel diatur ke FULL, eksekusi pernyataan berikut:

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • Pastikan bahwa nilai parameter max_wal_senders dan max_replication_slots lebih besar dari jumlah slot replikasi yang digunakan dan jumlah slot yang dibutuhkan oleh pekerjaan Kafka.

  • Pastikan Anda menggunakan akun istimewa atau akun standar yang memiliki izin LOGIN dan REPLICATION. Akun tersebut harus memiliki izin SELECT pada tabel yang ingin dilanggan untuk kueri data penuh.

  • Sambungkan ke kluster PolarDB menggunakan titik akhir utama kluster. Jika Anda menyambungkan ke kluster menggunakan titik akhir kluster, replikasi logis tidak didukung.

  • Atur parameter connector.class ke io.debezium.connector.postgresql.PolarDBOConnector.

  • Kami sarankan Anda mengatur parameter plugin.name ke pgoutput. Ini mencegah korupsi data atau teks kacau selama penguraian inkremental basis data yang tidak dikodekan dalam UTF-8. Untuk informasi lebih lanjut, lihat dokumentasi komunitas.

Contoh

Contoh berikut menjelaskan cara menggunakan Debezium PolarDBO connector untuk menyinkronkan tabel bernama t1 dan t2 dalam database dbz_db dari kluster PolarDB for PostgreSQL Kompatibilitas sintaks Oracle 2.0 ke antrian pesan Kafka.

Persiapan

  1. Siapkan Kafka.

    1. Deploy instance Kafka dan pastikan instance tersebut dapat diakses dari host Kafka Connect. Anda juga dapat menggunakan ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Memulai Cepat.

    2. Buat topik bernama pg_dbz_event dalam instance Kafka untuk menerima pesan.

      Catatan

      Untuk memastikan kenyamanan membaca dalam skenario pengujian, kami sarankan Anda membuat topik satu-partisi. Dalam skenario bisnis aktual, kami sarankan Anda membuat topik multi-partisi.

  2. Jalankan Kafka Connect secara lokal dalam mode terdistribusi pada port 8083.

    • Salin paket JAR dari Debezium PolarDBO connector ke direktori plugin.path Kafka Connect. Untuk informasi lebih lanjut, lihat langkah keempat dalam bagian "Prosedur" di topik ini.

      # ${plugin.path} Ganti ini dengan jalur spesifik.
      mkdir ${plugin.path}/debezium-connector-polardbo
      cp debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar ${plugin.path}/debezium-connector-polardbo
  3. Konfigurasikan PolarDB for PostgreSQL (Kompatibel dengan Oracle).

    1. Beli kluster PolarDB for PostgreSQL (Kompatibel dengan Oracle) 2.0 di halaman pembelian kluster PolarDB.

    2. Konfigurasikan kluster PolarDB. Pastikan kluster memenuhi prasyarat untuk menggunakan Debezium PolarDBO connector. Untuk informasi lebih lanjut, lihat Catatan Penggunaan.

    3. Buat akun istimewa. Untuk informasi lebih lanjut, lihat Buat akun.

    4. Dapatkan titik akhir utama kluster. Untuk informasi lebih lanjut, lihat Lihat titik akhir dan port. Jika kluster PolarDB dan instance Kafka Connect berada di zona yang sama, gunakan titik akhir privat. Jika tidak, ajukan permohonan untuk titik akhir publik dan tambahkan titik akhir instance Kafka Connect ke daftar putih kluster PolarDB. Untuk informasi lebih lanjut, lihat Konfigurasi daftar putih.

    5. Buat database bernama dbz_db di konsol PolarDB. Untuk informasi lebih lanjut, lihat Buat database.

    6. Eksekusi pernyataan SQL berikut untuk membuat tabel t1 dan t2 dalam database dbz_db dan isi tabel dengan data.

      CREATE TABLE public.t1 (a int PRIMARY KEY, b text, c TIMESTAMP);
      ALTER TABLE public.t1 REPLICA IDENTITY FULL;
      INSERT INTO public.t1(a, b, c) VALUES(1, 'a', now());
      CREATE TABLE public.t2 (a int PRIMARY KEY, b text, c DATE);
      ALTER TABLE public.t2 REPLICA IDENTITY FULL;
      INSERT INTO public.t2(a, b, c) VALUES(1, 'a', now());

Pengujian

  1. Buat file konfigurasi bernama config/postgresql-connector.json. Untuk informasi lebih lanjut, lihat Contoh konfigurasi konektor.

    {
      "name": "dbz-polardb",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PolarDBOConnector",
        "database.hostname": "<yourHostname>", 
        "database.port": "<yourPort>", 
        "database.user": "<yourUserName>", 
        "database.password": "<yourPassWord>", 
        "database.dbname" : "dbz_db",
        "plugin.name": "pgoutput",
        "slot.name": "dbz_polardb",
        "table.include.list": "public.t1,public.t2",
        "topic.prefix": "polardb"
        "transforms": "Combine",
        "transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Combine.topic.regex": "(.*)",
        "transforms.Combine.topic.replacement": "pg_dbz_event"
      }
    }
    Catatan

    Secara default, topik terpisah dibuat untuk setiap tabel. Konfigurasi di atas menggabungkan topik-topik tersebut.

  2. Tambahkan konektor.

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 'http://localhost:8083/connectors' -d @config/postgresql-connector.json

    Setelah Anda menambahkan konektor, Anda dapat memperoleh data penuh dari topik pg_dbz_event di Kafka.

  3. Eksekusi pernyataan DML berikut dalam database dbz_db dari kluster PolarDB:

    INSERT INTO public.t1(a, b, c) VALUES(2, 'b', now());
    UPDATE public.t1 SET b = 'c' WHERE a = 1;
    DELETE FROM public.t1 WHERE a = 2;
    INSERT INTO public.t1(a, b, c) VALUES(4, 'd', now());
    
    INSERT INTO public.t2(a, b, c) VALUES(2, 'b', now());
    UPDATE public.t2 SET b = 'c' WHERE a = 1;
    DELETE FROM public.t2 WHERE a = 2;
    INSERT INTO public.t2(a, b, c) VALUES(4, 'd', now());

    Kemudian, Anda dapat menanyakan data inkremental dalam topik Kafka.