All Products
Search
Document Center

PolarDB:Flink CDC kompatibel dengan PolarDB untuk PostgreSQL (Kompatibel dengan Oracle)

Last Updated:Apr 21, 2026

Konektor Flink CDC yang kompatibel dengan (PolarDBO Flink CDC) membaca snapshot lengkap dan data perubahan dari database . Untuk fitur dan penggunaan spesifik, lihat dokumentasi komunitas Postgres CDC.

Karena hanya berbeda dari PostgreSQL komunitas dalam beberapa tipe data dan penanganan objek bawaan, topik ini menjelaskan cara mengadaptasi dan mengemas konektor PolarDBO Flink CDC yang mendukung berdasarkan Postgres CDC komunitas dengan perubahan kode minimal.

Catatan

Tipe DATE di berukuran 64-bit, sedangkan tipe DATE di PostgreSQL komunitas berukuran 32-bit. Oleh karena itu, PolarDBO Flink CDC menyesuaikan penanganan data bertipe DATE.

Paketkan konektor PolarDBO Flink CDC

Penting

Konektor PolarDBO Flink CDC dikembangkan berdasarkan Postgres CDC komunitas. Alibaba Cloud tidak memberikan jaminan Service-Level Agreement (SLA) untuk konektor ini, baik Anda mengemasnya sendiri maupun menggunakan paket JAR yang disediakan dalam topik ini.

Prasyarat

  • Tentukan versi Flink-CDC.

    Jika Anda menggunakan Realtime Compute for Apache Flink Alibaba Cloud, tentukan versi Flink-CDC komunitas yang kompatibel dengan versi Ververica Runtime (VVR) yang sesuai. Untuk detailnya, lihat Pemetaan versi CDC dan VVR.

    Catatan

    Untuk repositori kode Flink-CDC, lihat Flink-CDC.

  • Tentukan versi Debezium.

    Dalam file pom.xml dari versi Flink-CDC yang sesuai, cari kata kunci debezium.version untuk menentukan versi Debezium.

    Catatan

    Untuk repositori kode Debezium, lihat Debezium.

  • Tentukan versi PgJDBC.

    Dalam file pom.xml dari versi Postgres-CDC yang sesuai, cari kata kunci org.postgresql untuk menentukan versi PgJDBC.

    Catatan
    • Untuk versi sebelum release-3.0, jalur file adalah: flink-connector-postgres-cdc/pom.xml.

    • Untuk versi release-3.0 dan yang lebih baru, jalur file adalah: flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml.

    • Untuk repositori kode PgJDBC, lihat PgJDBC.

Prosedur

Paket rilis-3.5

Rilis Community Flink-CDC-3.5 kompatibel dengan vvr-11.4-jdk11-flink-1.20 dari Alibaba Cloud Realtime Compute for Apache Flink.

Untuk mengemas versi konektor PolarDB Flink CDC yang sesuai, ikuti langkah-langkah berikut:

  1. Klon kode sumber Flink-CDC, Debezium, dan PgJDBC pada versi yang ditentukan.

    git clone -b release-3.5 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.7.3 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
  2. Salin file-file tertentu dari Debezium dan PgJDBC ke dalam Flink-CDC.

    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/Oid.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. Masuk ke direktori Flink-CDC. Terapkan perbaikan bug konversi timestamp dan perbaikan bug perilaku implisit (SELECT *). Perbaikan ini akan digabungkan ke rilis komunitas 3.6.

    cd flink-cdc
    
    # Terapkan perbaikan bug konversi timestamp. Akan digabungkan ke rilis komunitas 3.6.
    git fetch origin 2f32836a783f80f295c9dce339c11afec2a32dc2
    git cherry-pick 2f32836a783f80f295c9dce339c11afec2a32dc2
    
    git fetch origin 0d86de24494a855c2d83f9b1052c2e888e182cb1
    git cherry-pick 0d86de24494a855c2d83f9b1052c2e888e182cb1
  4. Terapkan file patch yang menambahkan dukungan untuk .

    git apply release-3.5_support_polardbo.patch
    Catatan

    File patch yang digunakan di atas adalah: release-3.5_support_polardbo.patch.

  5. Gunakan Maven untuk mengemas konektor PolarDB untuk PostgreSQL Flink CDC.

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip 
    
    # Setelah pengemasan selesai, temukan file JAR di flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/target.

Mengikuti prosedur di atas, bangun paket JAR untuk konektor PolarDBO Flink CDC menggunakan JDK 11: flink-cdc-pipeline-connector-polardbo-3.5-SNAPSHOT-20260212.jar.

Paket rilis-3.1

Flink-CDC komunitas release-3.1 kompatibel dengan vvr-8.0.x-flink-1.17 dari Realtime Compute for Apache Flink Alibaba Cloud.

Untuk mengemas versi konektor PolarDB Flink CDC yang sesuai, ikuti langkah-langkah berikut:

  1. Klon file kode Flink-CDC, Debezium, dan PgJDBC.

    git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
  2. Salin file-file tertentu dari Debezium dan PgJDBC ke Flink-CDC.

    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. Terapkan file patch untuk mendukung .

    git apply release-3.1_support_polardbo.patch
    Catatan

    File patch untuk kompatibilitas PolarDBO Flink CDC adalah release-3.1_support_polardbo.patch.

  4. Kemas konektor PolarDBO Flink CDC menggunakan Maven.

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # Setelah pengemasan selesai, temukan paket JAR di folder target flink-sql-connector-postgres-cdc

Ikuti proses yang dijelaskan di atas untuk mengemas file JAR konektor PolarDB Flink CDC menggunakan JDK 8: flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar.

Paket rilis-2.3

Flink-CDC komunitas release-2.3 kompatibel dengan vvr-4.0.15-flink-1.13 hingga vvr-6.0.2-flink-1.15 dari Realtime Compute for Apache Flink Alibaba Cloud.

Untuk mengemas versi konektor PolarDB-O Flink CDC yang sesuai, ikuti langkah-langkah berikut:

  1. Klon kode sumber Flink-CDC, Debezium, dan PgJDBC pada versi yang sesuai.

    git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.git
  2. Salin file-file tertentu dari Debezium dan PgJDBC ke dalam Flink-CDC.

    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. Terapkan file patch untuk kompatibilitas dengan .

    git apply release-2.3_support_polardbo.patch
    Catatan

    File patch yang digunakan di atas adalah: release-2.3_support_polardbo.patch.

  4. Kemas konektor PolarDB untuk PostgreSQL Flink CDC menggunakan Maven.

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # Setelah pengemasan selesai, temukan file JAR di folder target flink-sql-connector-postgres-cdc

Ikuti langkah-langkah di atas untuk mengemas JAR konektor PolarDB for PostgreSQL Flink CDC menggunakan JDK 8: flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar.

Petunjuk penggunaan

Konektor PolarDBO Flink CDC membaca data aliran perubahan menggunakan replikasi logis dari database . Konektor ini memerlukan kondisi berikut:

  • Atur parameter wal_level ke logical. Ini menambahkan informasi yang diperlukan untuk mendukung replikasi logis dalam write-ahead logging (WAL).

    Catatan

    Anda dapat mengatur parameter wal_level di Konsol. Untuk operasi detail, lihat Setel parameter kluster. Kluster akan dimulai ulang setelah memodifikasi parameter ini. Rencanakan operasi bisnis Anda sebelum mengubah parameter, dan lakukan dengan hati-hati.

  • Jalankan perintah ALTER TABLE schema.table REPLICA IDENTITY FULL; untuk mengatur REPLICA IDENTITY tabel yang dilangganan ke FULL. Ini memastikan konsistensi sinkronisasi data untuk tabel tersebut, karena event insert dan update yang dipancarkan mencakup nilai lama dari semua kolom dalam tabel.

    Catatan
    • REPLICA IDENTITY adalah pengaturan unik tingkat tabel di PostgreSQL. Pengaturan ini menentukan apakah plugin decoding logis menyertakan nilai lama kolom tabel yang terpengaruh selama event INSERT dan UPDATE. Untuk detail makna nilai-nilainya, lihat REPLICA IDENTITY.

    • Mengatur REPLICA IDENTITY tabel yang dilangganan ke FULL mungkin mengunci tabel, yang dapat memengaruhi operasi bisnis. Rencanakan operasi bisnis Anda sebelum mengubah parameter. Periksa apakah konfigurasi saat ini sudah FULL menggunakan perintah berikut:

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • Pastikan nilai kedua parameter `max_wal_senders` dan `max_replication_slots` lebih besar dari jumlah slot replikasi database yang sedang digunakan ditambah jumlah slot yang dibutuhkan oleh pekerjaan Flink.

  • Gunakan akun istimewa atau akun dengan izin LOGIN dan REPLICATION, serta izin SELECT pada tabel yang dilangganan untuk kueri data lengkap.

  • Hanya terhubung ke titik akhir utama kluster PolarDB. Titik akhir kluster tidak mendukung replikasi logis.

  • Versi release-3.5 dan yang lebih baru mendukung sinkronisasi langsung tabel induk untuk tabel partisi. Konfigurasikan sebagai berikut. Untuk operasi spesifik, lihat dokumentasi komunitas Postgres CDC.

    • Atur scan.include-partitioned-tables.enabled ke true.

    • Buat secara manual PUBLICATION di database dengan opsi publish_via_partition_root=true. Selain itu, tentukan table-name menggunakan parameter debezium.publication.name.

    • table-name hanya dapat menentukan tabel induk. Ekspresi reguler tidak boleh mencocokkan tabel anak; jika tidak, data lengkap akan diduplikasi.

    Selain itu, versi release-3.5 dan yang lebih baru mendukung konektor Pipeline, yang memungkinkan pembacaan data snapshot dan data inkremental, menyediakan kemampuan sinkronisasi data end-to-end untuk seluruh database. Namun, perhatikan bahwa konektor Pipeline saat ini tidak mendukung sinkronisasi perubahan skema tabel. Untuk detailnya, lihat dokumentasi komunitas konektor Pipeline Postgres CDC.

Konektor PolarDBO Flink CDC vs. Postgres CDC

Konektor PolarDBO Flink CDC dikemas berdasarkan Postgres CDC. Untuk sintaks dan parameter spesifik, lihat Postgres CDC. Namun, terdapat perbedaan utama berikut:

  • Parameter konektor dalam WITH harus diatur ke bidang statis: polardbo-cdc.

  • PolarDBO Flink CDC kompatibel dengan semua versi PolarDB untuk PostgreSQL, 1.0, dan 2.0.

    Catatan

    Jika Anda menggunakan PolarDB untuk PostgreSQL, kami merekomendasikan untuk langsung menggunakan Postgres CDC komunitas.

  • Untuk kolom bertipe DATE di 1.0 dan 2.0, tipe yang sesuai untuk tabel sumber dan sink di Flink SQL harus ditentukan sebagai TIMESTAMP.

  • Atur parameter decoding.plugin.name ke pgoutput. Jika tidak, penguraian inkremental mungkin menghasilkan karakter acak untuk database yang tidak menggunakan encoding UTF-8. Untuk pengenalan detail, lihat dokumentasi komunitas.

Pemetaan tipe

Pemetaan tipe bidang antara PolarDB PostgreSQL dan Flink identik dengan versi komunitas PostgreSQL, kecuali untuk tipe DATE. Pemetaan spesifiknya adalah sebagai berikut:

PolarDB untuk PostgreSQL tipe bidang

Tipe bidang Flink

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

  • 1.0: TIMESTAMP

  • 2.0: TIMESTAMP

  • PolarDB untuk PostgreSQL: DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

Contoh penggunaan

Konektor sumber

Contoh berikut mengilustrasikan cara menyinkronkan tabel `shipments` dari database `flink_source` di kluster 2.0 ke tabel `shipments_sink` di database `flink_sink` menggunakan PolarDBO Flink CDC.

Catatan

Contoh ini hanya memverifikasi bahwa PolarDBO Flink CDC yang telah dikemas dapat berjalan di . Untuk penggunaan produksi, konfigurasikan parameter sesuai kebutuhan dengan merujuk pada dokumentasi komunitas Postgres CDC.

  1. Prasyarat

    • Persiapan

      1. Beli kluster 2.0 di halaman pembelian kluster PolarDB.

      2. Buat akun istimewa.

      3. Lihat titik akhir utama kluster. Jika kluster PolarDB dan Realtime Compute for Apache Flink berada dalam virtual private cloud (VPC) yang sama, Anda dapat langsung menggunakan titik akhir pribadi. Jika tidak, mintalah titik akhir publik.

      4. Atur daftar putih kluster: Tambahkan alamat instans Flink ke daftar putih kluster PolarDB.

      5. Buat database sumber `flink_source` dan database target `flink_sink` di Konsol. Untuk langkah-langkah detail, lihat Buat database.

      6. Jalankan pernyataan berikut untuk membuat tabel `shipments` di database sumber `flink_source` dan masukkan data.

        CREATE TABLE public.shipments (
          shipment_id INT,
          order_id INT,
          origin TEXT,
          destination TEXT,
          is_arrived BOOLEAN,
          order_time DATE,
          PRIMARY KEY (shipment_id) 
        );
        ALTER TABLE public.shipments REPLICA IDENTITY FULL;
        INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();
      7. Jalankan pernyataan berikut untuk membuat tabel `shipments_sink` di database target `flink_sink`.

        CREATE TABLE public.shipments_sink (
           shipment_id INT,
           order_id INT,
           origin TEXT,
           destination TEXT,
           is_arrived BOOLEAN,
           order_time TIMESTAMP,
           PRIMARY KEY (shipment_id)
         );
    • Realtime Compute for Apache Flink Persiapan

      1. Masuk ke Konsol Realtime Compute dan beli instans Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Beli Realtime Compute for Apache Flink.

        Catatan

        Kami merekomendasikan agar Region dan Virtual Private Cloud (VPC) Realtime Compute for Apache Flink konsisten dengan kluster PolarDB. Anda dapat langsung menggunakan titik akhir pribadi dari titik akhir utama kluster PolarDB sebagai alamat koneksi.

      2. Buat konektor kustom dan unggah PolarDBO Flink CDC yang telah dikemas. Atur Formats ke debezium-json. Untuk langkah-langkah detail, lihat Buat konektor kustom.

        image

  2. Buat pekerjaan Flink

    1. Masuk ke Konsol Realtime Compute. Buat draf pekerjaan SQL baru. Untuk panduan, lihat peta pengembangan pekerjaan. Gunakan pernyataan Flink SQL berikut. Ganti titik akhir utama kluster PolarDB, port, username, dan password dengan nilai Anda.

      Catatan

      Tipe DATE di berukuran 64-bit. Sebaliknya, tipe DATE di Flink SQL dan sebagian besar database berukuran 32-bit. Oleh karena itu, kolom bertipe DATE di tabel sumber harus didefinisikan sebagai TIMESTAMP di kedua tabel sumber dan sink dalam Flink SQL. Jika tidak, pekerjaan akan gagal dengan error ketidakcocokan tipe, seperti: “java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471): 1720891573000”.

      CREATE TEMPORARY TABLE shipments (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
         'connector' = 'polardbo-cdc',
         'hostname' = '<yourHostname>',
         'port' = '<yourPort>',
         'username' = '<yourUserName>',
         'password' = '<yourPassWord>',
         'database-name' = 'flink_source',
         'schema-name' = 'public',
         'table-name' = 'shipments',
         'decoding.plugin.name' = 'pgoutput',
         'slot.name' = 'flink'
       );
      
      CREATE TEMPORARY TABLE shipments_sink (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink',
        'table-name' = 'shipments_sink',
        'username' = '<yourUserName>',
        'password' = '<yourPassWord>'
       );
      
      INSERT INTO shipments_sink SELECT * FROM shipments;
    2. Deploy dan mulai pekerjaan.

      image

      image

    3. Uji dan validasi.

      • Setelah pekerjaan berhasil dideploy dan statusnya berubah menjadi running, data dari tabel shipments disinkronkan ke tabel shipments_sink di database tujuan flink_sink.

        SELECT * FROM public.shipments_sink;

        Hasilnya adalah sebagai berikut:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | f          | 2024-09-18 05:45:08
        (1 row)
      • Jalankan pernyataan DML pada tabel shipments di database sumber flink_source. Insert dan update baru disinkronkan secara real time.

        INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1;
        DELETE FROM public.shipments WHERE shipment_id = 2;
        INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;

        Data di tabel shipments kini disinkronkan ke tabel shipments_sink di database tujuan flink_sink.

        SELECT * FROM public.shipments_sink;

        Hasilnya adalah sebagai berikut:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | t          | 2024-09-18 05:45:08
                   3 |        3 | test3  | test3       | t          | 2024-09-18 07:33:23
        (2 rows)

Pipeline connector

Contoh berikut mengilustrasikan cara menyinkronkan tabel `shipments1` dan `shipments2` dari database `flink_source` di kluster 2.0 menggunakan konektor Pipeline PolarDBO Flink CDC. Saat debugging, gunakan konektor Print. Di lingkungan produksi, pilih konektor yang sesuai sesuai kebutuhan.

Catatan

Contoh ini hanya memverifikasi bahwa PolarDBO Flink CDC yang telah dikemas dapat berjalan di . Untuk penggunaan produksi, konfigurasikan parameter terkait untuk memenuhi kebutuhan bisnis aktual dengan merujuk pada dokumentasi komunitas konektor Pipeline Postgres CDC.

  1. Prasyarat

    • Persiapan

      1. Beli kluster 2.0 di halaman pembelian kluster PolarDB.

      2. Buat akun istimewa.

      3. Lihat titik akhir utama kluster. Jika kluster PolarDB dan Realtime Compute for Apache Flink berada dalam virtual private cloud (VPC) yang sama, Anda dapat langsung menggunakan titik akhir pribadi. Jika tidak, mintalah titik akhir publik.

      4. Atur daftar putih kluster: Tambahkan alamat instans Flink ke daftar putih kluster PolarDB.

      5. Buat database sumber `flink_source` di Konsol. Untuk langkah-langkah detail, lihat Buat database.

      6. Jalankan pernyataan berikut untuk membuat tabel `shipments1` dan `shipments2` di database sumber `flink_source` dan masukkan data.

        CREATE TABLE public.shipments1 (
          shipment_id INT,
          order_id INT,
          origin TEXT,
          destination TEXT,
          is_arrived BOOLEAN,
          order_time DATE,
          PRIMARY KEY (shipment_id) 
        );
        ALTER TABLE public.shipments1 REPLICA IDENTITY FULL;
        INSERT INTO public.shipments1 SELECT 1, 1, 'test1', 'test1', false, now();
        
        CREATE TABLE public.shipments2 (
          shipment_id INT,
          order_id INT,
          origin TEXT,
          destination TEXT,
          is_arrived BOOLEAN,
          order_time DATE,
          PRIMARY KEY (shipment_id) 
        );
        ALTER TABLE public.shipments2 REPLICA IDENTITY FULL;
        INSERT INTO public.shipments2 SELECT 1, 1, 'test1', 'test1', false, now();
    • Realtime Compute for Apache Flink persiapan

      Masuk ke Konsol Realtime Compute dan beli instans Realtime Compute for Apache Flink. Untuk operasi detail, lihat Aktifkan Realtime Compute for Apache Flink.

      Catatan

      Kami merekomendasikan agar region dan virtual private cloud (VPC) Realtime Compute for Apache Flink sesuai dengan kluster PolarDB. Alamat koneksi dapat langsung menggunakan titik akhir pribadi dari titik akhir utama kluster PolarDB.

  2. Buat pekerjaan Flink.

    1. Masuk ke Konsol Realtime Compute dan buat draf ingesti data baru. Lihat Panduan Cepat Pekerjaan Ingesti Data Flink CDC. Gunakan konfigurasi ingesti data berikut, ubah titik akhir utama kluster PolarDB, port, akun, dan password.

      source:
         type: polardbo
         name: PolarDB Oracle Source
         hostname: '<yourHostname>'
         port: '<yourPort>'
         username: '<yourUserName>'
         password: '<yourPassWord>'
         tables: flink_source.public.shipments[12]
         decoding.plugin.name:  pgoutput
         slot.name: pgtest
      
      sink:
        type: values
        name: values Sink
        print.enabled: true
    2. Tambahkan konektor Pipeline yang telah berhasil dikemas di bagian More Configurations di sebelah kiri.image

    3. Deploy dan mulai pekerjaan.

      1. Klik Deploy di pojok kanan atas.image

      2. Buka halaman O&M pekerjaan dan klik Start.

        image

    4. Uji dan verifikasi.

      • Setelah pekerjaan berhasil dideploy dan berjalan, Anda dapat melihat `CreateTableEvent` dan `DataChangeEvent` untuk fase data lengkap di log Job logs > Running Task Managers > Stdout.image

        CreateTableEvent{tableId=public.shipments2, schema=columns={`shipment_id` INT NOT NULL,`order_id` INT,`origin` STRING,`destination` STRING,`is_arrived` BOOLEAN,`order_time` TIMESTAMP(6)}, primaryKeys=shipment_id, options=()}
        CreateTableEvent{tableId=public.shipments1, schema=columns={`shipment_id` INT NOT NULL,`order_id` INT,`origin` STRING,`destination` STRING,`is_arrived` BOOLEAN,`order_time` TIMESTAMP(6)}, primaryKeys=shipment_id, options=()}
        DataChangeEvent{tableId=public.shipments2, before=[], after=[1, 1, test1, test1, false, 2026-01-07T16:30:44], op=INSERT, meta=()}
        DataChangeEvent{tableId=public.shipments1, before=[], after=[1, 1, test1, test1, false, 2026-01-07T16:30:44], op=INSERT, meta=()}
      • Jalankan DML pada tabel `shipments1` dan `shipments2` di database sumber `flink_source`. Penambahan dan modifikasi baru juga akan disinkronkan secara real time.

        INSERT INTO public.shipments1 SELECT 2, 2, 'test2', 'test2', false, now();
        UPDATE public.shipments1 SET is_arrived = true WHERE shipment_id = 1;
        DELETE FROM public.shipments1 WHERE shipment_id = 2;
        INSERT INTO public.shipments1 SELECT 3, 3, 'test3', 'test3', false, now();
        UPDATE public.shipments1 SET is_arrived = true WHERE shipment_id = 3;
        
        INSERT INTO public.shipments2 SELECT 2, 2, 'test2', 'test2', false, now();
        UPDATE public.shipments2 SET is_arrived = true WHERE shipment_id = 1;
        DELETE FROM public.shipments2 WHERE shipment_id = 2;
        INSERT INTO public.shipments2 SELECT 3, 3, 'test3', 'test3', false, now();
        UPDATE public.shipments2 SET is_arrived = true WHERE shipment_id = 3;
      • Anda dapat melihat `DataChangeEvent` untuk fase data inkremental di log Job logs > Running Task Managers > Stdout:

        DataChangeEvent{tableId=public.shipments1, before=[], after=[2, 2, test2, test2, false, 2026-01-07T16:44:50], op=INSERT, meta=()}
        DataChangeEvent{tableId=public.shipments1, before=[1, 1, test1, test1, false, 2026-01-07T16:30:44], after=[1, 1, test1, test1, true, 2026-01-07T16:30:44], op=UPDATE, meta=()}
        DataChangeEvent{tableId=public.shipments1, before=[2, 2, test2, test2, false, 2026-01-07T16:44:50], after=[], op=DELETE, meta=()}
        DataChangeEvent{tableId=public.shipments1, before=[], after=[3, 3, test3, test3, false, 2026-01-07T16:44:50], op=INSERT, meta=()}
        DataChangeEvent{tableId=public.shipments1, before=[3, 3, test3, test3, false, 2026-01-07T16:44:50], after=[3, 3, test3, test3, true, 2026-01-07T16:44:50], op=UPDATE, meta=()}
        DataChangeEvent{tableId=public.shipments2, before=[], after=[2, 2, test2, test2, false, 2026-01-07T16:44:50], op=INSERT, meta=()}
        DataChangeEvent{tableId=public.shipments2, before=[1, 1, test1, test1, false, 2026-01-07T16:30:44], after=[1, 1, test1, test1, true, 2026-01-07T16:30:44], op=UPDATE, meta=()}
        DataChangeEvent{tableId=public.shipments2, before=[2, 2, test2, test2, false, 2026-01-07T16:44:50], after=[], op=DELETE, meta=()}
        DataChangeEvent{tableId=public.shipments2, before=[], after=[3, 3, test3, test3, false, 2026-01-07T16:44:50], op=INSERT, meta=()}
        DataChangeEvent{tableId=public.shipments2, before=[3, 3, test3, test3, false, 2026-01-07T16:44:50], after=[3, 3, test3, test3, true, 2026-01-07T16:44:50], op=UPDATE, meta=()}