Topik ini menjelaskan cara menggunakan konektor PolarDB-X.
Informasi latar belakang
PolarDB for Xscale (PolarDB-X) adalah layanan database terdistribusi cloud-native berkinerja tinggi dari Alibaba Cloud yang menawarkan throughput tinggi, kapasitas penyimpanan besar, latensi rendah, skalabilitas mudah, dan ketersediaan tinggi.
Konektor ini mendukung Ververica Runtime (VVR) 11.5 atau versi yang lebih baru dan hanya kompatibel dengan PolarDB-X 2.0 atau versi yang lebih baru.
Konektor PolarDB-X CDC hanya dapat digunakan sebagai tabel sumber. Untuk mengkueri tabel dimensi atau menulis ke tabel sink pada instans PolarDB-X, gunakan konektor MySQL (pratinjau publik).
Kategori | Rincian |
Tipe yang didukung | Source table |
Mode runtime | Hanya mode streaming |
Format data | Tidak berlaku |
Metrik pemantauan spesifik |
|
Tipe API | SQL |
Mendukung pembaruan atau penghapusan pada tabel sink | Tidak |
Fitur
Konektor PolarDB-X CDC mengoptimalkan kinerja selama fase parsing log biner dengan mendukung pemfilteran dan pemotongan log biner yang tidak relevan di sisi server, sehingga meningkatkan throughput dan menghemat lebar pita jaringan.
Contoh subscription log biner sesuai permintaan
Versi ini mendukung pemfilteran log biner di sisi server, hanya mengirimkan log perubahan yang diperlukan ke klien. Pendekatan ini mengurangi lalu lintas jaringan dan meningkatkan throughput konsumsi log.
Sebagai contoh, untuk berlangganan hanya pada data perubahan dari tabel db.table1 dan db.table2 pada server PolarDB-X, konfigurasikan job Flink SQL sebagai berikut:
CREATE TABLE polardbx_table_foo (
... -- Definisikan skema tabel di sini
) WITH (
'connector' = 'polardbx-cdc',
'database-name' = 'db',
'table-name' = '.*',
..., -- Parameter lainnya
'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- Berlangganan data hanya dari tabel yang sesuai
);Berbeda dengan konektor MySQL CDC yang memuat semua log perubahan dari seluruh instans untuk pemfilteran di sisi klien, konektor PolarDB-X CDC melakukan pemfilteran log biner di sisi server. Hal ini memungkinkan klien berlangganan log biner sesuai kebutuhan dan secara signifikan mengurangi overhead I/O jaringan.
Batasan
Pemfilteran log biner di sisi server dan subscription ke tabel tertentu memerlukan PolarDB-X versi 2.5.0 atau lebih baru serta komponen Simple Log Service versi 5.4.20 atau lebih baru.
SQL
Sintaksis
CREATE TABLE polardbx_customer_table(
`id` STRING,
[columnName dataType,]*
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'polardbx-cdc',
'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
'username' = 'pdx_user',
'password' = 'pdx_password',
'database' = 'full_db',
'collection' = 'customers'
)DENGAN parameter
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
connector | Nama konektor. | STRING | Ya | Tidak ada | Nilainya harus polardbx-cdc. |
hostname | Alamat IP atau hostname database PolarDB-X. | STRING | Ya | Tidak ada | Tentukan cluster endpoint dari instans tersebut. |
port | Nomor port layanan database PolarDB-X. | INTEGER | Tidak | 3306 | Tidak ada. |
username | Username untuk layanan database PolarDB-X. | STRING | Ya | Tidak ada | Tidak ada. |
password | Password untuk layanan database PolarDB-X. | STRING | Ya | Tidak ada | Tidak ada. |
database-name | Nama database PolarDB-X. | STRING | Ya | Tidak ada | Anda dapat menggunakan ekspresi reguler untuk membaca data dari beberapa database. Catatan Saat menggunakan ekspresi reguler, jangan gunakan simbol ^ dan $ untuk mencocokkan awal dan akhir string. |
table-name | Nama tabel PolarDB-X. | STRING | Ya | Tidak ada | Anda dapat menggunakan ekspresi reguler untuk membaca data dari beberapa tabel. Catatan Saat menggunakan ekspresi reguler, jangan gunakan simbol ^ dan $ untuk mencocokkan awal dan akhir string. |
server-time-zone | Zona waktu sesi yang digunakan oleh database. | STRING | Tidak | Zona waktu wilayah tempat job dijalankan. | Tentukan identifier zona waktu IANA, seperti Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP di tabel sumber dikonversi ke tipe STRING. |
scan.incremental.snapshot.chunk.size | Ukuran (jumlah baris) setiap chunk saat membaca data dari snapshot inkremental. | INTEGER | Tidak | 8096 | PolarDB-X membagi tabel menjadi beberapa chunk untuk dibaca dan menyimpan data chunk tersebut di memori. Mengurangi jumlah baris per chunk akan meningkatkan jumlah total chunk. Hal ini memberikan pemulihan kesalahan yang lebih granular tetapi juga meningkatkan risiko error kehabisan memori (OOM) dan mengurangi throughput. Konfigurasikan ukuran chunk yang wajar untuk menyeimbangkan kinerja. |
scan.snapshot.fetch.size | Jumlah maksimum catatan yang ditarik sekaligus saat membaca data lengkap dari tabel. | INTEGER | Tidak | 1024 | Tidak ada. |
connect.timeout | Waktu maksimum menunggu sebelum mencoba koneksi ulang setelah koneksi ke server database PolarDB-X timeout. | DURATION | Tidak | 30s | Tidak ada. |
connection.pool.size | Ukuran kolam koneksi database. | INTEGER | Tidak | 20 | Kolam koneksi database menggunakan kembali koneksi untuk mengurangi jumlah koneksi ke database. |
connect.max-retries | Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal. | INTEGER | Tidak | 3 | Tidak ada. |
scan.startup.mode | Mode startup untuk konsumsi data. | STRING | Tidak | initial | Nilai yang valid:
Penting Untuk mode startup earliest-offset, specific-offset, dan timestamp, skema tabel saat startup harus sesuai dengan skema pada offset yang ditentukan. Ketidaksesuaian skema akan menyebabkan job gagal. Pastikan skema tabel tidak berubah antara offset log biner yang ditentukan dan saat job dijalankan. |
scan.startup.specific-offset.file | Nama file log biner untuk offset awal saat menggunakan mode offset spesifik. | STRING | Tidak | Tidak ada | Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. Contoh format nama file: |
scan.startup.specific-offset.pos | Posisi dalam file log biner yang ditentukan untuk offset awal saat menggunakan mode offset spesifik. | INTEGER | Tidak | Tidak ada | Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. |
scan.startup.specific-offset.gtid-set | GTID set untuk offset awal saat menggunakan mode offset spesifik. | STRING | Tidak | Tidak ada | Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. Contoh format GTID set: |
scan.startup.timestamp-millis | Timestamp dalam milidetik untuk offset awal saat menggunakan mode waktu spesifik. | LONG | Tidak | Tidak ada | Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke timestamp. Timestamp dalam satuan milidetik. |
scan.startup.specific-offset.skip-events | Jumlah event log biner yang dilewati saat membaca dari offset tertentu. | INTEGER | Tidak | Tidak ada | Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. |
scan.startup.specific-offset.skip-rows | Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event log biner dapat berkorespondensi dengan beberapa perubahan baris. | INTEGER | Tidak | Tidak ada | Saat menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. |
heartbeat.interval | Interval di mana sumber menggunakan event heartbeat untuk memajukan offset log biner. | DURATION | Tidak | Tidak ada | Event heartbeat memaksa offset log biner maju di sisi sumber. Mekanisme ini mencegah log biner kedaluwarsa karena pembaruan yang jarang terjadi. Log biner yang kedaluwarsa menyebabkan job gagal dan hanya dapat dipulihkan melalui restart tanpa status. |
chunk-meta.group.size | Ukuran metadata chunk. | INTEGER | Tidak | 1000 | Jika metadata lebih besar dari nilai ini, metadata tersebut dibagi menjadi beberapa bagian untuk transmisi. |
chunk-key.even-distribution.factor.upper-bound | Batas atas faktor distribusi chunk untuk sharding merata. | DOUBLE | Tidak | 1000.0 | Jika faktor distribusi lebih besar dari nilai ini, sharding tidak merata digunakan. Faktor distribusi chunk = (MAX(kunci-chunk) - MIN(kunci-chunk) + 1) / Jumlah total baris. |
chunk-key.even-distribution.factor.lower-bound | Batas bawah faktor distribusi chunk untuk sharding merata. | DOUBLE | Tidak | 0.05 | Jika faktor distribusi kurang dari nilai ini, sharding tidak merata digunakan. Faktor distribusi chunk = (MAX(kunci-chunk) - MIN(kunci-chunk) + 1) / Jumlah total baris. |
scan.newly-added-table.enabled | Menentukan apakah akan memindai tabel yang baru ditambahkan saat job restart dari checkpoint. | BOOLEAN | Tidak | false | Jika diaktifkan, sistem menyinkronkan tabel yang baru ditambahkan yang sebelumnya tidak cocok dan menghapus tabel yang tidak lagi cocok dari state. Ini berlaku saat restart dari checkpoint atau titik simpan. |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom yang digunakan untuk sharding data selama fase snapshot. | STRING | Lihat Catatan | Tidak ada |
|
scan.incremental.close-idle-reader.enabled | Menentukan apakah akan mematikan reader yang idle setelah fase snapshot berakhir. | BOOLEAN | Tidak | false | Agar konfigurasi ini berlaku, Anda juga harus mengatur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true. |
scan.incremental.snapshot.backfill.skip | Menentukan apakah akan melewati backfill selama fase pembacaan snapshot. | BOOLEAN | Tidak | false | Nilai yang valid:
Jika backfill dilewati, perubahan pada tabel selama fase snapshot akan dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot. Penting Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin. |
scan.parse.online.schema.changes.enabled | Pada fase inkremental, menentukan apakah akan mencoba mengurai event DDL perubahan lockless RDS. | BOOLEAN | Tidak | false | Nilai yang valid:
Ini adalah fitur eksperimen. Sebelum melakukan perubahan lockless online, buat titik simpan untuk job Flink guna memudahkan pemulihan. |
scan.only.deserialize.captured.tables.changelog.enabled | Pada fase inkremental, menentukan apakah hanya akan mendeserialisasi event perubahan untuk tabel yang ditentukan. | BOOLEAN | Tidak | true | Nilai yang valid:
|
scan.read-changelog-as-append-only.enabled | Menentukan apakah akan mengonversi aliran changelog menjadi aliran append-only. | BOOLEAN | Tidak | false | Nilai yang valid:
|
scan.parallel-deserialize-changelog.enabled | Pada fase inkremental, menentukan apakah akan menggunakan beberapa thread untuk mengurai event perubahan. | BOOLEAN | Tidak | false | Nilai yang valid:
|
scan.parallel-deserialize-changelog.handler.size | Jumlah penanganan event saat menggunakan beberapa thread untuk mengurai event perubahan. | INTEGER | Tidak | 2 | Tidak ada. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah akan mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot. | BOOLEAN | Tidak | false | Nilai yang valid:
Ini adalah fitur eksperimen. Mengaktifkannya dapat mengurangi risiko error OOM saat TaskManager menyinkronkan chunk terakhir selama fase snapshot. Tambahkan konfigurasi ini sebelum job dijalankan untuk pertama kalinya. |
polardbx.binlog.ignore.archive-events.enabled | Menentukan apakah akan mengabaikan peristiwa arsip (terutama event `DELETE`) dalam log biner PolarDB-X. | BOOLEAN | Tidak | false | |
polardbx.binlog.ignore.query-events.enabled | Menentukan apakah akan mengabaikan event ROWS_QUERY_LOG_EVENT dalam log biner PolarDB-X. | BOOLEAN | Tidak | false | |
polardbx.binlog.include.tables | Berlangganan log biner hanya untuk tabel-tabel ini. Pisahkan beberapa nama tabel dengan koma (,). | STRING | Tidak | Tidak ada | |
polardbx.binlog.exclude.tables | Tidak berlangganan log biner untuk tabel-tabel ini. Pisahkan beberapa nama tabel dengan koma (,). | STRING | Tidak | Tidak ada |
Pemetaan tipe
Tipe data PolarDB-X | Tipe data Flink |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] WITH LOCAL TIME ZONE | |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB |