全部产品
Search
文档中心

PolarDB:PolarDBO Flink CDC

更新时间:Jul 06, 2025

Anda dapat menggunakan konektor PolarDBO Change Data Capture (CDC), yang kompatibel dengan PolarDB for PostgreSQL (Compatible with Oracle), untuk membaca secara berurutan snapshot penuh dan perubahan data dalam database PolarDB for PostgreSQL (Compatible with Oracle). Untuk informasi lebih lanjut tentang fitur dan penggunaan, lihat dokumentasi Postgres CDC komunitas.

PolarDB for PostgreSQL (Compatible with Oracle) hanya berbeda dari PostgreSQL komunitas dalam beberapa tipe data dan objek bawaan. Topik ini menjelaskan cara mengemas konektor PolarDBO Flink CDC yang mendukung PolarDB for PostgreSQL (Compatible with Oracle) dengan perubahan kode minimal berdasarkan Postgres CDC komunitas.

Catatan

PolarDB for PostgreSQL (Compatible with Oracle) menggunakan tipe DATE 64-bit, sedangkan PostgreSQL komunitas menggunakan tipe DATE 32-bit. Konektor PolarDBO Flink CDC kompatibel dengan data tipe DATE 64-bit.

Kemas konektor PolarDBO Flink CDC

Penting

Konektor PolarDBO Flink CDC dikembangkan berdasarkan Postgres CDC komunitas. Tidak ada jaminan SLA (Service Level Agreement) yang diberikan untuk konektor yang Anda kemas sendiri atau paket JAR yang disediakan dalam dokumen ini.

Prasyarat

  • Konfirmasi versi Flink CDC

    Jika Anda menggunakan Realtime Compute for Apache Flink, konfirmasikan versi Flink CDC komunitas yang kompatibel untuk versi Ververica Runtime (VVR) yang sesuai. Untuk informasi lebih lanjut, lihat Korespondensi Versi CDC dan VVR.

    Catatan

    Untuk repositori kode Flink CDC, kunjungi Flink CDC di GitHub.

  • Konfirmasikan versi Debezium.

    Dalam file pom.xml Flink CDC Anda, cari kata kunci debezium.version untuk menentukan versi Debezium.

    Catatan

    Untuk repositori kode Debezium, kunjungi Debezium di GitHub.

  • Konfirmasi versi PgJDBC

    Dalam file pom.xml Postgres CDC Anda, cari kata kunci org.postgresql untuk menentukan versi PgJDBC.

    Catatan
    • Untuk versi sebelum release-3.0, jalur file adalah: flink-connector-postgres-cdc/pom.xml.

    • Untuk versi release-3.0 dan seterusnya, jalur file adalah: flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml.

    • Untuk repositori kode PgJDBC, kunjungi PgJDBC di GitHub.

Prosedur

Untuk release-3.1

Versi community Flink-CDC release-3.1 kompatibel dengan Realtime Compute for Apache Flink vvr-8.0.x-flink-1.17.

Untuk mengemas konektor PolarDBO Flink CDC versi yang sesuai, lakukan langkah-langkah berikut:

  1. Kloning repositori kode untuk Flink CDC, Debezium, dan PgJDBC, yang versinya sesuai dengan yang Anda konfirmasi.

    git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
  2. Salin file yang diperlukan dari Debezium dan PgJDBC ke Flink CDC.

    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. Terapkan file patch untuk PolarDB for PostgreSQL (Compatible with Oracle).

    git apply release-3.1_support_polardbo.patch
    Catatan

    File patch kompatibel PolarDBO Flink CDC berikut digunakan dalam contoh di atas: release-3.1_support_polardbo.patch.

  4. Kemas konektor PolarDBO Flink CDC menggunakan Maven.

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # Anda dapat menemukan paket jar di direktori target flink-sql-connector-postgres-cdc

File JAR konektor PolarDBO Flink CDC berikut dibuat berdasarkan JDK8: flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar.

Untuk release-2.3

Versi community Flink-CDC release-2.3 kompatibel dengan Realtime Compute for Apache Flink dari vvr-4.0.15-flink-1.13 hingga vvr-6.0.2-flink-1.15.

Untuk mengemas konektor PolarDBO Flink CDC versi yang sesuai, lakukan langkah-langkah berikut:

  1. Kloning repositori kode untuk Flink CDC, Debezium, dan PgJDBC, yang versinya sesuai dengan yang Anda konfirmasi.

    git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.git
  2. Salin file yang diperlukan dari Debezium dan PgJDBC ke Flink-CDC.

    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. Terapkan file patch untuk PolarDB for PostgreSQL (Compatible with Oracle).

    git apply release-2.3_support_polardbo.patch
    Catatan

    File patch kompatibel PolarDBO Flink CDC berikut digunakan dalam contoh di atas: release-2.3_support_polardbo.patch.

  4. Kemas konektor PolarDBO Flink CDC menggunakan Maven.

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # Anda dapat menemukan paket jar di direktori target flink-sql-connector-postgres-cdc

File JAR konektor PolarDBO Flink CDC berikut dibuat berdasarkan JDK8: flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar.

Penggunaan

Sebelum menggunakan konektor PolarDBO Flink CDC untuk membaca perubahan data menggunakan fitur replikasi logis dari database PolarDB for PostgreSQL (Compatible with Oracle), pastikan persyaratan berikut terpenuhi:

  • Nilai parameter wal_level diatur ke logical, yang menentukan bahwa informasi yang diperlukan untuk replikasi logis ditulis ke write-ahead logging (WAL) logs.

    Catatan

    Anda dapat mengonfigurasi parameter wal_level di konsol PolarDB. Untuk informasi lebih lanjut, lihat Prosedur topik Konfigurasi parameter kluster. Kluster akan restart setelah Anda memodifikasi parameter ini. Lanjutkan dengan hati-hati.

  • Parameter REPLICA IDENTITY diatur ke FULL untuk tabel yang berlangganan dengan mengeksekusi pernyataan ALTER TABLE schema.table REPLICA IDENTITY FULL; untuk memastikan konsistensi data tabel selama replikasi. Nilai FULL menentukan bahwa acara untuk operasi INSERT dan UPDATE berisi nilai sebelumnya dari semua kolom dalam tabel.

    Catatan
    • REPLICA IDENTITY adalah pengaturan tingkat tabel spesifik PostgreSQL yang memeriksa apakah nilai sebelumnya dari kolom tabel yang terlibat tersedia untuk plug-in decoding logis saat acara UPDATE dan DELETE terjadi. Untuk informasi lebih lanjut tentang deskripsi nilai parameter REPLICA IDENTITY, lihat dokumentasi Debezium tentang REPLICA IDENTITY.

    • Untuk mengatur parameter REPLICA IDENTITY ke FULL untuk tabel yang berlangganan, Anda mungkin perlu mengunci tabel, yang dapat memengaruhi bisnis Anda. Lanjutkan dengan hati-hati. Anda dapat mengeksekusi pernyataan berikut untuk memeriksa apakah parameter diatur ke FULL:

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • Nilai parameter max_wal_senders dan max_replication_slots lebih besar dari jumlah slot replikasi yang ditempati di database dan jumlah slot replikasi yang diperlukan untuk draft Realtime Compute for Apache Flink.

  • Salah satu akun berikut digunakan: Akun istimewa atau akun database yang memiliki izin LOGIN dan REPLICATION serta izin SELECT pada tabel yang berlangganan untuk kueri data penuh.

  • Konektor terhubung ke titik akhir utama kluster PolarDB. Titik akhir kluster tidak mendukung replikasi logis.

Perbedaan antara konektor Polardb Flink CDC dan Postgres CDC

Konektor PolarDBO Flink CDC dikembangkan dan dikemas berdasarkan Postgres CDC. Untuk informasi tentang sintaksis dan parameter konektor Postgres CDC, lihat dokumentasi Postgres CDC. Bagian berikut menjelaskan perbedaan utama antara kedua konektor tersebut:

  • Parameter konektor dalam klausa WITH konektor PolarDBO Flink CDC harus diatur ke polardbo-cdc.

  • PolarDBO Flink CDC kompatibel dengan PolarDB for PostgreSQL, PolarDB for PostgreSQL (Compatible with Oracle) 1.0 dan PolarDB for PostgreSQL (Compatible with Oracle) 2.0.

    Catatan

    Kami merekomendasikan Anda menggunakan konektor Postgres CDC untuk PolarDB for PostgreSQL.

  • Kolom tipe DATE dalam PolarDB for PostgreSQL (Compatible with Oracle) 1.0 dan 2.0 harus dipetakan ke tipe timestamp untuk tabel sumber dan sink dalam layanan Flink.

  • Kami merekomendasikan Anda mengatur parameter decoding.plugin.name ke pgoutput. Jika tidak, karakter tidak valid mungkin terjadi dalam penguraian inkremental untuk database yang tidak dikodekan UTF-8. Untuk informasi lebih lanjut, lihat dokumentasi komunitas.

Pemetaan tipe data

Tabel berikut menjelaskan pemetaan tipe data antara PolarDB for PostgreSQL dan Flink. Pemetaan tersebut sama dengan pemetaan antara PostgreSQL komunitas dan tipe data Flink, kecuali untuk tipe DATE.

Tipe data yang didukung oleh PolarDB for PostgreSQL

Tipe data yang didukung oleh Flink

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

  • PolarDB for PostgreSQL (Compatible with Oracle) 1.0: TIMESTAMP

  • PolarDB for PostgreSQL (Compatible with Oracle) 2.0: TIMESTAMP

  • PolarDB for PostgreSQL: DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

Contoh

Contoh berikut menjelaskan cara menggunakan konektor PolarDBO Flink CDC untuk menyinkronkan data dalam tabel shipments database flink_source ke tabel shipments_sink database flink_sink dari kluster PolarDB for PostgreSQL (Compatible with Oracle) 2.0.

Catatan

Contoh berikut hanya berfungsi sebagai tes dasar untuk memastikan konektor PolarDBO Flink CDC yang dikemas bekerja pada kluster PolarDB for PostgreSQL (Compatible with Oracle). Untuk lingkungan produksi, lihat dokumentasi resmi Postgres CDC dan konfigurasikan parameter berdasarkan kebutuhan bisnis Anda.

  1. Buat persiapan.

    • Buat persiapan untuk kluster PolarDB for PostgreSQL (Compatible with Oracle).

      1. Di halaman pembelian PolarDB, beli kluster PolarDB for PostgreSQL (Compatible with Oracle) 2.0.

      2. Buat akun istimewa. Untuk informasi lebih lanjut, lihat bagian Buat akun topik Buat akun database.

      3. Ajukan endpoint utama untuk kluster. Untuk informasi lebih lanjut, lihat Lihat atau ajukan endpoint. Jika kluster PolarDB dan workspace Realtime Compute for Apache Flink berada di zona yang sama, Anda dapat menggunakan endpoint privat utama kluster. Jika tidak, Anda harus mengajukan endpoint publik. Tambahkan endpoint dari workspace Realtime Compute for Apache Flink ke daftar putih alamat IP kluster PolarDB. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih untuk kluster.

      4. Buat database sumber bernama flink_source dan database tujuan bernama flink_sink di konsol PolarDB. Untuk informasi lebih lanjut, lihat Manajemen database.

      5. Eksekusi pernyataan berikut untuk membuat tabel bernama shipments dalam database flink_source dan menulis data ke tabel tersebut:

        CREATE TABLE public.shipments (
          shipment_id INT,
          order_id INT,
          origin TEXT,
          destination TEXT,
          is_arrived BOOLEAN,
          order_time DATE,
          PRIMARY KEY (shipment_id) 
        );
        ALTER TABLE public.shipments REPLICA IDENTITY FULL;
        INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();
      6. Eksekusi pernyataan berikut untuk membuat tabel bernama shipments_sink dalam database flink_sink:

        CREATE TABLE public.shipments_sink (
           shipment_id INT,
           order_id INT,
           origin TEXT,
           destination TEXT,
           is_arrived BOOLEAN,
           order_time TIMESTAMP,
           PRIMARY KEY (shipment_id)
         );
    • Buat persiapan untuk workspace Realtime Compute for Apache Flink.

      1. Masuk ke konsol Realtime Compute for Apache Flink untuk membeli workspace Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.

        Catatan

        Kami merekomendasikan Anda mengonfigurasi region dan virtual private cloud (VPC) yang sama untuk workspace Realtime Compute for Apache Flink seperti kluster PolarDB. Dalam hal ini, Anda dapat menggunakan endpoint privat utama kluster PolarDB sebagai endpoint dari workspace Realtime Compute for Apache Flink.

      2. Klik Buat Konektor Kustom. Di kotak dialog Buat konektor kustom, unggah paket PolarDBO Flink CDC di langkah Provide JAR. Di langkah Lihat Konektor, pilih debezium-json dari daftar drop-down Formats. Untuk informasi lebih lanjut, lihat Kelola konektor kustom.

        image

  2. Membuat draft Flink

    1. Masuk ke konsol Realtime Compute for Apache Flink dan buat draft SQL. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL. Eksekusi pernyataan Flink SQL berikut untuk mengubah endpoint utama, nomor port, nama akun database, dan kata sandi akun database dari kluster PolarDB:

      Catatan

      PolarDB for PostgreSQL (Compatible with Oracle) menggunakan tipe DATE 64-bit. Flink SQL dan sebagian besar database lainnya menggunakan tipe DATE 32-bit. Oleh karena itu, kolom tipe DATE dalam tabel sumber harus dipetakan ke kolom tipe TIMESTAMP dalam tabel _source dan _sink Flink SQL. Jika tidak, draft mungkin gagal dengan kesalahan serupa dengan berikut ini: “java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471): 1720891573000”.

      CREATE TEMPORARY TABLE shipments (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
         'connector' = 'polardbo-cdc',
         'hostname' = '<yourHostname>',
         'port' = '<yourPort>',
         'username' = '<yourUserName>',
         'password' = '<yourPassWord>',
         'database-name' = 'flink_source',
         'schema-name' = 'public',
         'table-name' = 'shipments',
         'decoding.plugin.name' = 'pgoutput',
         'slot.name' = 'flink'
       );
      
      CREATE TEMPORARY TABLE shipments_sink (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink',
        'table-name' = 'shipments_sink',
        'username' = '<yourUserName>',
        'password' = '<yourPassWord>'
      );
      
      INSERT INTO shipments_sink SELECT * FROM shipments;
    2. Sebarkan dan mulai draft.

      image

      image

    3. Uji dan verifikasi hasil.

      • Setelah status draft berubah menjadi RUNNING, eksekusi pernyataan berikut untuk memverifikasi bahwa data dalam tabel shipments database flink_source disinkronkan ke tabel shipments_sink database flink_sink:

        SELECT * FROM public.shipments_sink;

        Hasil contoh:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | f          | 2024-09-18 05:45:08
        (1 row)
      • Eksekusi pernyataan DML berikut pada tabel shipments database flink_source. Perubahan data diharapkan disinkronkan secara real-time.

        INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1;
        DELETE FROM public.shipments WHERE shipment_id = 2;
        INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;

        Eksekusi pernyataan berikut untuk memverifikasi bahwa perubahan data dalam tabel shipments disinkronkan ke tabel shipments_sink database flink_sink:

        SELECT * FROM public.shipments_sink;

        Hasil contoh:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | t          | 2024-09-18 05:45:08
                   3 |        3 | test3  | test3       | t          | 2024-09-18 07:33:23
        (2 rows)