全部产品
Search
文档中心

Realtime Compute for Apache Flink:PolarDB-X CDC (Pratinjau Publik)

更新时间:Feb 06, 2026

Topik ini menjelaskan cara menggunakan konektor PolarDB-X.

Informasi latar belakang

PolarDB for Xscale (PolarDB-X) adalah layanan database terdistribusi cloud-native berkinerja tinggi dari Alibaba Cloud yang menawarkan throughput tinggi, kapasitas penyimpanan besar, latensi rendah, skalabilitas mudah, dan ketersediaan tinggi.

Penting

Konektor ini mendukung Ververica Runtime (VVR) 11.5 atau versi yang lebih baru dan hanya kompatibel dengan PolarDB-X 2.0 atau versi yang lebih baru.

Konektor PolarDB-X CDC hanya dapat digunakan sebagai tabel sumber. Untuk mengkueri tabel dimensi atau menulis ke tabel sink pada instans PolarDB-X, gunakan konektor MySQL (pratinjau publik).

Kategori

Rincian

Tipe yang didukung

Source table

Mode runtime

Hanya mode streaming

Format data

Tidak berlaku

Metrik pemantauan spesifik

  • currentFetchEventTimeLag: Interval antara saat data dihasilkan dan saat data ditarik oleh Source Operator.

    Metrik ini hanya berlaku selama fase binary logging. Selama fase snapshot, nilainya selalu 0.

  • currentEmitEventTimeLag: Interval antara saat data dihasilkan dan saat data meninggalkan Source Operator.

    Metrik ini hanya berlaku selama fase binary logging. Selama fase snapshot, nilainya selalu 0.

  • sourceIdleTime: Durasi waktu tabel sumber dalam keadaan idle.

Tipe API

SQL

Mendukung pembaruan atau penghapusan pada tabel sink

Tidak

Fitur

Konektor PolarDB-X CDC mengoptimalkan kinerja selama fase parsing log biner dengan mendukung pemfilteran dan pemotongan log biner yang tidak relevan di sisi server, sehingga meningkatkan throughput dan menghemat lebar pita jaringan.

Contoh subscription log biner sesuai permintaan

Versi ini mendukung pemfilteran log biner di sisi server, hanya mengirimkan log perubahan yang diperlukan ke klien. Pendekatan ini mengurangi lalu lintas jaringan dan meningkatkan throughput konsumsi log.

Sebagai contoh, untuk berlangganan hanya pada data perubahan dari tabel db.table1 dan db.table2 pada server PolarDB-X, konfigurasikan job Flink SQL sebagai berikut:

CREATE TABLE polardbx_table_foo (
  ... -- Definisikan skema tabel di sini
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- Parameter lainnya
  'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- Berlangganan data hanya dari tabel yang sesuai
);

Berbeda dengan konektor MySQL CDC yang memuat semua log perubahan dari seluruh instans untuk pemfilteran di sisi klien, konektor PolarDB-X CDC melakukan pemfilteran log biner di sisi server. Hal ini memungkinkan klien berlangganan log biner sesuai kebutuhan dan secara signifikan mengurangi overhead I/O jaringan.

Batasan

Pemfilteran log biner di sisi server dan subscription ke tabel tertentu memerlukan PolarDB-X versi 2.5.0 atau lebih baru serta komponen Simple Log Service versi 5.4.20 atau lebih baru.

SQL

Sintaksis

CREATE TABLE polardbx_customer_table(
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database' = 'full_db',
  'collection' = 'customers'
)

DENGAN parameter

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

connector

Nama konektor.

STRING

Ya

Tidak ada

Nilainya harus polardbx-cdc.

hostname

Alamat IP atau hostname database PolarDB-X.

STRING

Ya

Tidak ada

Tentukan cluster endpoint dari instans tersebut.

port

Nomor port layanan database PolarDB-X.

INTEGER

Tidak

3306

Tidak ada.

username

Username untuk layanan database PolarDB-X.

STRING

Ya

Tidak ada

Tidak ada.

password

Password untuk layanan database PolarDB-X.

STRING

Ya

Tidak ada

Tidak ada.

database-name

Nama database PolarDB-X.

STRING

Ya

Tidak ada

Anda dapat menggunakan ekspresi reguler untuk membaca data dari beberapa database.

Catatan

Saat menggunakan ekspresi reguler, jangan gunakan simbol ^ dan $ untuk mencocokkan awal dan akhir string.

table-name

Nama tabel PolarDB-X.

STRING

Ya

Tidak ada

Anda dapat menggunakan ekspresi reguler untuk membaca data dari beberapa tabel.

Catatan

Saat menggunakan ekspresi reguler, jangan gunakan simbol ^ dan $ untuk mencocokkan awal dan akhir string.

server-time-zone

Zona waktu sesi yang digunakan oleh database.

STRING

Tidak

Zona waktu wilayah tempat job dijalankan.

Tentukan identifier zona waktu IANA, seperti Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP di tabel sumber dikonversi ke tipe STRING.

scan.incremental.snapshot.chunk.size

Ukuran (jumlah baris) setiap chunk saat membaca data dari snapshot inkremental.

INTEGER

Tidak

8096

PolarDB-X membagi tabel menjadi beberapa chunk untuk dibaca dan menyimpan data chunk tersebut di memori. Mengurangi jumlah baris per chunk akan meningkatkan jumlah total chunk. Hal ini memberikan pemulihan kesalahan yang lebih granular tetapi juga meningkatkan risiko error kehabisan memori (OOM) dan mengurangi throughput. Konfigurasikan ukuran chunk yang wajar untuk menyeimbangkan kinerja.

scan.snapshot.fetch.size

Jumlah maksimum catatan yang ditarik sekaligus saat membaca data lengkap dari tabel.

INTEGER

Tidak

1024

Tidak ada.

connect.timeout

Waktu maksimum menunggu sebelum mencoba koneksi ulang setelah koneksi ke server database PolarDB-X timeout.

DURATION

Tidak

30s

Tidak ada.

connection.pool.size

Ukuran kolam koneksi database.

INTEGER

Tidak

20

Kolam koneksi database menggunakan kembali koneksi untuk mengurangi jumlah koneksi ke database.

connect.max-retries

Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.

INTEGER

Tidak

3

Tidak ada.

scan.startup.mode

Mode startup untuk konsumsi data.

STRING

Tidak

initial

Nilai yang valid:

  • initial (default): Saat job pertama kali dijalankan, sistem memindai semua data historis lalu membaca data log biner terbaru.

  • latest-offset: Saat job pertama kali dijalankan, sistem tidak memindai data historis. Pembacaan dimulai dari akhir log biner, artinya hanya membaca perubahan yang terjadi setelah konektor dijalankan.

  • earliest-offset: Tidak memindai data historis. Pembacaan dimulai dari log biner paling awal yang tersedia.

  • specific-offset: Tidak memindai data historis. Pembacaan dimulai dari offset log biner tertentu. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos untuk memulai dari file dan posisi log biner tertentu. Anda juga dapat hanya mengonfigurasi scan.startup.specific-offset.gtid-set untuk memulai dari GTID set tertentu.

  • timestamp: Tidak memindai data historis. Pembacaan log biner dimulai dari timestamp tertentu. Timestamp ditentukan dalam milidetik melalui scan.startup.timestamp-millis.

Penting

Untuk mode startup earliest-offset, specific-offset, dan timestamp, skema tabel saat startup harus sesuai dengan skema pada offset yang ditentukan. Ketidaksesuaian skema akan menyebabkan job gagal. Pastikan skema tabel tidak berubah antara offset log biner yang ditentukan dan saat job dijalankan.

scan.startup.specific-offset.file

Nama file log biner untuk offset awal saat menggunakan mode offset spesifik.

STRING

Tidak

Tidak ada

Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. Contoh format nama file: mysql-bin.000003.

scan.startup.specific-offset.pos

Posisi dalam file log biner yang ditentukan untuk offset awal saat menggunakan mode offset spesifik.

INTEGER

Tidak

Tidak ada

Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset.

scan.startup.specific-offset.gtid-set

GTID set untuk offset awal saat menggunakan mode offset spesifik.

STRING

Tidak

Tidak ada

Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. Contoh format GTID set: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

Timestamp dalam milidetik untuk offset awal saat menggunakan mode waktu spesifik.

LONG

Tidak

Tidak ada

Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke timestamp. Timestamp dalam satuan milidetik.

scan.startup.specific-offset.skip-events

Jumlah event log biner yang dilewati saat membaca dari offset tertentu.

INTEGER

Tidak

Tidak ada

Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset.

scan.startup.specific-offset.skip-rows

Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event log biner dapat berkorespondensi dengan beberapa perubahan baris.

INTEGER

Tidak

Tidak ada

Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset.

heartbeat.interval

Interval di mana sumber menggunakan event heartbeat untuk memajukan offset log biner.

DURATION

Tidak

Tidak ada

Event heartbeat memaksa offset log biner maju di sisi sumber. Mekanisme ini mencegah log biner kedaluwarsa karena pembaruan yang jarang terjadi. Log biner yang kedaluwarsa menyebabkan job gagal dan hanya dapat dipulihkan melalui restart tanpa status.

chunk-meta.group.size

Ukuran metadata chunk.

INTEGER

Tidak

1000

Jika metadata lebih besar dari nilai ini, metadata tersebut dibagi menjadi beberapa bagian untuk transmisi.

chunk-key.even-distribution.factor.upper-bound

Batas atas faktor distribusi chunk untuk sharding merata.

DOUBLE

Tidak

1000.0

Jika faktor distribusi lebih besar dari nilai ini, sharding tidak merata digunakan.

Faktor distribusi chunk = (MAX(kunci-chunk) - MIN(kunci-chunk) + 1) / Jumlah total baris.

chunk-key.even-distribution.factor.lower-bound

Batas bawah faktor distribusi chunk untuk sharding merata.

DOUBLE

Tidak

0.05

Jika faktor distribusi kurang dari nilai ini, sharding tidak merata digunakan.

Faktor distribusi chunk = (MAX(kunci-chunk) - MIN(kunci-chunk) + 1) / Jumlah total baris.

scan.newly-added-table.enabled

Menentukan apakah akan memindai tabel yang baru ditambahkan saat job restart dari checkpoint.

BOOLEAN

Tidak

false

Jika diaktifkan, sistem menyinkronkan tabel yang baru ditambahkan yang sebelumnya tidak cocok dan menghapus tabel yang tidak lagi cocok dari state. Ini berlaku saat restart dari checkpoint atau titik simpan.

scan.incremental.snapshot.chunk.key-column

Menentukan kolom yang digunakan untuk sharding data selama fase snapshot.

STRING

Lihat Catatan

Tidak ada

  • Wajib untuk tabel tanpa primary key. Kolom yang dipilih harus bertipe NOT NULL.

  • Opsional untuk tabel dengan primary key. Anda hanya dapat memilih satu kolom dari primary key.

scan.incremental.close-idle-reader.enabled

Menentukan apakah akan mematikan reader yang idle setelah fase snapshot berakhir.

BOOLEAN

Tidak

false

Agar konfigurasi ini berlaku, Anda juga harus mengatur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

scan.incremental.snapshot.backfill.skip

Menentukan apakah akan melewati backfill selama fase pembacaan snapshot.

BOOLEAN

Tidak

false

Nilai yang valid:

  • true: Melewati backfill selama fase pembacaan snapshot.

  • false (default): Tidak melewati backfill selama fase pembacaan snapshot.

Jika backfill dilewati, perubahan pada tabel selama fase snapshot akan dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

Penting

Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.

scan.parse.online.schema.changes.enabled

Pada fase inkremental, menentukan apakah akan mencoba mengurai event DDL perubahan lockless RDS.

BOOLEAN

Tidak

false

Nilai yang valid:

  • true: Mengurai event DDL perubahan lockless RDS.

  • false (default): Tidak mengurai event DDL perubahan lockless RDS.

Ini adalah fitur eksperimen. Sebelum melakukan perubahan lockless online, buat titik simpan untuk job Flink guna memudahkan pemulihan.

scan.only.deserialize.captured.tables.changelog.enabled

Pada fase inkremental, menentukan apakah hanya akan mendeserialisasi event perubahan untuk tabel yang ditentukan.

BOOLEAN

Tidak

true

Nilai yang valid:

  • true: Hanya mendeserialisasi data perubahan untuk tabel target guna mempercepat pembacaan log biner.

  • false (default): Mendeserialisasi data perubahan untuk semua tabel.

scan.read-changelog-as-append-only.enabled

Menentukan apakah akan mengonversi aliran changelog menjadi aliran append-only.

BOOLEAN

Tidak

false

Nilai yang valid:

  • true: Semua jenis pesan, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER, dikonversi menjadi pesan INSERT. Aktifkan opsi ini hanya dalam skenario tertentu, misalnya saat Anda perlu menyimpan pesan penghapusan dari tabel leluhur.

  • false (default): Semua jenis pesan dikirim ke downstream apa adanya.

scan.parallel-deserialize-changelog.enabled

Pada fase inkremental, menentukan apakah akan menggunakan beberapa thread untuk mengurai event perubahan.

BOOLEAN

Tidak

false

Nilai yang valid:

  • true: Menggunakan pemrosesan multi-thread selama fase deserialisasi event perubahan sambil mempertahankan urutan event log biner untuk mempercepat pembacaan.

  • false (default): Menggunakan pemrosesan single-thread selama fase deserialisasi event.

scan.parallel-deserialize-changelog.handler.size

Jumlah penanganan event saat menggunakan beberapa thread untuk mengurai event perubahan.

INTEGER

Tidak

2

Tidak ada.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah akan mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

BOOLEAN

Tidak

false

Nilai yang valid:

  • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

  • false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Ini adalah fitur eksperimen. Mengaktifkannya dapat mengurangi risiko error OOM saat TaskManager menyinkronkan chunk terakhir selama fase snapshot. Tambahkan konfigurasi ini sebelum job dijalankan untuk pertama kalinya.

polardbx.binlog.ignore.archive-events.enabled

Menentukan apakah akan mengabaikan peristiwa arsip (terutama event `DELETE`) dalam log biner PolarDB-X.

BOOLEAN

Tidak

false

polardbx.binlog.ignore.query-events.enabled

Menentukan apakah akan mengabaikan event ROWS_QUERY_LOG_EVENT dalam log biner PolarDB-X.

BOOLEAN

Tidak

false

polardbx.binlog.include.tables

Berlangganan log biner hanya untuk tabel-tabel ini. Pisahkan beberapa nama tabel dengan koma (,).

STRING

Tidak

Tidak ada

polardbx.binlog.exclude.tables

Tidak berlangganan log biner untuk tabel-tabel ini. Pisahkan beberapa nama tabel dengan koma (,).

STRING

Tidak

Tidak ada

Pemetaan tipe

Tipe data PolarDB-X

Tipe data Flink

TINYINT

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

INT

INT

MEDIUMINT

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

BOOLEAN

BOOLEAN

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIME ZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIME ZONE]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)

STRING

VARCHAR(n)

TEXT

BINARY

BYTES

VARBINARY

BLOB