Topik ini menjelaskan cara menggunakan konektor MySQL.
Informasi latar belakang
Konektor MySQL mendukung semua database yang kompatibel dengan protokol MySQL, termasuk ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (mode MySQL), dan database MySQL yang dikelola sendiri.
Saat menggunakan konektor MySQL untuk membaca data dari OceanBase, pastikan binary logging diaktifkan dan dikonfigurasi dengan benar di OceanBase. Untuk informasi selengkapnya, lihat Operasi binary logging. Fitur ini sedang dalam pratinjau publik. Evaluasi kesesuaian dan gunakan dengan hati-hati sebelum penerapan.
Konektor MySQL mendukung fitur-fitur berikut.
Kategori | Detail |
Jenis yang didukung | Tabel sumber, tabel dimensi, dan tabel sink. Juga mendukung ingesti data dari sumber data. |
Mode eksekusi | Hanya mode streaming |
Format data | Tidak berlaku |
Metrik pemantauan kustom | |
Jenis API | DataStream, SQL, dan YAML ingesti data |
Dukungan untuk memperbarui atau menghapus data tabel sink | Ya |
Fitur utama
Tabel sumber CDC MySQL pertama-tama membaca seluruh data historis dari database, lalu beralih secara mulus ke pembacaan log biner. Pendekatan ini memastikan tidak ada data yang terlewat atau duplikat. Bahkan jika terjadi kegagalan, pemrosesan data menjamin semantik tepat-sekali. Tabel sumber CDC MySQL mendukung pembacaan konkuren data lengkap dan menggunakan algoritma snapshot inkremental untuk mencapai operasi tanpa penguncian serta transfer yang dapat dilanjutkan. Untuk informasi selengkapnya, lihat Tentang tabel sumber CDC MySQL.
Pemrosesan streaming dan batch terpadu: membaca data lengkap dan inkremental tanpa memelihara dua alur kerja terpisah.
Pembacaan konkuren data lengkap untuk skalabilitas horizontal.
Transisi mulus dari pembacaan data lengkap ke pembacaan inkremental, dengan skala-masuk otomatis untuk menghemat sumber daya komputasi.
Transfer yang dapat dilanjutkan selama pembacaan data lengkap guna meningkatkan stabilitas.
Pembacaan data lengkap tanpa penguncian untuk menghindari dampak pada bisnis online.
Dukungan pembacaan log cadangan dari ApsaraDB RDS for MySQL.
Penguraian paralel file log biner untuk latensi yang lebih rendah.
Prasyarat
Sebelum menggunakan tabel sumber CDC MySQL, selesaikan langkah-langkah dalam Konfigurasi MySQL. Langkah-langkah tersebut memastikan lingkungan Anda memenuhi prasyarat untuk menggunakan tabel sumber CDC MySQL.
ApsaraDB RDS for MySQL
Lakukan pengujian konektivitas jaringan antara ApsaraDB RDS for MySQL dan Realtime Compute for Apache Flink untuk memverifikasi konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, atau 8.0.x.
Aktifkan binary logging. (Diaktifkan secara default.)
Atur format log biner ke ROW. (ROW adalah format default.)
Atur binlog_row_image ke FULL. (FULL adalah pengaturan default.)
Nonaktifkan Binary Log Transaction Compression. (Diperkenalkan di MySQL 8.0.20 dan seterusnya. Dinonaktifkan secara default.)
Seorang pengguna MySQL telah dibuat dengan hak istimewa SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk ApsaraDB RDS for MySQL. Gunakan akun istimewa untuk membuat database guna mencegah kegagalan akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP untuk ApsaraDB RDS for MySQL.
PolarDB for MySQL
Lakukan probe jaringan ke Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, atau 8.0.x.
Aktifkan binary logging. (Dinonaktifkan secara default.)
Atur format log biner ke ROW. (ROW adalah format default.)
Atur binlog_row_image ke FULL. (FULL adalah pengaturan default.)
Nonaktifkan Binary Log Transaction Compression. (Diperkenalkan di MySQL 8.0.20 dan seterusnya. Dinonaktifkan secara default.)
Buat pengguna MySQL dan berikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk PolarDB for MySQL. Gunakan akun istimewa untuk membuat database guna mencegah kegagalan akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP untuk PolarDB for MySQL.
Database MySQL yang dikelola sendiri
Lakukan probe jaringan pada Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, atau 8.0.x.
Aktifkan binary logging. (Dinonaktifkan secara default.)
Atur format log biner ke ROW. (STATEMENT adalah format default.)
Atur binlog_row_image ke FULL. (FULL adalah pengaturan default.)
Nonaktifkan Binary Log Transaction Compression. (Diperkenalkan di MySQL 8.0.20 dan seterusnya. Dinonaktifkan secara default.)
Buat pengguna MySQL dan berikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk database MySQL yang dikelola sendiri. Gunakan akun istimewa untuk membuat database guna mencegah kegagalan akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP untuk database MySQL yang dikelola sendiri.
Batasan
Batasan umum
Tabel sumber CDC MySQL saat ini tidak mendukung pendefinisian watermark.
Pada pekerjaan CTAS dan CDAS, tabel sumber CDC MySQL dapat menyinkronkan beberapa perubahan skema. Untuk detail jenis perubahan yang didukung, lihat Kebijakan sinkronisasi evolusi skema.
Konektor CDC MySQL tidak mendukung Binary Log Transaction Compression. Saat mengonsumsi data inkremental, pastikan fitur ini dinonaktifkan. Jika tidak, data inkremental mungkin gagal dimuat.
ApsaraDB RDS for MySQL batasan
Jangan membaca data dari instans standby atau instans hanya baca ApsaraDB RDS for MySQL. Log biner pada instans ini disimpan dalam periode singkat secara default. Jika log biner kedaluwarsa dan dibersihkan, pekerjaan mungkin gagal mengonsumsinya.
ApsaraDB RDS for MySQL mengaktifkan replikasi paralel secara default tetapi tidak menjamin konsistensi urutan transaksi antara instans primer dan sekunder. Setelah alih bencana primer/sekunder, pemulihan checkpoint mungkin melewatkan beberapa data. Untuk menghindari hal ini, aktifkan opsi slave_preserve_commit_order di ApsaraDB RDS for MySQL.
PolarDB for MySQL batasan
Tabel sumber CDC MySQL tidak mendukung pembacaan log biner dari kluster multi-master di PolarDB for MySQL versi 1.0.19 dan sebelumnya. (Apa itu kluster multi-master?) Log biner dari kluster ini mungkin berisi ID tabel duplikat, yang dapat menyebabkan kesalahan pemetaan skema dan kegagalan penguraian.
MySQL open-source batasan
Secara default, MySQL mempertahankan urutan transaksi selama replikasi log biner primer-replika. Jika replikasi paralel diaktifkan pada replika MySQL (slave_parallel_workers > 1) tetapi slave_preserve_commit_order = ON tidak diaktifkan, urutan commit transaksi mungkin tidak sesuai dengan database primer. Perilaku commit yang tidak berurutan ini dapat menyebabkan kehilangan data ketika Flink CDC melanjutkan dari checkpoint. Untuk mencegah masalah ini, kami merekomendasikan menyetel slave_preserve_commit_order = ON pada replika MySQL. Atau, Anda dapat menyetel slave_parallel_workers = 1, yang mengorbankan kinerja replikasi.
Catatan penting
Tabel sink
Primary key auto-increment tidak dideklarasikan dalam DDL. MySQL mengisinya secara otomatis selama penulisan data.
Minimal satu bidang non-primary-key harus dideklarasikan. Jika tidak, terjadi kesalahan.
NOT ENFORCED dalam DDL berarti Flink tidak menegakkan validasi primary key. Anda harus memastikan kebenaran dan integritas primary key. Untuk informasi selengkapnya, lihat Pemeriksaan validitas.
Tabel dimensi
Untuk mempercepat kueri menggunakan indeks, bidang kondisi JOIN harus sesuai dengan urutan definisi indeks (aturan prefiks paling kiri). Misalnya, jika indeks adalah (a, b, c), kondisi JOIN harus
ON t.a = x AND t.b = y.SQL yang dihasilkan Flink mungkin ditulis ulang oleh pengoptimal, sehingga mencegah penggunaan indeks. Untuk memastikan apakah indeks digunakan, periksa rencana eksekusi (EXPLAIN) atau log kueri lambat di MySQL untuk pernyataan SELECT yang sebenarnya dieksekusi.
SQL
Anda dapat menggunakan konektor MySQL dalam pekerjaan SQL sebagai tabel sumber, tabel dimensi, atau tabel sink.
Sintaks
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);Cara konektor menulis ke tabel sink: Setiap catatan masuk diubah menjadi pernyataan INSERT dan dieksekusi. SQL yang tepat tergantung pada skenario:
Untuk tabel sink tanpa primary key, konektor mengeksekusi
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);.Untuk tabel sink dengan primary key, konektor mengeksekusi
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;. Catatan: Jika tabel fisik memiliki kendala indeks unik selain primary key, memasukkan dua catatan dengan primary key berbeda tetapi nilai indeks unik identik menyebabkan kehilangan data akibat konflik.
Jika database MySQL Anda mendefinisikan primary key auto-increment, jangan deklarasikan dalam DDL Flink. MySQL mengisi bidang ini secara otomatis selama penulisan data. Konektor mendukung penulisan dan penghapusan data dengan kolom auto-increment tetapi tidak mendukung pembaruan.
dengan parameter
Parameter umum
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
connector
Jenis tabel.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, Anda dapat menentukan
mysql-cdcataumysql. Keduanya setara. Saat digunakan sebagai tabel dimensi atau sink, nilainya tetapmysql.hostname
Alamat IP atau hostname database MySQL.
Ya
STRING
Tidak ada
Kami merekomendasikan menentukan titik akhir virtual private cloud (VPC).
CatatanJika MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, buat koneksi lintas-VPC atau gunakan titik akhir publik. Untuk informasi selengkapnya, lihat Manajemen dan operasi penyimpanan dan Bagaimana cara mengakses Internet dari kluster Flink yang sepenuhnya dikelola?.
username
Nama pengguna untuk layanan database MySQL.
Ya
STRING
Tidak ada
Tidak ada.
password
Kata sandi untuk layanan database MySQL.
Ya
STRING
Tidak ada
Tidak ada.
database-name
Nama database MySQL.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, nama database mendukung ekspresi reguler untuk membaca data dari beberapa database.
Saat menggunakan ekspresi reguler, hindari penggunaan karakter ^ dan $ untuk mencocokkan awal dan akhir string. Lihat catatan untuk table-name untuk detailnya.
table-name
Nama tabel MySQL.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.
Saat membaca beberapa tabel MySQL, kirim beberapa pernyataan CTAS sebagai satu Pekerjaan untuk menghindari peluncuran beberapa Pendengar log biner. Hal ini meningkatkan performa dan efisiensi. Untuk informasi selengkapnya, lihat Beberapa pernyataan CTAS: Kirim sebagai satu Pekerjaan.
Saat menggunakan ekspresi reguler, hindari penggunaan karakter ^ dan $ untuk mencocokkan awal dan akhir string. Lihat catatan di bawah untuk detailnya.
CatatanSaat mencocokkan nama tabel menggunakan ekspresi reguler, tabel sumber CDC MySQL menggabungkan nilai database-name dan table-name dengan string \\. (atau . untuk VVR 8.0.1 dan sebelumnya) untuk membentuk ekspresi reguler jalur lengkap. Ekspresi ini kemudian dicocokkan dengan nama tabel lengkap di MySQL.
Misalnya, jika Anda mengonfigurasi 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ untuk VVR 8.0.1 dan sebelumnya) untuk menentukan tabel mana yang akan dibaca.
port
Nomor port layanan database MySQL.
Tidak
INTEGER
3306
Tidak ada.
Hanya untuk tabel sumber
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
server-id
ID numerik untuk klien database.
Tidak
STRING
Nilai acak antara 5400 dan 6400 dihasilkan secara default.
ID ini harus unik secara global di seluruh kluster MySQL. Kami merekomendasikan memberikan ID berbeda untuk setiap pekerjaan yang mengakses database yang sama.
Parameter ini juga mendukung format rentang, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, beberapa pembaca konkuren didukung. Dalam kasus ini, tentukan rentang agar setiap pembaca menggunakan ID unik. Untuk informasi selengkapnya, lihat Menggunakan ID server.
scan.incremental.snapshot.enabled
Apakah akan mengaktifkan snapshot inkremental.
Tidak
BOOLEAN
true
Snapshot inkremental diaktifkan secara default. Snapshot inkremental adalah mekanisme baru untuk membaca snapshot data lengkap. Dibandingkan dengan snapshot tradisional, snapshot inkremental menawarkan beberapa keunggulan:
Pembacaan konkuren data lengkap.
Checkpointing pada level chunk selama pembacaan data lengkap.
Tidak memerlukan penguncian baca global (FLUSH TABLES WITH READ LOCK) selama pembacaan data lengkap.
Jika Anda ingin sumber mendukung pembacaan konkuren, setiap pembaca memerlukan ID server unik. Oleh karena itu, server-id harus berupa rentang seperti 5400-6400, dan ukuran rentang harus minimal sama dengan tingkat paralelisme.
CatatanKonfigurasi ini dihapus di VVR 11.1 dan seterusnya.
scan.incremental.snapshot.chunk.size
Jumlah baris per chunk.
Tidak
INTEGER
8096
Saat snapshot inkremental diaktifkan, tabel dibagi menjadi chunk untuk dibaca. Data dari setiap chunk di-cache dalam memori hingga chunk tersebut sepenuhnya dibaca.
Ukuran chunk yang lebih kecil meningkatkan jumlah total chunk. Meskipun ini mengurangi granularitas pemulihan, hal ini dapat menyebabkan kesalahan kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Pertimbangkan pertukaran ini saat memilih ukuran chunk.
scan.snapshot.fetch.size
Jumlah maksimum catatan yang diambil per bacaan saat memindai data tabel lengkap.
Tidak
INTEGER
1024
Tidak ada.
scan.startup.mode
Mode startup untuk mengonsumsi data.
Tidak
STRING
initial
Nilai yang valid:
initial (default): Saat startup pertama, memindai data historis lengkap lalu membaca data log biner terbaru.
latest-offset: Saat startup pertama, melewati pemindaian data lengkap dan mulai membaca dari akhir log biner (posisi terbaru). Hanya membaca perubahan yang terjadi setelah konektor dimulai.
earliest-offset: Melewati pemindaian data lengkap dan mulai membaca dari posisi log biner paling awal yang tersedia.
specific-offset: Melewati pemindaian data lengkap dan mulai membaca dari offset log biner tertentu. Tentukan offset menggunakan scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau gunakan scan.startup.specific-offset.gtid-set untuk memulai dari set GTID.
timestamp: Melewati pemindaian data lengkap dan mulai membaca dari timestamp tertentu. Tentukan timestamp menggunakan scan.startup.timestamp-millis, dalam milidetik.
PentingSaat menggunakan earliest-offset, specific-offset, atau timestamp, pastikan skema tabel tetap tidak berubah antara posisi log biner yang ditentukan dan startup pekerjaan. Perubahan skema dapat menyebabkan kesalahan.
scan.startup.specific-offset.file
Nama file log biner untuk offset startup tertentu.
Tidak
STRING
Tidak ada
Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh nama file:
mysql-bin.000003.scan.startup.specific-offset.pos
Offset dalam file log biner yang ditentukan untuk posisi startup.
Tidak
INTEGER
Tidak ada
Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.
scan.startup.specific-offset.gtid-set
Set GTID untuk posisi startup.
Tidak
STRING
Tidak ada
Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh set GTID:
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.scan.startup.timestamp-millis
Timestamp startup dalam milidetik.
Tidak
LONG
Tidak ada
Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke timestamp. Satuan timestamp adalah milidetik.
PentingSaat menggunakan timestamp, CDC MySQL mencoba membaca event awal dari setiap file log biner untuk menentukan timestamp-nya dan menemukan file yang sesuai. Pastikan timestamp yang ditentukan sesuai dengan file log biner yang ada dan dapat dibaca di database.
server-time-zone
Zona waktu sesi yang digunakan oleh database.
Tidak
STRING
Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink (zona waktu zona yang dipilih) sebagai zona waktu server database.
Misalnya, Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Nilai temporal Debezium.
debezium.min.row.count.to.stream.results
Saat jumlah baris tabel melebihi nilai ini, gunakan mode pembacaan batch.
Tidak
INTEGER
1000
Flink membaca data dari tabel sumber MySQL sebagai berikut:
Pembacaan lengkap: Memuat seluruh tabel ke memori. Cepat tetapi intensif memori. Tabel besar berisiko mengalami kesalahan OOM.
Pembacaan batch: Membaca data dalam batch. Hemat memori tetapi lebih lambat untuk tabel besar.
connect.timeout
Waktu maksimum menunggu sebelum mencoba koneksi ulang setelah timeout saat menghubungkan ke server database MySQL.
Tidak
DURATION
30s
Tidak ada.
connect.max-retries
Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.
Tidak
INTEGER
3
Tidak ada.
connection.pool.size
Ukuran kolam koneksi database.
Tidak
INTEGER
20
Kolam koneksi database menggunakan kembali koneksi untuk mengurangi jumlah koneksi database.
jdbc.properties.*
Parameter koneksi kustom untuk URL JDBC.
Tidak
STRING
Tidak ada
Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk menonaktifkan SSL, atur 'jdbc.properties.useSSL' = 'false'.
Untuk parameter koneksi yang didukung, lihat Properti Konfigurasi MySQL.
debezium.*
Parameter Debezium kustom untuk membaca log biner.
Tidak
STRING
Tidak ada
Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan cara menangani kesalahan penguraian.
heartbeat.interval
Interval di mana event heartbeat memajukan offset log biner di sumber.
Tidak
DURATION
30s
Event heartbeat memajukan offset log biner di sumber. Ini berguna untuk tabel dengan pembaruan jarang. Tanpa heartbeat, offset log biner mungkin macet, menyebabkan kedaluwarsa dan kegagalan pekerjaan. Heartbeat mencegah masalah ini.
scan.incremental.snapshot.chunk.key-column
Tentukan kolom untuk digunakan dalam membagi chunk selama fase snapshot.
Lihat Catatan.
STRING
Tidak ada
Wajib untuk tabel tanpa primary key. Kolom yang dipilih harus non-null (NOT NULL).
Opsional untuk tabel dengan primary key. Hanya kolom dari primary key yang didukung.
rds.region-id
ID wilayah instans Alibaba Cloud ApsaraDB RDS for MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
Tidak ada
Untuk ID wilayah, lihat Wilayah dan zona.
rds.access-key-id
ID AccessKey untuk akun Alibaba Cloud ApsaraDB RDS for MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey saya?.
PentingUntuk mencegah eksposur informasi AccessKey Anda, kami merekomendasikan menggunakan manajemen kunci untuk mengonfigurasi ID AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.
rds.access-key-secret
Rahasia AccessKey untuk akun Alibaba Cloud ApsaraDB RDS for MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey saya?.
PentingUntuk mencegah eksposur informasi AccessKey Anda, kami merekomendasikan menggunakan manajemen kunci untuk mengonfigurasi rahasia AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.
rds.db-instance-id
ID instans Alibaba Cloud ApsaraDB RDS for MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
Tidak ada
Tidak ada.
rds.main-db-id
ID database primer instans Alibaba Cloud ApsaraDB RDS for MySQL.
Tidak
STRING
Tidak ada
Untuk petunjuk mengambil ID database primer, lihat Cadangan log ApsaraDB RDS for MySQL.
Didukung hanya di VVR 8.0.7 dan seterusnya.
rds.download.timeout
Timeout untuk mengunduh satu log arsip dari OSS.
Tidak
DURATION
60s
Tidak ada.
rds.endpoint
Titik akhir untuk mengakses informasi log biner OSS.
Tidak
STRING
Tidak ada
Untuk nilai yang valid, lihat Titik akhir layanan.
Didukung hanya di VVR 8.0.8 dan seterusnya.
scan.incremental.close-idle-reader.enabled
Apakah akan menutup pembaca idle setelah penyelesaian snapshot.
Tidak
BOOLEAN
false
Didukung hanya di VVR 8.0.1 dan seterusnya.
Pengaturan ini memerlukan execution.checkpointing.checkpoints-after-tasks-finish.enabled diatur ke true.
scan.read-changelog-as-append-only.enabled
Apakah akan mengonversi aliran data changelog menjadi aliran data append-only.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Semua jenis pesan (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) dikonversi menjadi pesan INSERT. Gunakan hanya dalam kasus khusus, seperti mempertahankan pesan penghapusan tabel upstream.
false (default): Semua jenis pesan diteruskan tanpa perubahan.
CatatanDidukung hanya di VVR 8.0.8 dan seterusnya.
scan.only.deserialize.captured.tables.changelog.enabled
Selama fase inkremental, apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.
Tidak
BOOLEAN
Default adalah false di VVR 8.x.
Default adalah true di VVR 11.1 dan seterusnya.
Nilai yang valid:
true: Hanya mendeserialisasi data perubahan untuk tabel target untuk mempercepat pembacaan log biner.
false (default): Mendeserialisasi data perubahan untuk semua tabel.
CatatanDidukung hanya di VVR 8.0.7 dan seterusnya.
Di VVR 8.0.8 dan sebelumnya, gunakan debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
Selama fase inkremental, apakah mencoba mengurai event perubahan DDL lockless RDS.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengurai event perubahan DDL lockless RDS.
false (default): Tidak mengurai event perubahan DDL lockless RDS.
Fitur eksperimental. Sebelum melakukan perubahan lockless online, ambil snapshot pekerjaan Flink untuk pemulihan.
CatatanDidukung hanya di VVR 11.1 dan seterusnya.
scan.incremental.snapshot.backfill.skip
Apakah akan melewati backfill selama pembacaan snapshot.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Melewati backfill selama pembacaan snapshot.
false (default): Tidak melewati backfill selama pembacaan snapshot.
Jika backfill dilewati, perubahan yang terjadi selama fase snapshot dibaca dalam fase inkremental berikutnya daripada digabungkan ke dalam snapshot.
PentingMelewati backfill dapat menyebabkan inkonsistensi data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Ini hanya memberikan semantik at-least-once.
CatatanDidukung hanya di VVR 11.1 dan seterusnya.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Apakah akan mendistribusikan chunk tak terbatas terlebih dahulu selama pembacaan snapshot.
Tidak
BOOELEAN
false
Nilai yang valid:
true: Mendistribusikan chunk tak terbatas terlebih dahulu selama pembacaan snapshot.
false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama pembacaan snapshot.
Fitur eksperimental. Mengaktifkan ini mengurangi risiko kesalahan kehabisan memori (OOM) untuk TaskManager selama sinkronisasi chunk terakhir dalam fase snapshot. Kami merekomendasikan mengaktifkannya sebelum startup pekerjaan pertama.
CatatanDidukung hanya di VVR 11.1 dan seterusnya.
binlog.session.network.timeout
Timeout jaringan untuk koneksi log biner.
Tidak
DURATION
10m
Menyetel ini ke 0s menggunakan timeout default server MySQL.
CatatanDidukung hanya di VVR 11.5 dan seterusnya.
scan.rate-limit.records-per-second
Batasi jumlah maksimum catatan yang dipancarkan per detik oleh sumber.
Tidak
LONG
Tidak ada
Berguna untuk membatasi konsumsi data. Berlaku untuk fase lengkap dan inkremental.
Metrik
numRecordsOutPerSecondmencerminkan total catatan yang dipancarkan per detik. Sesuaikan parameter ini berdasarkan metrik tersebut.Selama pembacaan lengkap, kurangi ukuran batch untuk melengkapi batasan ini. Turunkan nilai parameter
scan.incremental.snapshot.chunk.size.CatatanDidukung hanya di VVR 11.5 dan seterusnya.
scan.binlog.tolerate.gtid-holes
Mengaktifkan parameter ini mengabaikan celah dalam urutan GTID, memungkinkan pekerjaan melewati event diskontinu dan terus berjalan.
Tidak
BOOLEAN
false
Sebelum mengaktifkan parameter ini, pastikan offset startup pekerjaan belum kedaluwarsa. Jika pekerjaan dimulai dari offset GTID yang dibersihkan atau kedaluwarsa, mesin diam-diam melewati log yang hilang, mengakibatkan kehilangan data.
CatatanDidukung hanya di VVR 11.6 dan seterusnya.
Parameter khusus tabel dimensi
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
url
URL JDBC MySQL.
Tidak
STRING
Tidak ada
Format URL adalah:
jdbc:mysql://<endpoint>:<port>/<database-name>.lookup.max-retries
Jumlah maksimum percobaan ulang setelah operasi baca gagal.
Tidak
INTEGER
3
Didukung hanya di VVR 6.0.7 dan seterusnya.
lookup.cache.strategy
Kebijakan cache.
Tidak
STRING
Tidak ada
Kebijakan yang didukung: None, LRU, dan ALL. Untuk deskripsi, lihat Informasi latar belakang.
CatatanSaat menggunakan kebijakan cache LRU, Anda juga harus mengonfigurasi parameter lookup.cache.max-rows.
lookup.cache.max-rows
Jumlah maksimum baris yang di-cache.
Tidak
INTEGER
100000
Saat kebijakan cache least recently used dipilih, ukuran cache harus diatur.
Opsional saat menggunakan kebijakan cache ALL.
lookup.cache.ttl
Waktu hidup (TTL) cache.
Tidak
DURATION
10 s
Pengaturan lookup.cache.ttl tergantung pada lookup.cache.strategy:
Jika lookup.cache.strategy adalah None, lookup.cache.ttl opsional dan menunjukkan tidak ada TTL.
Jika lookup.cache.strategy adalah LRU, lookup.cache.ttl adalah TTL cache. Default adalah tidak kedaluwarsa.
Jika lookup.cache.strategy adalah ALL, lookup.cache.ttl adalah interval reload cache. Default adalah tidak reload.
Tentukan waktu dalam format seperti 1min atau 10s.
lookup.max-join-rows
Jumlah maksimum hasil yang dikembalikan saat menggabungkan setiap baris dari tabel utama dengan tabel dimensi.
Tidak
INTEGER
1024
Tidak ada.
lookup.filter-push-down.enabled
Apakah akan mengaktifkan filter pushdown untuk tabel dimensi.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Aktifkan filter pushdown. Tabel dimensi memfilter data lebih awal berdasarkan kondisi yang ditentukan dalam pekerjaan SQL.
false (default): Nonaktifkan filter pushdown. Tabel dimensi memuat semua data.
CatatanDidukung hanya di VVR 8.0.7 dan seterusnya.
PentingAktifkan filter pushdown hanya saat tabel Flink digunakan sebagai tabel dimensi. Tabel sumber MySQL tidak mendukung filter pushdown. Jika tabel Flink digunakan sebagai tabel sumber dan dimensi, dan filter pushdown diaktifkan untuk tabel dimensi, atur eksplisit konfigurasi ini ke false menggunakan petunjuk SQL saat menggunakannya sebagai tabel sumber. Jika tidak, eksekusi pekerjaan mungkin gagal.
Hanya untuk tabel sink
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
url
URL JDBC MySQL.
Tidak
STRING
Tidak ada
Format URL adalah:
jdbc:mysql://<endpoint>:<port>/<database-name>.sink.max-retries
Jumlah maksimum percobaan ulang setelah operasi tulis gagal.
Tidak
INTEGER
3
Tidak ada.
sink.buffer-flush.batch-size
Jumlah catatan yang ditulis dalam satu batch.
Tidak
INTEGER
4096
Tidak ada.
sink.buffer-flush.max-rows
Jumlah baris yang di-cache dalam memori.
Tidak
INTEGER
10000
Parameter ini hanya berlaku saat primary key ditentukan.
sink.buffer-flush.interval
Interval untuk flush buffer. Jika data yang di-buffer menunggu lebih lama dari interval ini tanpa memenuhi kondisi flush, sistem secara otomatis flush semua data yang di-buffer.
Tidak
DURATION
1s
Tidak ada.
sink.ignore-delete
Apakah akan mengabaikan operasi DELETE.
Tidak
BOOLEAN
false
Saat aliran SQL Flink berisi catatan DELETE atau UPDATE_BEFORE, pembaruan simultan ke bidang berbeda dari tabel yang sama oleh beberapa tugas dapat menyebabkan inkonsistensi data.
Misalnya, jika catatan dihapus dan tugas lain hanya memperbarui beberapa bidang, bidang yang tidak diperbarui menjadi null atau nilai default, menyebabkan kesalahan data.
Menyetel sink.ignore-delete ke true mengabaikan operasi DELETE dan UPDATE_BEFORE upstream, menghindari masalah ini.
CatatanUPDATE_BEFORE adalah bagian dari mekanisme retract Flink, digunakan untuk "menarik kembali" nilai lama selama pembaruan.
Saat ignoreDelete = true, semua catatan DELETE dan UPDATE_BEFORE dilewati. Hanya catatan INSERT dan UPDATE_AFTER yang diproses.
sink.ignore-null-when-update
Saat memperbarui data, apakah akan memperbarui bidang ke null atau melewati pembaruan jika nilai input adalah null.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Lewati pembaruan bidang. Didukung hanya saat tabel Flink memiliki primary key. Saat diatur ke true:
Di VVR 8.0.6 dan sebelumnya, penulisan batch tidak didukung untuk tabel sink.
Di VVR 8.0.7 dan seterusnya, penulisan batch didukung untuk tabel sink.
Penulisan batch meningkatkan efisiensi dan throughput penulisan tetapi memperkenalkan latensi dan risiko OOM. Pertimbangkan pertukaran ini berdasarkan kebutuhan bisnis Anda.
false: Perbarui bidang ke null.
CatatanDidukung hanya di VVR 8.0.5 dan seterusnya.
Pemetaan tipe data
Tabel sumber CDC
Tipe bidang CDC MySQL
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
PentingJangan gunakan tipe TINYINT(1) di MySQL untuk menyimpan nilai selain 0 dan 1. Saat property-version diatur ke 0, tabel sumber CDC MySQL memetakan TINYINT(1) ke tipe BOOLEAN Flink secara default. Hal ini dapat menyebabkan data tidak akurat. Untuk menggunakan tipe TINYINT(1) untuk menyimpan nilai selain 0 dan 1, lihat parameter konfigurasi catalog.table.treat-tinyint1-as-boolean.
Tabel dimensi dan sink
Tipe bidang MySQL
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
Catatandengan p <= 38.
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
PentingFlink mendukung catatan BLOB MySQL hingga 2.147.483.647 byte (231 − 1).
BLOB
MEDIUMBLOB
LONGBLOB
Ingesti Data
Konektor MySQL dapat digunakan sebagai sumber data dalam pekerjaan ingesti data YAML.
Sintaks
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxxKonfigurasi
Parameter | Deskripsi | Wajib | Tipe data | Nilai default | Keterangan |
type | Jenis sumber data. | Ya | STRING | Tidak ada | Nilainya tetap mysql. |
name | Nama sumber data. | Tidak | STRING | Tidak ada | Tidak ada. |
hostname | Alamat IP atau hostname database MySQL. | Ya | STRING | Tidak ada | Tentukan alamat virtual private cloud (VPC). Catatan Jika database MySQL dan layanan Flink real-time Anda tidak berada dalam VPC yang sama, Anda perlu terlebih dahulu membuat koneksi jaringan lintas-VPC atau mengakses database melalui jaringan publik. Untuk informasi selengkapnya, lihat Manajemen dan Operasi Penyimpanan dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses jaringan publik?. |
username | Nama pengguna untuk layanan database MySQL. | Ya | STRING | Tidak ada | Tidak ada. |
password | Kata sandi untuk layanan database MySQL. | Ya | STRING | Tidak ada | Tidak ada. |
tables | Tabel MySQL yang akan disinkronkan. | Ya | STRING | Tidak ada |
Catatan
|
tables.exclude | Tabel yang dikecualikan dari sinkronisasi. | Tidak | STRING | Tidak ada |
Catatan Titik (.) memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, escape dengan backslash (\). Misalnya: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*. |
port | Nomor port layanan database MySQL. | Tidak | INTEGER | 3306 | Tidak ada. |
schema-change.enabled | Menentukan apakah akan mengirimkan event evolusi skema. | Tidak | BOOLEAN | true | Tidak ada. |
server-id | ID numerik atau rentang ID untuk klien database yang digunakan untuk sinkronisasi. | Tidak | STRING | Nilai acak antara 5400 dan 6400 dihasilkan. | ID ini harus unik secara global dalam kluster MySQL. Tetapkan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama. Parameter ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, beberapa pembacaan konkuren didukung. Dalam kasus ini, tetapkan rentang ID agar setiap pembacaan konkuren menggunakan ID berbeda. |
jdbc.properties.* | Parameter koneksi kustom dalam URL Java Database Connectivity (JDBC). | Tidak | STRING | Tidak ada | Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk menonaktifkan protokol SSL, atur 'jdbc.properties.useSSL' = 'false'. Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat Properti Konfigurasi MySQL. |
debezium.* | Parameter kustom untuk Debezium membaca data logging biner (binlog). | Tidak | STRING | Tidak ada | Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan cara menangani kesalahan deserialisasi. |
scan.incremental.snapshot.chunk.size | Ukuran setiap chunk dalam jumlah baris. | Tidak | INTEGER | 8096 | Tabel MySQL dibagi menjadi beberapa chunk untuk dibaca. Data chunk di-cache dalam memori sebelum sepenuhnya dibaca. Mengurangi jumlah baris per chunk meningkatkan jumlah total chunk dalam tabel. Meskipun ini mengurangi granularitas pemulihan kesalahan, hal ini dapat memicu kesalahan kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Oleh karena itu, Anda harus menyeimbangkan faktor-faktor ini dan menetapkan ukuran chunk yang sesuai. |
scan.snapshot.fetch.size | Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel. | Tidak | INTEGER | 1024 | Tidak ada. |
scan.startup.mode | Mode startup untuk mengonsumsi data. | Tidak | STRING | initial | Nilai yang valid:
Penting Untuk mode startup earliest-offset, specific-offset, dan timestamp, jika skema tabel pada waktu startup pekerjaan berbeda dari skema pada offset awal yang ditentukan, pekerjaan gagal. Dengan kata lain, jika Anda menggunakan ketiga mode startup ini, pastikan skema tabel yang sesuai tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan. |
scan.startup.specific-offset.file | Nama file binlog untuk offset awal saat mode startup diatur ke specific-offset. | Tidak | STRING | Tidak ada | Parameter ini hanya digunakan saat scan.startup.mode diatur ke specific-offset. Format contoh: |
scan.startup.specific-offset.pos | Offset dalam file binlog yang ditentukan untuk memulai saat mode startup diatur ke specific-offset. | Tidak | INTEGER | Tidak ada | Parameter ini hanya digunakan saat scan.startup.mode diatur ke specific-offset. |
scan.startup.specific-offset.gtid-set | Set GTID untuk memulai saat mode startup diatur ke specific-offset. | Tidak | STRING | Tidak ada | Parameter ini hanya digunakan saat scan.startup.mode diatur ke specific-offset. Format contoh set GTID: |
scan.startup.timestamp-millis | Timestamp dalam milidetik untuk memulai saat mode startup diatur ke timestamp. | Tidak | LONG | Tidak ada | Parameter ini hanya digunakan saat scan.startup.mode diatur ke timestamp. Timestamp dalam milidetik. Penting Saat Anda menentukan waktu, CDC MySQL mencoba membaca event awal dari setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai dengan waktu yang ditentukan. Pastikan file binlog yang sesuai dengan timestamp yang ditentukan belum dibersihkan dari database dan dapat dibaca. |
server-time-zone | Zona waktu sesi yang digunakan oleh database. | Tidak | STRING | Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu zona yang Anda pilih. | Misalnya, Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Tipe temporal Debezium. |
scan.startup.specific-offset.skip-events | Jumlah event binlog yang dilewati saat membaca dari offset tertentu. | Tidak | INTEGER | Tidak ada | Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. |
scan.startup.specific-offset.skip-rows | Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event binlog mungkin sesuai dengan beberapa perubahan baris. | Tidak | INTEGER | Tidak ada | Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. |
connect.timeout | Waktu maksimum menunggu sebelum mencoba koneksi ulang ke server database MySQL setelah timeout. | Tidak | DURATION | 30s | Tidak ada. |
connect.max-retries | Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal. | Tidak | INTEGER | 3 | Tidak ada. |
connection.pool.size | Ukuran kolam koneksi database. | Tidak | INTEGER | 20 | Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi database. |
heartbeat.interval | Interval di mana sumber menggunakan event heartbeat untuk memajukan offset binlog. | Tidak | DURATION | 30s | Event heartbeat digunakan untuk memajukan offset binlog di sumber. Ini berguna untuk tabel di MySQL yang jarang diperbarui. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset binlog maju, yang mencegah offset kedaluwarsa. Offset binlog yang kedaluwarsa menyebabkan pekerjaan gagal dan tidak dapat dipulihkan. Pekerjaan hanya dapat dimulai ulang tanpa status. |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom untuk digunakan dalam sharding selama fase snapshot. | Tidak. | STRING | Tidak ada | Hanya kolom dari primary key yang dapat dipilih. |
rds.region-id | ID wilayah instans Alibaba Cloud RDS for MySQL. | Wajib saat Anda membaca log arsip dari Object Storage Service (OSS). | STRING | Tidak ada | Untuk informasi selengkapnya tentang ID wilayah, lihat Wilayah dan zona. |
rds.access-key-id | ID AccessKey akun Alibaba Cloud untuk instans RDS for MySQL. | Wajib saat Anda membaca log arsip dari OSS. | STRING | Tidak ada | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey? Penting Untuk mencegah informasi AccessKey Anda bocor, gunakan manajemen rahasia untuk menentukan ID AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel. |
rds.access-key-secret | Rahasia AccessKey akun Alibaba Cloud untuk instans RDS for MySQL. | Wajib saat Anda membaca log arsip dari OSS. | STRING | Tidak ada | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey? Penting Untuk mencegah informasi AccessKey Anda bocor, gunakan manajemen rahasia untuk menentukan rahasia AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel. |
rds.db-instance-id | ID instans Alibaba Cloud RDS for MySQL. | Wajib saat Anda membaca log arsip dari OSS. | STRING | Tidak ada | Tidak ada. |
rds.main-db-id | ID database primer instans Alibaba Cloud RDS for MySQL. | Tidak | STRING | Tidak ada | Untuk informasi selengkapnya tentang cara mendapatkan ID database primer, lihat Cadangan log RDS for MySQL. |
rds.download.timeout | Periode timeout untuk mengunduh satu log arsip dari OSS. | Tidak | DURATION | 60s | Tidak ada. |
rds.endpoint | Titik akhir layanan untuk mendapatkan informasi binlog OSS. | Tidak | STRING | Tidak ada | Untuk informasi selengkapnya tentang nilai yang tersedia, lihat Titik Akhir. |
rds.binlog-directory-prefix | Awalan direktori untuk menyimpan file binlog. | Tidak | STRING | rds-binlog- | Tidak ada. |
rds.use-intranet-link | Menentukan apakah akan menggunakan tautan jaringan internal untuk mengunduh file binlog. | Tidak | BOOLEAN | true | Tidak ada. |
rds.binlog-directories-parent-path | Jalur mutlak direktori induk tempat file binlog disimpan. | Tidak | STRING | Tidak ada | Tidak ada. |
chunk-meta.group.size | Ukuran metadata chunk. | Tidak | INTEGER | 1000 | Jika metadata lebih besar dari nilai ini, metadata diteruskan dalam beberapa bagian. |
chunk-key.even-distribution.factor.lower-bound | Batas bawah faktor distribusi chunk untuk sharding merata. | Tidak | DOUBLE | 0.05 | Jika faktor distribusi kurang dari nilai ini, sharding tidak merata digunakan. Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total jumlah baris data. |
chunk-key.even-distribution.factor.upper-bound | Batas atas faktor distribusi chunk untuk sharding merata. | Tidak | DOUBLE | 1000.0 | Jika faktor distribusi lebih besar dari nilai ini, sharding tidak merata digunakan. Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total jumlah baris data. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah akan menutup pembaca idle setelah snapshot selesai. | Tidak | BOOLEAN | false | Agar konfigurasi ini berlaku, atur |
scan.only.deserialize.captured.tables.changelog.enabled | Menentukan apakah hanya mendeserialisasi event perubahan tabel yang ditentukan selama fase inkremental. | Tidak | BOOLEAN |
| Nilai yang valid:
|
scan.parallel-deserialize-changelog.enabled | Menentukan apakah akan menggunakan beberapa thread untuk mengurai event perubahan selama fase inkremental. | Tidak | BOOLEAN | false | Nilai yang valid:
Catatan Fitur ini hanya didukung oleh mesin komputasi Flink VVR 8.0.11 dan seterusnya. |
scan.parallel-deserialize-changelog.handler.size | Jumlah handler event saat menggunakan beberapa thread untuk mengurai event perubahan. | Tidak | INTEGER | 2 | Catatan Fitur ini hanya didukung oleh mesin komputasi Flink VVR 8.0.11 dan seterusnya. |
metadata-column.include-list | Kolom metadata untuk diteruskan ke sink downstream. | Tidak | STRING | Tidak ada | Kolom metadata yang tersedia meliputi Catatan Konektor YAML CDC MySQL tidak memerlukan atau mendukung penambahan kolom metadata untuk nama database, nama tabel, atau Penting
|
scan.newly-added-table.enabled | Menentukan apakah akan menyinkronkan tabel yang baru ditambahkan yang tidak cocok selama startup sebelumnya atau menghapus tabel dari status yang saat ini tidak cocok saat memulai ulang dari checkpoint. | Tidak | BOOLEAN | false | Ini berlaku saat memulai ulang dari checkpoint atau titik simpan. |
scan.binlog.newly-added-table.enabled | Menentukan apakah akan mengirimkan data untuk tabel yang baru cocok selama fase inkremental. | Tidak | BOOLEAN | false | Tidak dapat diaktifkan bersamaan dengan |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom untuk digunakan dalam sharding selama fase snapshot untuk tabel tertentu. | Tidak | STRING | Tidak ada |
|
scan.parse.online.schema.changes.enabled | Menentukan apakah akan mencoba mengurai event DDL perubahan lockless RDS selama fase inkremental. | Tidak | BOOLEAN | false | Nilai yang valid:
Ini adalah fitur eksperimental. Sebelum melakukan perubahan lockless online, buat snapshot pekerjaan Flink untuk pemulihan. Catatan Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.0 dan seterusnya. |
scan.incremental.snapshot.backfill.skip | Menentukan apakah akan melewati backfill selama fase pembacaan snapshot. | Tidak | BOOLEAN | false | Nilai yang valid:
Jika backfill dilewati, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya daripada digabungkan ke dalam snapshot. Penting Melewati backfill dapat menyebabkan inkonsistensi data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin. Catatan Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.1 dan seterusnya. |
treat-tinyint1-as-boolean.enabled | Menentukan apakah akan memperlakukan tipe TINYINT(1) sebagai tipe BOOLEAN. | Tidak | BOOLEAN | true | Nilai yang valid:
|
treat-timestamp-as-datetime-enabled | Menentukan apakah akan memperlakukan tipe TIMESTAMP sebagai tipe DATETIME. | Tidak | BOOLEAN | false | Nilai yang valid:
Tipe TIMESTAMP MySQL menyimpan waktu UTC dan dipengaruhi oleh zona waktu. Tipe DATETIME MySQL menyimpan waktu literal dan tidak dipengaruhi oleh zona waktu. Jika ini diaktifkan, data tipe TIMESTAMP MySQL dikonversi ke tipe DATETIME berdasarkan server-time-zone. |
include-comments.enabled | Menentukan apakah akan menyinkronkan komentar tabel dan kolom. | Tidak | BOOELEAN | false | Nilai yang valid:
Mengaktifkan ini meningkatkan penggunaan memori pekerjaan. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah akan mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot. | Tidak | BOOELEAN | false | Nilai yang valid:
Ini adalah fitur eksperimental. Mengaktifkan ini mengurangi risiko kesalahan OOM saat TaskManager menyinkronkan chunk terakhir selama fase snapshot. Tambahkan parameter ini sebelum startup pertama pekerjaan. Catatan Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.1 dan seterusnya. |
binlog.session.network.timeout | Timeout jaringan untuk koneksi binlog. | Tidak | DURATION | 10m | Jika ini diatur ke 0s, timeout default server MySQL digunakan. Catatan Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.5 dan seterusnya. |
scan.rate-limit.records-per-second | Membatasi jumlah maksimum catatan yang dapat dikirim sumber per detik. | Tidak | LONG | Tidak ada | Ini berlaku untuk skenario di mana pembacaan data perlu dibatasi. Batasan ini efektif dalam fase lengkap dan inkremental. Metrik Selama fase baca lengkap, Anda biasanya perlu mengurangi jumlah entri data yang dibaca dalam setiap batch. Anda dapat mengurangi nilai parameter Catatan Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.5 dan seterusnya. |
include-binlog-meta.enable | Menentukan apakah akan menyertakan informasi binlog MySQL asli, seperti GTID dan offset binlog, dalam pesan. | Tidak | Boolean | false | Ini cocok untuk skenario sinkronisasi binlog asli, seperti mengganti tautan sinkronisasi canal yang ada. Catatan Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.6 dan seterusnya. |
scan.binlog.tolerate.gtid-holes | Mengaktifkan parameter ini mengabaikan celah dalam urutan GTID, yang memungkinkan pekerjaan melewati event diskontinu dan terus berjalan. | Tidak | Boolean | false | Sebelum Anda mengaktifkan parameter ini, Anda harus memastikan bahwa offset awal pekerjaan belum kedaluwarsa. Jika pekerjaan dimulai dari offset GTID yang dibersihkan atau kedaluwarsa, mesin diam-diam melewatkan log yang hilang, yang mengakibatkan kehilangan data. Catatan Parameter ini hanya didukung oleh mesin komputasi Flink VVR 11.6 dan seterusnya. |
Pemetaan tipe
Tabel berikut menunjukkan pemetaan tipe data untuk ingesti data.
Tipe bidang CDC MySQL | Tipe bidang CDC |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] dengan p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] dengan p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] dengan p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | Pemetaan tergantung pada nilai parameter
|
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] dengan 38 < p <= 65 | STRING Catatan Dalam MySQL, presisi tipe data desimal dapat mencapai 65. Dalam Flink, presisi dibatasi hingga 38. Oleh karena itu, jika Anda mendefinisikan kolom desimal dengan presisi lebih dari 38, Anda harus memetakannya ke string untuk menghindari kehilangan presisi. |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] dengan 38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] dengan 38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING Catatan Tipe data JSON dikonversi ke string berformat JSON di Flink. |
GEOMETRY | STRING Catatan Tipe data spasial di MySQL dikonversi ke string dalam format JSON tetap. Untuk informasi selengkapnya, lihat Pemetaan tipe data spasial MySQL. |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES Catatan Untuk tipe data BLOB di MySQL, hanya blob dengan panjang tidak lebih dari 2.147.483.647 (2^31 - 1) yang didukung. |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
Contoh penggunaan
Tabel sumber CDC
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;Tabel sink
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;Sumber ingesti data
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
Tentang tabel sumber CDC MySQL
Prinsip implementasi
Saat tabel sumber CDC MySQL dimulai, tabel tersebut memindai seluruh tabel, membaginya menjadi beberapa chunk berdasarkan primary key, dan mencatat offset log biner. Kemudian membaca data dari setiap chunk secara berurutan menggunakan algoritma snapshot inkremental dengan pernyataan SELECT. Pekerjaan secara berkala melakukan checkpoint, mencatat chunk yang telah selesai. Jika terjadi failover, pekerjaan melanjutkan pembacaan dari chunk yang belum selesai. Setelah semua chunk dibaca, pekerjaan membaca catatan perubahan inkremental dari offset log biner yang sebelumnya dicatat. Pekerjaan Flink terus melakukan checkpoint secara berkala, mencatat offset log biner. Jika pekerjaan gagal, pekerjaan melanjutkan pemrosesan dari offset log biner terakhir yang dicatat, memastikan semantik tepat-sekali.
Untuk penjelasan lebih rinci tentang algoritma snapshot inkremental, lihat Konektor CDC MySQL.
Metadata
Metadata berguna dalam skenario sharding tempat Anda menggabungkan dan menyinkronkan data dari beberapa database dan tabel. Dalam kasus seperti itu, Anda sering perlu membedakan database dan tabel sumber untuk setiap catatan. Kolom metadata memberikan akses ke informasi nama database dan tabel sumber, sehingga memudahkan penggabungan beberapa tabel sharded menjadi satu tabel tujuan.
Tabel Sumber CDC MySQL mendukung sintaks kolom metadata, memungkinkan Anda mengakses metadata berikut:
Kunci metadata
Tipe metadata
Deskripsi
database_name
STRING NOT NULL
Nama database yang berisi catatan.
table_name
STRING NOT NULL
Nama tabel yang berisi catatan.
op_ts
TIMESTAMP_LTZ(3) NOT NULL
Waktu perubahan catatan di database. Jika catatan berasal dari data historis daripada log biner, nilai ini selalu 0.
op_type
STRING NOT NULL
Jenis perubahan catatan.
+I: Pesan INSERT
-D: Pesan DELETE
-U: Pesan UPDATE_BEFORE
+U: Pesan UPDATE_AFTER
CatatanDidukung hanya di VVR 8.0.7 dan seterusnya.
query_log
STRING NOT NULL
Catatan log kueri MySQL yang sesuai dengan baris tersebut.
CatatanMySQL harus memiliki parameter binlog_rows_query_log_events diaktifkan untuk mencatat log kueri.
Contoh kode berikut menggabungkan beberapa tabel orders sharded dari database berbeda dalam satu instans MySQL ke satu tabel holo_orders di Hologres.
CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- Baca nama database. table_name STRING METADATA FROM 'table_name' VIRTUAL, -- Baca nama tabel. operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Baca waktu perubahan. op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Baca jenis perubahan. order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- Regex cocokkan beberapa database sharded. 'table-name' = 'orders_.*' -- Regex cocokkan beberapa tabel sharded. ); INSERT INTO holo_orders SELECT * FROM mysql_orders;Berdasarkan kode di atas, jika Anda mengatur parameter scan.read-changelog-as-append-only.enabled ke true dalam klausa WITH, output bervariasi berdasarkan konfigurasi primary key tabel sink:
Jika primary key tabel sink adalah order_id, output hanya berisi perubahan terakhir untuk setiap primary key dari tabel sumber. Untuk primary key yang perubahan terakhirnya adalah operasi penghapusan, tabel sink menampilkan catatan dengan primary key yang sama dan op_type -D.
Jika primary key tabel sink adalah order_id, operation_ts, dan op_type, output berisi riwayat perubahan lengkap untuk setiap primary key dari tabel sumber.
Dukungan ekspresi reguler
Tabel sumber CDC MySQL mendukung penggunaan ekspresi reguler dalam nama tabel atau nama database untuk mencocokkan beberapa tabel atau database. Contoh kode berikut menggunakan ekspresi reguler untuk menentukan beberapa tabel.
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Regex cocokkan beberapa database. 'table-name' = '(t[5-8]|tt)' -- Regex cocokkan beberapa tabel. );Penjelasan ekspresi reguler dalam contoh:
^(test).* adalah contoh pencocokan prefiks. Ekspresi ini mencocokkan nama database yang dimulai dengan "test", seperti test1 atau test2.
.*[p$] adalah contoh pencocokan sufiks. Ekspresi ini mencocokkan nama database yang diakhiri dengan "p", seperti cdcp atau edcp.
txc adalah pencocokan eksak. Ini mencocokkan nama database "txc".
Saat mencocokkan nama tabel jalur lengkap, CDC MySQL mengidentifikasi tabel secara unik menggunakan nama database dan nama tabel, yaitu database-name.table-name sebagai pola pencocokan. Misalnya, pola (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) dapat mencocokkan tabel txc.tt dan test2.test5 dalam database.
PentingDalam konfigurasi pekerjaan SQL, table-name dan database-name tidak mendukung daftar yang dipisahkan koma untuk menentukan beberapa tabel atau database.
Untuk mencocokkan beberapa tabel atau menggunakan beberapa ekspresi reguler, hubungkan dengan bilah vertikal (|) dan tutup dalam tanda kurung. Misalnya, untuk membaca tabel 'user' dan 'product', atur table-name ke
(user|product).Jika ekspresi reguler berisi koma, tulis ulang menggunakan operator bilah vertikal (|). Misalnya, tulis ulang
mytable_\d{1, 2}sebagai(mytable_\d{1}|mytable_\d{2})yang setara untuk menghindari penggunaan koma.
Kontrol konkurensi
Konektor MySQL mendukung pembacaan konkuren data lengkap untuk meningkatkan efisiensi pemuatan data. Dikombinasikan dengan fitur auto-tuning Autopilot di konsol Realtime Compute Flink, secara otomatis menskala turun selama fase inkremental setelah pembacaan data lengkap konkuren, menghemat sumber daya komputasi.
Di Konsol pengembangan Realtime Compute, Anda dapat mengatur tingkat paralelisme pekerjaan dalam mode dasar atau ahli. Perbedaan dalam pengaturan konkurensi adalah sebagai berikut:
Konkurensi yang diatur dalam mode dasar berlaku secara global untuk pekerjaan.

Dalam mode ahli, Anda dapat mengatur tingkat paralelisme untuk vertex tertentu.

Untuk informasi selengkapnya tentang konfigurasi sumber daya, lihat Konfigurasi informasi penerapan pekerjaan.
PentingTerlepas dari modusnya, saat mengatur konkurensi, rentang server-id yang dideklarasikan dalam tabel harus lebih besar dari atau sama dengan tingkat paralelisme pekerjaan. Misalnya, jika rentang server-id adalah 5404-5412, terdapat 8 ID server unik, sehingga pekerjaan dapat memiliki maksimal 8 tugas konkuren. Pekerjaan berbeda yang mengakses instans MySQL yang sama harus memiliki rentang server-id yang tidak tumpang tindih; setiap pekerjaan harus secara eksplisit mengonfigurasi server-id unik.
Autopilot Autoscaling
Fase data lengkap mengakumulasi banyak data historis. Untuk meningkatkan efisiensi pembacaan, biasanya digunakan pembacaan konkuren. Dalam fase inkremental log biner, karena volume data log biner kecil dan untuk memastikan urutan global, pembacaan single-threaded biasanya cukup. Persyaratan sumber daya yang berbeda untuk fase lengkap dan inkremental dapat diseimbangkan secara otomatis oleh fitur auto-tuning.
Auto-tuning memantau trafik setiap tugas di Sumber CDC MySQL. Saat pekerjaan memasuki fase log biner, jika hanya satu tugas yang bertanggung jawab atas pembacaan log biner dan tugas lainnya idle, auto-tuning secara otomatis mengurangi jumlah CU dan konkurensi Sumber. Untuk mengaktifkan auto-tuning, atur mode auto-tuning ke Active di halaman operasi pekerjaan.
CatatanInterval pemicu minimum untuk mengurangi konkurensi adalah 24 jam secara default. Untuk informasi selengkapnya tentang parameter dan detail auto-tuning, lihat Konfigurasi auto-tuning.
Mode startup
Gunakan item konfigurasi scan.startup.mode untuk menentukan mode startup untuk tabel sumber CDC MySQL. Nilai yang valid:
initial (default): Saat startup pertama, melakukan pembacaan lengkap tabel database, lalu beralih ke pembacaan log biner inkremental.
earliest-offset: Melewati fase snapshot dan mulai membaca dari offset log biner paling awal yang tersedia.
latest-offset: Melewati fase snapshot dan mulai membaca dari akhir log biner. Dalam mode ini, tabel sumber hanya membaca perubahan data yang terjadi setelah pekerjaan dimulai.
specific-offset: Melewati fase snapshot dan mulai membaca dari offset log biner tertentu. Offset dapat ditentukan berdasarkan nama file log biner dan posisi, atau berdasarkan set GTID.
timestamp: Melewati fase snapshot dan mulai membaca event log biner dari timestamp tertentu.
Contoh penggunaan:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- Mulai dari offset paling awal. 'scan.startup.mode' = 'latest-offset', -- Mulai dari offset terbaru. 'scan.startup.mode' = 'specific-offset', -- Mulai dari offset tertentu. 'scan.startup.mode' = 'timestamp', -- Mulai dari timestamp tertentu. 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Tentukan nama file log biner untuk mode specific-offset. 'scan.startup.specific-offset.pos' = '4', -- Tentukan posisi log biner untuk mode specific-offset. 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Tentukan set GTID untuk mode specific-offset. 'scan.startup.timestamp-millis' = '1667232000000' -- Tentukan timestamp startup untuk mode timestamp. ... )PentingSumber MySQL mencetak offset saat ini pada waktu checkpoint dengan logging level INFO. Awalan log adalah
Binlog offset on checkpoint {checkpoint-id}. Log ini membantu Anda memulai pekerjaan dari offset checkpoint tertentu.Jika tabel yang dibaca telah mengalami perubahan skema, memulai dari mode earliest-offset, specific-offset, atau timestamp dapat menyebabkan kesalahan. Hal ini karena pembaca Debezium secara internal mempertahankan skema tabel terbaru, dan data sebelumnya dengan skema yang tidak cocok tidak dapat diurai dengan benar.
Tentang tabel sumber CDC tanpa kunci
Menggunakan tabel tanpa kunci memerlukan pengaturan scan.incremental.snapshot.chunk.key-column, dan kolom yang dipilih harus non-null.
Semantik pemrosesan tabel sumber CDC tanpa kunci ditentukan oleh perilaku kolom yang ditentukan dalam scan.incremental.snapshot.chunk.key-column:
Jika kolom yang ditentukan tidak diperbarui, semantik tepat-sekali dijamin.
Jika kolom yang ditentukan diperbarui, hanya semantik at-least-once yang dijamin. Namun, Anda dapat memastikan kebenaran data dengan menentukan primary key di sistem downstream dan menggunakan operasi idempoten.
Membaca log cadangan Alibaba Cloud ApsaraDB RDS for MySQL
Tabel sumber CDC MySQL mendukung pembacaan log cadangan dari Alibaba Cloud ApsaraDB RDS for MySQL. Ini berguna ketika fase data lengkap memakan waktu lama, file log biner lokal secara otomatis dibersihkan, tetapi file cadangan yang diunggah secara otomatis atau manual masih ada.
Contoh penggunaan:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )Mengaktifkan penggunaan ulang Sumber CDC
Dalam satu pekerjaan, beberapa tabel sumber CDC MySQL memulai beberapa klien Binlog. Saat semua tabel sumber berada dalam instans yang sama, hal ini meningkatkan beban pada database. Untuk informasi selengkapnya, lihat FAQ CDC MySQL.
Solusi
VVR 8.0.7 dan seterusnya mendukung penggunaan ulang Sumber CDC MySQL. Penggunaan ulang menggabungkan tabel sumber CDC MySQL yang dapat digabungkan. Penggabungan terjadi saat konfigurasi tabel sumber identik, kecuali untuk nama database, nama tabel, dan
server-id. Mesin secara otomatis menggabungkan sumber CDC MySQL dalam pekerjaan yang sama.Prosedur
Gunakan perintah
SETdalam pekerjaan SQL Anda:SET 'table.optimizer.source-merge.enabled' = 'true'; # (VVR 8.0.8 dan 8.0.9) Tambahan atur ini: SET 'sql-gateway.exec-plan.enabled' = 'false';VVR 11.1 dan seterusnya memiliki penggunaan ulang diaktifkan secara default.
Jalankan pekerjaan tanpa status. Memodifikasi konfigurasi penggunaan ulang Sumber mengubah topologi pekerjaan. Anda harus menjalankan pekerjaan tanpa status. Jika tidak, pekerjaan mungkin gagal dimulai atau kehilangan data. Jika Sumber digabung, Anda akan melihat node
MergetableSourceScan.
PentingSetelah mengaktifkan penggunaan ulang, kami merekomendasikan untuk tidak menonaktifkan operator chaining. Mengatur
pipeline.operator-chainingkefalsemeningkatkan overhead serialisasi dan deserialisasi data. Semakin banyak Sumber yang digabung, semakin besar overhead-nya.Di VVR 8.0.7, menonaktifkan operator chaining menyebabkan masalah serialisasi.
Mempercepat pembacaan log biner
Saat konektor MySQL digunakan sebagai tabel sumber atau sumber ingesti data, konektor tersebut mengurai file log biner untuk menghasilkan berbagai pesan perubahan selama fase inkremental. File log biner mencatat semua perubahan tabel dalam format biner. Percepat penguraian file log biner menggunakan metode berikut:
Aktifkan konfigurasi filter penguraian
Gunakan item konfigurasi
scan.only.deserialize.captured.tables.changelog.enabled: Hanya uraikan event perubahan untuk tabel yang ditentukan.
Optimalkan parameter Debezium
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size: Jumlah maksimum catatan yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, event ditempatkan ke antrian pemblokiran sebelum ditulis ke downstream. Nilai default adalah 8192.debezium.max.batch.size: Jumlah maksimum event yang diproses konektor dalam setiap iterasi. Nilai default adalah 2048.debezium.poll.interval.ms: Jumlah milidetik yang ditunggu konektor sebelum meminta event perubahan baru. Nilai default adalah 1000 milidetik (1 detik).
Contoh penggunaan:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Konfigurasi Debezium
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- Aktifkan filter penguraian
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Hanya uraikan event perubahan untuk tabel yang ditentukan.
...
)source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Konfigurasi Debezium
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# Aktifkan filter penguraian
scan.only.deserialize.captured.tables.changelog.enabled: trueVersi enterprise CDC MySQL memiliki kapasitas konsumsi log biner 85 MB/detik, kira-kira dua kali lipat versi komunitas open-source. Jika laju pembuatan log biner melebihi 85 MB/detik (yaitu, file 512 MB setiap 6 detik), latensi pekerjaan Flink terus meningkat. Latensi secara bertahap menurun saat laju pembuatan log biner turun. Jika file log biner berisi transaksi besar, latensi pemrosesan mungkin meningkat secara singkat, lalu menurun setelah log transaksi diproses.
API DataStream CDC MySQL
Untuk membaca dan menulis data menggunakan DataStream, gunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk petunjuk pengaturan konektor DataStream, lihat Cara menggunakan konektor DataStream.
Buat program API DataStream dan gunakan MySqlSource. Berikut ini contoh kode dan dependensi pom:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set database yang ditangkap
.tableList("yourDatabaseName.yourTableName") // set tabel yang ditangkap
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // mengonversi SourceRecord ke String JSON
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// aktifkan checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 tugas sumber paralel
.setParallelism(4)
.print().setParallelism(1); // gunakan paralelisme 1 untuk sink agar menjaga urutan pesan
env.execute("Print MySQL Snapshot + Binlog");
}
}<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>Saat membuat MySqlSource, Anda harus menentukan parameter berikut:
Parameter | Deskripsi |
hostname | Alamat IP atau hostname database MySQL. |
port | Nomor port layanan database MySQL. |
databaseList | Nama database MySQL. Catatan Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Anda dapat menggunakan |
username | Nama pengguna untuk layanan database MySQL. |
password | Kata sandi untuk layanan database MySQL. |
deserializer | Deserializer mengonversi catatan tipe SourceRecord ke tipe yang ditentukan. Nilai yang valid:
|
Dependensi pom harus menentukan parameter berikut:
${vvr.version} | Versi mesin Alibaba Cloud Realtime Compute for Apache Flink, misalnya: Catatan Merujuk pada versi yang ditampilkan di Maven, karena versi hotfix mungkin dirilis secara berkala tanpa pemberitahuan lainnya. |
${flink.version} | Versi Apache Flink, misalnya: Penting Gunakan versi Apache Flink yang sesuai dengan versi mesin Alibaba Cloud Realtime Compute for Apache Flink Anda untuk menghindari masalah ketidakcocokan selama eksekusi pekerjaan. Untuk pemetaan versi, lihat Mesin. |
FAQ
Untuk masalah umum yang dihadapi saat menggunakan tabel sumber CDC, lihat Masalah CDC.