Konektor Flink CDC yang kompatibel dengan (PolarDBO Flink CDC) membaca snapshot lengkap dan data perubahan dari database . Untuk fitur dan penggunaan spesifik, lihat dokumentasi komunitas Postgres CDC.
Karena hanya berbeda dari PostgreSQL komunitas dalam beberapa tipe data dan penanganan objek bawaan, topik ini menjelaskan cara mengadaptasi dan mengemas konektor PolarDBO Flink CDC yang mendukung berdasarkan Postgres CDC komunitas dengan perubahan kode minimal.
Tipe DATE di berukuran 64-bit, sedangkan tipe DATE di PostgreSQL komunitas berukuran 32-bit. Oleh karena itu, PolarDBO Flink CDC menyesuaikan penanganan data bertipe DATE.
Paketkan konektor PolarDBO Flink CDC
Konektor PolarDBO Flink CDC dikembangkan berdasarkan Postgres CDC komunitas. Alibaba Cloud tidak memberikan jaminan Service-Level Agreement (SLA) untuk konektor ini, baik Anda mengemasnya sendiri maupun menggunakan paket JAR yang disediakan dalam topik ini.
Prasyarat
Tentukan versi Flink-CDC.
Jika Anda menggunakan Realtime Compute for Apache Flink Alibaba Cloud, tentukan versi Flink-CDC komunitas yang kompatibel dengan versi Ververica Runtime (VVR) yang sesuai. Untuk detailnya, lihat Pemetaan versi CDC dan VVR.
CatatanUntuk repositori kode Flink-CDC, lihat Flink-CDC.
Tentukan versi Debezium.
Dalam file
pom.xmldari versi Flink-CDC yang sesuai, cari kata kuncidebezium.versionuntuk menentukan versi Debezium.CatatanUntuk repositori kode Debezium, lihat Debezium.
Tentukan versi PgJDBC.
Dalam file
pom.xmldari versi Postgres-CDC yang sesuai, cari kata kunciorg.postgresqluntuk menentukan versi PgJDBC.CatatanUntuk versi sebelum release-3.0, jalur file adalah:
flink-connector-postgres-cdc/pom.xml.Untuk versi release-3.0 dan yang lebih baru, jalur file adalah:
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml.Untuk repositori kode PgJDBC, lihat PgJDBC.
Prosedur
Paket rilis-3.5
Rilis Community Flink-CDC-3.5 kompatibel dengan vvr-11.4-jdk11-flink-1.20 dari Alibaba Cloud Realtime Compute for Apache Flink.
Untuk mengemas versi konektor PolarDB Flink CDC yang sesuai, ikuti langkah-langkah berikut:
Klon kode sumber Flink-CDC, Debezium, dan PgJDBC pada versi yang ditentukan.
git clone -b release-3.5 --depth=1 https://github.com/apache/flink-cdc.git git clone -b REL42.7.3 --depth=1 https://github.com/pgjdbc/pgjdbc.git git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.gitSalin file-file tertentu dari Debezium dan PgJDBC ke dalam Flink-CDC.
mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/Oid.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresqlMasuk ke direktori Flink-CDC. Terapkan perbaikan bug konversi timestamp dan perbaikan bug perilaku implisit (SELECT *). Perbaikan ini akan digabungkan ke rilis komunitas 3.6.
cd flink-cdc # Terapkan perbaikan bug konversi timestamp. Akan digabungkan ke rilis komunitas 3.6. git fetch origin 2f32836a783f80f295c9dce339c11afec2a32dc2 git cherry-pick 2f32836a783f80f295c9dce339c11afec2a32dc2 git fetch origin 0d86de24494a855c2d83f9b1052c2e888e182cb1 git cherry-pick 0d86de24494a855c2d83f9b1052c2e888e182cb1Terapkan file patch yang menambahkan dukungan untuk .
git apply release-3.5_support_polardbo.patchCatatanFile patch yang digunakan di atas adalah: release-3.5_support_polardbo.patch.
Gunakan Maven untuk mengemas konektor PolarDB untuk PostgreSQL Flink CDC.
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip # Setelah pengemasan selesai, temukan file JAR di flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/target.
Mengikuti prosedur di atas, bangun paket JAR untuk konektor PolarDBO Flink CDC menggunakan JDK 11: flink-cdc-pipeline-connector-polardbo-3.5-SNAPSHOT-20260212.jar.
Paket rilis-3.1
Flink-CDC komunitas release-3.1 kompatibel dengan vvr-8.0.x-flink-1.17 dari Realtime Compute for Apache Flink Alibaba Cloud.
Untuk mengemas versi konektor PolarDB Flink CDC yang sesuai, ikuti langkah-langkah berikut:
Klon file kode Flink-CDC, Debezium, dan PgJDBC.
git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.gitSalin file-file tertentu dari Debezium dan PgJDBC ke Flink-CDC.
mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresqlTerapkan file patch untuk mendukung .
git apply release-3.1_support_polardbo.patchCatatanFile patch untuk kompatibilitas PolarDBO Flink CDC adalah release-3.1_support_polardbo.patch.
Kemas konektor PolarDBO Flink CDC menggunakan Maven.
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true # Setelah pengemasan selesai, temukan paket JAR di folder target flink-sql-connector-postgres-cdc
Ikuti proses yang dijelaskan di atas untuk mengemas file JAR konektor PolarDB Flink CDC menggunakan JDK 8: flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar.
Paket rilis-2.3
Flink-CDC komunitas release-2.3 kompatibel dengan vvr-4.0.15-flink-1.13 hingga vvr-6.0.2-flink-1.15 dari Realtime Compute for Apache Flink Alibaba Cloud.
Untuk mengemas versi konektor PolarDB-O Flink CDC yang sesuai, ikuti langkah-langkah berikut:
Klon kode sumber Flink-CDC, Debezium, dan PgJDBC pada versi yang sesuai.
git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.gitSalin file-file tertentu dari Debezium dan PgJDBC ke dalam Flink-CDC.
mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresqlTerapkan file patch untuk kompatibilitas dengan .
git apply release-2.3_support_polardbo.patchCatatanFile patch yang digunakan di atas adalah: release-2.3_support_polardbo.patch.
Kemas konektor PolarDB untuk PostgreSQL Flink CDC menggunakan Maven.
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true # Setelah pengemasan selesai, temukan file JAR di folder target flink-sql-connector-postgres-cdc
Ikuti langkah-langkah di atas untuk mengemas JAR konektor PolarDB for PostgreSQL Flink CDC menggunakan JDK 8: flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar.
Petunjuk penggunaan
Konektor PolarDBO Flink CDC membaca data aliran perubahan menggunakan replikasi logis dari database . Konektor ini memerlukan kondisi berikut:
Atur parameter
wal_levelkelogical. Ini menambahkan informasi yang diperlukan untuk mendukung replikasi logis dalam write-ahead logging (WAL).CatatanAnda dapat mengatur parameter wal_level di Konsol. Untuk operasi detail, lihat Setel parameter kluster. Kluster akan dimulai ulang setelah memodifikasi parameter ini. Rencanakan operasi bisnis Anda sebelum mengubah parameter, dan lakukan dengan hati-hati.
Jalankan perintah
ALTER TABLE schema.table REPLICA IDENTITY FULL;untuk mengaturREPLICA IDENTITYtabel yang dilangganan keFULL. Ini memastikan konsistensi sinkronisasi data untuk tabel tersebut, karena event insert dan update yang dipancarkan mencakup nilai lama dari semua kolom dalam tabel.CatatanREPLICA IDENTITY adalah pengaturan unik tingkat tabel di PostgreSQL. Pengaturan ini menentukan apakah plugin decoding logis menyertakan nilai lama kolom tabel yang terpengaruh selama event INSERT dan UPDATE. Untuk detail makna nilai-nilainya, lihat REPLICA IDENTITY.
Mengatur
REPLICA IDENTITYtabel yang dilangganan keFULLmungkin mengunci tabel, yang dapat memengaruhi operasi bisnis. Rencanakan operasi bisnis Anda sebelum mengubah parameter. Periksa apakah konfigurasi saat ini sudahFULLmenggunakan perintah berikut:SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
Pastikan nilai kedua parameter `max_wal_senders` dan `max_replication_slots` lebih besar dari jumlah slot replikasi database yang sedang digunakan ditambah jumlah slot yang dibutuhkan oleh pekerjaan Flink.
Gunakan akun istimewa atau akun dengan izin LOGIN dan REPLICATION, serta izin SELECT pada tabel yang dilangganan untuk kueri data lengkap.
Hanya terhubung ke titik akhir utama kluster PolarDB. Titik akhir kluster tidak mendukung replikasi logis.
Versi release-3.5 dan yang lebih baru mendukung sinkronisasi langsung tabel induk untuk tabel partisi. Konfigurasikan sebagai berikut. Untuk operasi spesifik, lihat dokumentasi komunitas Postgres CDC.
Atur
scan.include-partitioned-tables.enabledketrue.Buat secara manual
PUBLICATIONdi database dengan opsipublish_via_partition_root=true. Selain itu, tentukantable-namemenggunakan parameterdebezium.publication.name.table-namehanya dapat menentukan tabel induk. Ekspresi reguler tidak boleh mencocokkan tabel anak; jika tidak, data lengkap akan diduplikasi.
Selain itu, versi release-3.5 dan yang lebih baru mendukung konektor Pipeline, yang memungkinkan pembacaan data snapshot dan data inkremental, menyediakan kemampuan sinkronisasi data end-to-end untuk seluruh database. Namun, perhatikan bahwa konektor Pipeline saat ini tidak mendukung sinkronisasi perubahan skema tabel. Untuk detailnya, lihat dokumentasi komunitas konektor Pipeline Postgres CDC.
Konektor PolarDBO Flink CDC vs. Postgres CDC
Konektor PolarDBO Flink CDC dikemas berdasarkan Postgres CDC. Untuk sintaks dan parameter spesifik, lihat Postgres CDC. Namun, terdapat perbedaan utama berikut:
Parameter konektor dalam WITH harus diatur ke bidang statis:
polardbo-cdc.PolarDBO Flink CDC kompatibel dengan semua versi PolarDB untuk PostgreSQL, 1.0, dan 2.0.
CatatanJika Anda menggunakan PolarDB untuk PostgreSQL, kami merekomendasikan untuk langsung menggunakan Postgres CDC komunitas.
Untuk kolom bertipe
DATEdi 1.0 dan 2.0, tipe yang sesuai untuk tabel sumber dan sink di Flink SQL harus ditentukan sebagaiTIMESTAMP.Atur parameter
decoding.plugin.namekepgoutput. Jika tidak, penguraian inkremental mungkin menghasilkan karakter acak untuk database yang tidak menggunakan encoding UTF-8. Untuk pengenalan detail, lihat dokumentasi komunitas.
Pemetaan tipe
Pemetaan tipe bidang antara PolarDB PostgreSQL dan Flink identik dengan versi komunitas PostgreSQL, kecuali untuk tipe DATE. Pemetaan spesifiknya adalah sebagai berikut:
PolarDB untuk PostgreSQL tipe bidang | Tipe bidang Flink |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE |
|
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
Contoh penggunaan
Konektor sumber
Contoh berikut mengilustrasikan cara menyinkronkan tabel `shipments` dari database `flink_source` di kluster 2.0 ke tabel `shipments_sink` di database `flink_sink` menggunakan PolarDBO Flink CDC.
Contoh ini hanya memverifikasi bahwa PolarDBO Flink CDC yang telah dikemas dapat berjalan di . Untuk penggunaan produksi, konfigurasikan parameter sesuai kebutuhan dengan merujuk pada dokumentasi komunitas Postgres CDC.
Prasyarat
Persiapan
Beli kluster 2.0 di halaman pembelian kluster PolarDB.
Lihat titik akhir utama kluster. Jika kluster PolarDB dan Realtime Compute for Apache Flink berada dalam virtual private cloud (VPC) yang sama, Anda dapat langsung menggunakan titik akhir pribadi. Jika tidak, mintalah titik akhir publik.
Atur daftar putih kluster: Tambahkan alamat instans Flink ke daftar putih kluster PolarDB.
Buat database sumber `flink_source` dan database target `flink_sink` di Konsol. Untuk langkah-langkah detail, lihat Buat database.
Jalankan pernyataan berikut untuk membuat tabel `shipments` di database sumber `flink_source` dan masukkan data.
CREATE TABLE public.shipments ( shipment_id INT, order_id INT, origin TEXT, destination TEXT, is_arrived BOOLEAN, order_time DATE, PRIMARY KEY (shipment_id) ); ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();Jalankan pernyataan berikut untuk membuat tabel `shipments_sink` di database target `flink_sink`.
CREATE TABLE public.shipments_sink ( shipment_id INT, order_id INT, origin TEXT, destination TEXT, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) );
Realtime Compute for Apache Flink Persiapan
Masuk ke Konsol Realtime Compute dan beli instans Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Beli Realtime Compute for Apache Flink.
CatatanKami merekomendasikan agar Region dan Virtual Private Cloud (VPC) Realtime Compute for Apache Flink konsisten dengan kluster PolarDB. Anda dapat langsung menggunakan titik akhir pribadi dari titik akhir utama kluster PolarDB sebagai alamat koneksi.
Buat konektor kustom dan unggah PolarDBO Flink CDC yang telah dikemas. Atur Formats ke debezium-json. Untuk langkah-langkah detail, lihat Buat konektor kustom.

-
Buat pekerjaan Flink
-
Masuk ke Konsol Realtime Compute. Buat draf pekerjaan SQL baru. Untuk panduan, lihat peta pengembangan pekerjaan. Gunakan pernyataan Flink SQL berikut. Ganti titik akhir utama kluster PolarDB, port, username, dan password dengan nilai Anda.
CatatanTipe DATE di berukuran 64-bit. Sebaliknya, tipe DATE di Flink SQL dan sebagian besar database berukuran 32-bit. Oleh karena itu, kolom bertipe DATE di tabel sumber harus didefinisikan sebagai TIMESTAMP di kedua tabel sumber dan sink dalam Flink SQL. Jika tidak, pekerjaan akan gagal dengan error ketidakcocokan tipe, seperti:
“java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471): 1720891573000”.CREATE TEMPORARY TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'polardbo-cdc', 'hostname' = '<yourHostname>', 'port' = '<yourPort>', 'username' = '<yourUserName>', 'password' = '<yourPassWord>', 'database-name' = 'flink_source', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' ); CREATE TEMPORARY TABLE shipments_sink ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink', 'table-name' = 'shipments_sink', 'username' = '<yourUserName>', 'password' = '<yourPassWord>' ); INSERT INTO shipments_sink SELECT * FROM shipments; -
Deploy dan mulai pekerjaan.


-
Uji dan validasi.
-
Setelah pekerjaan berhasil dideploy dan statusnya berubah menjadi running, data dari tabel shipments disinkronkan ke tabel shipments_sink di database tujuan flink_sink.
SELECT * FROM public.shipments_sink;Hasilnya adalah sebagai berikut:
shipment_id | order_id | origin | destination | is_arrived | order_time -------------+----------+--------+-------------+------------+--------------------- 1 | 1 | test1 | test1 | f | 2024-09-18 05:45:08 (1 row) -
Jalankan pernyataan DML pada tabel shipments di database sumber flink_source. Insert dan update baru disinkronkan secara real time.
INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now(); UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1; DELETE FROM public.shipments WHERE shipment_id = 2; INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now(); UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;Data di tabel shipments kini disinkronkan ke tabel shipments_sink di database tujuan flink_sink.
SELECT * FROM public.shipments_sink;Hasilnya adalah sebagai berikut:
shipment_id | order_id | origin | destination | is_arrived | order_time -------------+----------+--------+-------------+------------+--------------------- 1 | 1 | test1 | test1 | t | 2024-09-18 05:45:08 3 | 3 | test3 | test3 | t | 2024-09-18 07:33:23 (2 rows)
-
-
Pipeline connector
Contoh berikut mengilustrasikan cara menyinkronkan tabel `shipments1` dan `shipments2` dari database `flink_source` di kluster 2.0 menggunakan konektor Pipeline PolarDBO Flink CDC. Saat debugging, gunakan konektor Print. Di lingkungan produksi, pilih konektor yang sesuai sesuai kebutuhan.
Contoh ini hanya memverifikasi bahwa PolarDBO Flink CDC yang telah dikemas dapat berjalan di . Untuk penggunaan produksi, konfigurasikan parameter terkait untuk memenuhi kebutuhan bisnis aktual dengan merujuk pada dokumentasi komunitas konektor Pipeline Postgres CDC.
Prasyarat
Persiapan
Beli kluster 2.0 di halaman pembelian kluster PolarDB.
Lihat titik akhir utama kluster. Jika kluster PolarDB dan Realtime Compute for Apache Flink berada dalam virtual private cloud (VPC) yang sama, Anda dapat langsung menggunakan titik akhir pribadi. Jika tidak, mintalah titik akhir publik.
Atur daftar putih kluster: Tambahkan alamat instans Flink ke daftar putih kluster PolarDB.
Buat database sumber `flink_source` di Konsol. Untuk langkah-langkah detail, lihat Buat database.
Jalankan pernyataan berikut untuk membuat tabel `shipments1` dan `shipments2` di database sumber `flink_source` dan masukkan data.
CREATE TABLE public.shipments1 ( shipment_id INT, order_id INT, origin TEXT, destination TEXT, is_arrived BOOLEAN, order_time DATE, PRIMARY KEY (shipment_id) ); ALTER TABLE public.shipments1 REPLICA IDENTITY FULL; INSERT INTO public.shipments1 SELECT 1, 1, 'test1', 'test1', false, now(); CREATE TABLE public.shipments2 ( shipment_id INT, order_id INT, origin TEXT, destination TEXT, is_arrived BOOLEAN, order_time DATE, PRIMARY KEY (shipment_id) ); ALTER TABLE public.shipments2 REPLICA IDENTITY FULL; INSERT INTO public.shipments2 SELECT 1, 1, 'test1', 'test1', false, now();
Realtime Compute for Apache Flink persiapan
Masuk ke Konsol Realtime Compute dan beli instans Realtime Compute for Apache Flink. Untuk operasi detail, lihat Aktifkan Realtime Compute for Apache Flink.
CatatanKami merekomendasikan agar region dan virtual private cloud (VPC) Realtime Compute for Apache Flink sesuai dengan kluster PolarDB. Alamat koneksi dapat langsung menggunakan titik akhir pribadi dari titik akhir utama kluster PolarDB.
Buat pekerjaan Flink.
Masuk ke Konsol Realtime Compute dan buat draf ingesti data baru. Lihat Panduan Cepat Pekerjaan Ingesti Data Flink CDC. Gunakan konfigurasi ingesti data berikut, ubah titik akhir utama kluster PolarDB, port, akun, dan password.
source: type: polardbo name: PolarDB Oracle Source hostname: '<yourHostname>' port: '<yourPort>' username: '<yourUserName>' password: '<yourPassWord>' tables: flink_source.public.shipments[12] decoding.plugin.name: pgoutput slot.name: pgtest sink: type: values name: values Sink print.enabled: trueTambahkan konektor Pipeline yang telah berhasil dikemas di bagian More Configurations di sebelah kiri.

Deploy dan mulai pekerjaan.
Klik Deploy di pojok kanan atas.

Buka halaman O&M pekerjaan dan klik Start.

Uji dan verifikasi.
Setelah pekerjaan berhasil dideploy dan berjalan, Anda dapat melihat `CreateTableEvent` dan `DataChangeEvent` untuk fase data lengkap di log .

CreateTableEvent{tableId=public.shipments2, schema=columns={`shipment_id` INT NOT NULL,`order_id` INT,`origin` STRING,`destination` STRING,`is_arrived` BOOLEAN,`order_time` TIMESTAMP(6)}, primaryKeys=shipment_id, options=()} CreateTableEvent{tableId=public.shipments1, schema=columns={`shipment_id` INT NOT NULL,`order_id` INT,`origin` STRING,`destination` STRING,`is_arrived` BOOLEAN,`order_time` TIMESTAMP(6)}, primaryKeys=shipment_id, options=()} DataChangeEvent{tableId=public.shipments2, before=[], after=[1, 1, test1, test1, false, 2026-01-07T16:30:44], op=INSERT, meta=()} DataChangeEvent{tableId=public.shipments1, before=[], after=[1, 1, test1, test1, false, 2026-01-07T16:30:44], op=INSERT, meta=()}Jalankan DML pada tabel `shipments1` dan `shipments2` di database sumber `flink_source`. Penambahan dan modifikasi baru juga akan disinkronkan secara real time.
INSERT INTO public.shipments1 SELECT 2, 2, 'test2', 'test2', false, now(); UPDATE public.shipments1 SET is_arrived = true WHERE shipment_id = 1; DELETE FROM public.shipments1 WHERE shipment_id = 2; INSERT INTO public.shipments1 SELECT 3, 3, 'test3', 'test3', false, now(); UPDATE public.shipments1 SET is_arrived = true WHERE shipment_id = 3; INSERT INTO public.shipments2 SELECT 2, 2, 'test2', 'test2', false, now(); UPDATE public.shipments2 SET is_arrived = true WHERE shipment_id = 1; DELETE FROM public.shipments2 WHERE shipment_id = 2; INSERT INTO public.shipments2 SELECT 3, 3, 'test3', 'test3', false, now(); UPDATE public.shipments2 SET is_arrived = true WHERE shipment_id = 3;Anda dapat melihat `DataChangeEvent` untuk fase data inkremental di log :
DataChangeEvent{tableId=public.shipments1, before=[], after=[2, 2, test2, test2, false, 2026-01-07T16:44:50], op=INSERT, meta=()} DataChangeEvent{tableId=public.shipments1, before=[1, 1, test1, test1, false, 2026-01-07T16:30:44], after=[1, 1, test1, test1, true, 2026-01-07T16:30:44], op=UPDATE, meta=()} DataChangeEvent{tableId=public.shipments1, before=[2, 2, test2, test2, false, 2026-01-07T16:44:50], after=[], op=DELETE, meta=()} DataChangeEvent{tableId=public.shipments1, before=[], after=[3, 3, test3, test3, false, 2026-01-07T16:44:50], op=INSERT, meta=()} DataChangeEvent{tableId=public.shipments1, before=[3, 3, test3, test3, false, 2026-01-07T16:44:50], after=[3, 3, test3, test3, true, 2026-01-07T16:44:50], op=UPDATE, meta=()} DataChangeEvent{tableId=public.shipments2, before=[], after=[2, 2, test2, test2, false, 2026-01-07T16:44:50], op=INSERT, meta=()} DataChangeEvent{tableId=public.shipments2, before=[1, 1, test1, test1, false, 2026-01-07T16:30:44], after=[1, 1, test1, test1, true, 2026-01-07T16:30:44], op=UPDATE, meta=()} DataChangeEvent{tableId=public.shipments2, before=[2, 2, test2, test2, false, 2026-01-07T16:44:50], after=[], op=DELETE, meta=()} DataChangeEvent{tableId=public.shipments2, before=[], after=[3, 3, test3, test3, false, 2026-01-07T16:44:50], op=INSERT, meta=()} DataChangeEvent{tableId=public.shipments2, before=[3, 3, test3, test3, false, 2026-01-07T16:44:50], after=[3, 3, test3, test3, true, 2026-01-07T16:44:50], op=UPDATE, meta=()}