Topik ini menjelaskan cara menggunakan konektor MySQL.
Informasi latar belakang
Konektor MySQL mendukung semua database yang kompatibel dengan protokol MySQL, seperti RDS MySQL, PolarDB for MySQL, OceanBase (mode MySQL), dan MySQL yang dikelola sendiri.
Saat menggunakan konektor MySQL untuk membaca dari OceanBase, pastikan binary logging (binlog) OceanBase diaktifkan dan dikonfigurasi dengan benar. Untuk informasi selengkapnya, lihat Operasi binary logging. Fitur ini berada dalam pratinjau publik. Kami menyarankan agar Anda mengevaluasinya secara menyeluruh dan menggunakannya dengan hati-hati.
Tabel berikut menjelaskan dukungan untuk konektor MySQL.
Kategori | Detail |
Jenis yang didukung | Tabel sumber, tabel dimensi, tabel sink, dan sumber data ingestion |
Mode runtime | Hanya mode streaming |
Format data | Tidak berlaku |
Metrik pemantauan spesifik | |
Jenis API | DataStream, SQL, dan YAML data ingestion |
Dukungan untuk memperbarui atau menghapus data tabel sink | Ya |
Fitur
Tabel sumber MySQL Change Data Capture (CDC) pertama-tama membaca seluruh data historis database, lalu beralih secara mulus ke pembacaan event binary log (binlog). Proses ini menjamin semantik tepat-sekali (exactly-once semantics), artinya tidak ada data yang terlewat atau diduplikasi, bahkan jika terjadi kegagalan. Tabel sumber CDC MySQL mendukung pembacaan data lengkap secara konkuren serta menerapkan pembacaan tanpa penguncian (lock-free) dan transfer data yang dapat dilanjutkan (resumable) menggunakan algoritma snapshot inkremental. Untuk informasi selengkapnya, lihat Tentang tabel sumber MySQL CDC.
Pemrosesan batch dan streaming terpadu: Membaca data lengkap dan inkremental tanpa perlu memelihara pipeline terpisah.
Pembacaan data lengkap secara konkuren: Meningkatkan kinerja secara horizontal.
Peralihan mulus dari pembacaan lengkap ke inkremental: Secara otomatis mengurangi skala untuk menghemat sumber daya komputasi.
Transfer data yang dapat dilanjutkan: Mendukung kelanjutan transfer data selama pembacaan data lengkap guna meningkatkan stabilitas.
Pembacaan tanpa penguncian: Membaca data lengkap tanpa mengganggu operasi bisnis online.
Pembacaan log cadangan: Mendukung pembacaan log cadangan dari RDS MySQL.
Penguraian binlog paralel: Mengurangi latensi baca dengan mengurai file binlog secara paralel.
Prasyarat
Sebelum menggunakan tabel sumber MySQL CDC, Anda harus mengonfigurasi database MySQL seperti yang dijelaskan dalam Konfigurasi MySQL. Konfigurasi berikut wajib dilakukan.
RDS MySQL
Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.
Aktifkan binary logging (binlog). Ini diaktifkan secara default.
Atur format binlog ke ROW. Ini adalah format default.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Pengguna MySQL telah dibuat dengan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk RDS MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna menghindari kegagalan operasional akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk RDS MySQL.
PolarDB for MySQL
Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.
Aktifkan binary logging (binlog). Ini dinonaktifkan secara default.
Atur format binlog ke ROW. Ini adalah format default.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Anda telah membuat pengguna MySQL dan memberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk PolarDB for MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna menghindari kegagalan operasional akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk PolarDB for MySQL.
Self-managed MySQL
Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.
Aktifkan binary logging (binlog). Ini dinonaktifkan secara default.
Atur format binlog ke ROW. Format default adalah STATEMENT.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Pengguna MySQL telah dibuat dan diberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk instans MySQL yang dikelola sendiri. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna menghindari kegagalan operasional akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans MySQL yang dikelola sendiri.
Batasan
Batasan umum
Tabel sumber MySQL CDC tidak mendukung pendefinisian watermark.
Pada pekerjaan CREATE TABLE AS SELECT (CTAS) dan CREATE DATABASE AS SELECT (CDAS), tabel sumber MySQL CDC dapat menyinkronkan perubahan skema parsial. Untuk informasi selengkapnya tentang jenis perubahan yang didukung, lihat Kebijakan sinkronisasi evolusi skema.
Konektor MySQL CDC tidak mendukung Binary Log Transaction Compression. Oleh karena itu, saat menggunakan konektor MySQL CDC untuk mengonsumsi data inkremental, Anda harus memastikan fitur ini dinonaktifkan. Jika tidak, data inkremental mungkin gagal diambil.
RDS MySQL batasan
Kami tidak menyarankan membaca data dari database secondary atau replika read-only untuk RDS MySQL. Secara default, periode retensi binlog untuk instans ini singkat. Jika binlog kedaluwarsa dan dihapus, pekerjaan mungkin gagal mengonsumsi data binlog dan kemudian melaporkan error.
RDS MySQL secara default mengaktifkan sinkronisasi paralel antara database primary dan secondary, tetapi tidak menjamin konsistensi urutan transaksi. Hal ini dapat menyebabkan beberapa data terlewat selama alih bencana primary/secondary dan pemulihan checkpoint. Untuk mengatasi masalah ini, Anda dapat mengaktifkan opsi slave_preserve_commit_order secara manual di RDS MySQL.
PolarDB for MySQL batasan
Tabel sumber MySQL CDC tidak mendukung pembacaan dari Kluster Multi-master PolarDB for MySQL versi 1.0.19 atau lebih lama. Untuk informasi selengkapnya, lihat Apa itu Kluster Multi-master?. Binlog yang dihasilkan oleh kluster ini mungkin berisi ID tabel duplikat. Hal ini dapat menyebabkan kesalahan pemetaan skema pada tabel sumber CDC dan mengakibatkan kesalahan penguraian binlog.
Open-source MySQL batasan
Secara default, MySQL mempertahankan urutan transaksi selama replikasi binlog primary-replika. Jika replika MySQL mengaktifkan replikasi paralel (slave_parallel_workers > 1) tetapi tidak memiliki slave_preserve_commit_order=ON, urutan commit transaksinya mungkin berbeda dari database primary. Saat Flink CDC pulih dari checkpoint, data mungkin terlewat karena ketidakkonsistenan urutan ini. Kami menyarankan agar Anda mengatur slave_preserve_commit_order = ON pada replika MySQL atau mengatur slave_parallel_workers = 1. Perlu diperhatikan bahwa mengatur slave_parallel_workers ke 1 dapat mengurangi kinerja replikasi.
Catatan
Tabel sink
Jangan deklarasikan primary key auto-increment dalam pernyataan DDL. MySQL akan mengisi bidang ini secara otomatis saat menulis data.
Anda harus mendeklarasikan setidaknya satu bidang non-primary key. Jika tidak, akan terjadi error.
NOT ENFORCED dalam pernyataan DDL menunjukkan bahwa Flink tidak menegakkan pemeriksaan validitas pada primary key. Anda harus memastikan kebenaran dan integritas primary key. Untuk informasi selengkapnya, lihat Pemeriksaan Validitas.
Tabel dimensi
Untuk menggunakan indeks guna mempercepat kueri, urutan bidang dalam klausa JOIN harus sesuai dengan urutan definisi indeks. Ini dikenal sebagai aturan prefiks paling kiri (leftmost prefix rule). Misalnya, jika indeks berada pada
(a, b, c), kondisi JOIN harus berupaON t.a = x AND t.b = y.SQL yang dihasilkan Flink mungkin ditulis ulang oleh pengoptimal, sehingga mencegah penggunaan indeks selama kueri database aktual. Untuk memverifikasi apakah indeks digunakan, Anda dapat memeriksa rencana eksekusi (EXPLAIN) atau log kueri lambat di MySQL untuk melihat pernyataan SELECT aktual yang dieksekusi.
SQL
Anda dapat menggunakan konektor MySQL dalam pekerjaan SQL sebagai tabel sumber, tabel dimensi, atau tabel sink.
Sintaks
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);Cara konektor menulis ke tabel sink: Untuk setiap record yang diterima, konektor membuat dan mengeksekusi satu pernyataan SQL. Pernyataan tepatnya bergantung pada struktur tabel:
Untuk tabel sink tanpa primary key, sistem membuat dan mengeksekusi pernyataan SQL berikut:
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);Untuk tabel hasil dengan primary key, sistem mengeksekusi pernyataan SQL berikut:
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;Catatan: Jika tabel fisik memiliki kendala indeks unik selain primary key, memasukkan dua record dengan primary key berbeda tetapi nilai identik pada kolom yang dicakup oleh indeks unik menyebabkan konflik indeks unik. Konflik ini memicu penimpaan data, yang mengakibatkan kehilangan data dalam output.
Jika Anda mendefinisikan primary key auto-increment di database MySQL, jangan deklarasikan kolom auto-increment dalam pernyataan DDL Flink. Database akan mengisi bidang ini secara otomatis selama penyisipan data. Konektor mendukung penulisan dan penghapusan data dengan kolom auto-increment tetapi tidak mendukung pembaruan.
DENGAN parameter
Umum
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
connector
Jenis tabel.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, atur opsi ini ke
mysql-cdcataumysql. Keduanya setara. Saat digunakan sebagai tabel dimensi atau sink, atur opsi ini kemysql.hostname
Alamat IP atau hostname database MySQL.
Ya
STRING
Tidak ada
Kami menyarankan memasukkan alamat VPC.
CatatanJika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, buat koneksi jaringan cross-VPC atau gunakan Internet untuk akses. Untuk informasi selengkapnya, lihat Kelola dan operasikan ruang kerja dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.
username
Username untuk layanan database MySQL.
Ya
STRING
Tidak ada
Tidak ada.
password
Password untuk layanan database MySQL.
Ya
STRING
Tidak ada
Tidak ada.
database-name
Nama database MySQL.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, opsi ini mendukung ekspresi reguler untuk membaca data dari beberapa database.
Saat menggunakan ekspresi reguler, hindari penggunaan ^ dan $ untuk mencocokkan awal dan akhir string. Lihat kolom Keterangan untuk table-name untuk detailnya.
table-name
Nama tabel MySQL.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, opsi ini mendukung ekspresi reguler untuk membaca data dari beberapa tabel.
Saat membaca data dari beberapa tabel MySQL, kirimkan beberapa pernyataan CTAS sebagai satu pekerjaan tunggal. Hal ini menghindari pengaktifan beberapa pendengar binlog dan meningkatkan kinerja serta efisiensi. Untuk informasi selengkapnya, lihat Beberapa pernyataan CTAS: Kirimkan sebagai satu pekerjaan tunggal.
Saat menggunakan ekspresi reguler, hindari penggunaan ^ dan $ untuk mencocokkan awal dan akhir string. Lihat catatan di bawah untuk detailnya.
CatatanSaat tabel sumber MySQL CDC mencocokkan nama tabel, sistem menggabungkan database-name dan table-name yang Anda tentukan menjadi ekspresi reguler jalur lengkap menggunakan string \\. (Dalam versi VVR sebelum 8.0.1, karakter . digunakan.) Sistem kemudian menggunakan ekspresi reguler ini untuk mencocokkan nama tabel yang memenuhi syarat penuh di database MySQL.
Misalnya, jika Anda mengatur 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ dalam versi sebelum 8.0.1) untuk mencocokkan nama tabel yang memenuhi syarat penuh dan menentukan tabel mana yang akan dibaca.
port
Nomor port layanan database MySQL.
Tidak
INTEGER
3306
Tidak ada.
Khusus sumber
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
server-id
ID numerik untuk klien database.
Tidak
STRING
Nilai acak antara 5400 dan 6400 dihasilkan.
ID ini harus unik secara global dalam kluster MySQL. Kami menyarankan memberikan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.
Opsi ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren didukung. Dalam kasus ini, kami menyarankan menentukan rentang ID agar setiap pembaca konkuren menggunakan ID berbeda. Untuk informasi selengkapnya, lihat Menggunakan server ID.
scan.incremental.snapshot.enabled
Menentukan apakah snapshot inkremental diaktifkan.
Tidak
BOOLEAN
true
Snapshot inkremental diaktifkan secara default. Snapshot inkremental adalah mekanisme baru untuk membaca snapshot data lengkap. Dibandingkan dengan metode snapshot lama, snapshot inkremental menawarkan beberapa keunggulan:
Pembacaan data lengkap dapat dilakukan secara paralel.
Pembacaan data lengkap mendukung checkpoint tingkat chunk.
Pembacaan data lengkap tidak memerlukan penguncian baca global (FLUSH TABLES WITH READ LOCK).
Jika Anda ingin sumber mendukung pembacaan konkuren, setiap pembaca konkuren memerlukan server ID unik. Oleh karena itu, server-id harus berupa rentang seperti 5400-6400, dan rentang tersebut harus minimal sebesar tingkat paralelisme.
CatatanItem konfigurasi ini dihapus di Ververica Runtime (VVR) 11.1 dan versi selanjutnya.
scan.incremental.snapshot.chunk.size
Ukuran setiap chunk dalam baris.
Tidak
INTEGER
8096
Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data chunk disimpan dalam memori hingga sepenuhnya dibaca.
Semakin sedikit baris dalam setiap chunk, semakin banyak total chunk dalam tabel. Meskipun hal ini mengurangi granularitas pemulihan kesalahan, hal ini dapat menyebabkan error Out Of Memory (OOM) dan penurunan throughput keseluruhan. Oleh karena itu, Anda perlu membuat pertimbangan dan mengatur ukuran chunk yang wajar.
scan.snapshot.fetch.size
Jumlah maksimum record yang diambil sekaligus saat membaca data tabel lengkap.
Tidak
INTEGER
1024
Tidak ada.
scan.startup.mode
Mode startup untuk konsumsi data.
Tidak
STRING
initial
Nilai yang valid:
initial (default): Memindai data historis lengkap terlebih dahulu, lalu membaca data binlog terbaru saat startup pertama kali.
latest-offset: Tidak memindai data historis saat startup pertama kali. Mulai membaca dari akhir binlog, artinya hanya membaca perubahan terbaru setelah konektor dimulai.
earliest-offset: Tidak memindai data historis. Mulai membaca dari offset binlog paling awal yang tersedia.
specific-offset: Tidak memindai data historis. Mulai dari offset binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengonfigurasi scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.
timestamp: Tidak memindai data historis. Mulai membaca event binlog dari timestamp tertentu. Tentukan timestamp menggunakan scan.startup.timestamp-millis, dalam milidetik.
PentingSaat menggunakan earliest-offset, specific-offset, atau timestamp, pastikan skema tabel tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan untuk menghindari error akibat ketidakcocokan skema.
scan.startup.specific-offset.file
Nama file binlog untuk offset awal saat menggunakan mode startup offset spesifik.
Tidak
STRING
Tidak ada
Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. Contoh nama file:
mysql-bin.000003.scan.startup.specific-offset.pos
Offset dalam file binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset spesifik.
Tidak
INTEGER
Tidak ada
Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset.
scan.startup.specific-offset.gtid-set
Set GTID untuk offset awal saat menggunakan mode startup offset spesifik.
Tidak
STRING
Tidak ada
Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. Contoh set GTID:
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.scan.startup.timestamp-millis
Offset awal sebagai timestamp milidetik saat menggunakan mode startup timestamp.
Tidak
LONG
Tidak ada
Saat menggunakan konfigurasi ini, atur scan.startup.mode ke timestamp. Timestamp dalam milidetik.
PentingSaat menggunakan timestamp, MySQL CDC mencoba membaca event awal setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai. Pastikan file binlog untuk timestamp yang ditentukan belum dihapus dari database dan masih dapat dibaca.
server-time-zone
Zona waktu sesi yang digunakan oleh database.
Tidak
STRING
Jika Anda tidak menentukan opsi ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database—zona waktu wilayah yang Anda pilih.
Contoh: Asia/Shanghai. Opsi ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.
debezium.min.row.count.to.stream.results
Gunakan mode pembacaan batch saat jumlah baris dalam tabel melebihi nilai ini.
Tidak
INTEGER
1000
Flink membaca data dari tabel sumber MySQL dengan cara berikut:
Baca lengkap: Memuat seluruh data tabel langsung ke memori. Ini cepat tetapi mengonsumsi memori sebanding dengan volume data. Jika tabel sumber sangat besar, hal ini dapat menyebabkan masalah OOM.
Baca batch: Membaca data secara batch, mengambil jumlah baris tetap per batch hingga semua data terbaca. Ini menghindari risiko OOM untuk tabel besar tetapi lebih lambat.
connect.timeout
Waktu maksimum menunggu koneksi ke server database MySQL hingga timeout sebelum mencoba lagi.
Tidak
DURATION
30s
Tidak ada.
connect.max-retries
Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.
Tidak
INTEGER
3
Tidak ada.
connection.pool.size
Ukuran kolam koneksi database.
Tidak
INTEGER
20
Kolam koneksi database menggunakan kembali koneksi untuk mengurangi jumlah koneksi database.
jdbc.properties.*
Opsi koneksi kustom dalam URL JDBC.
Tidak
STRING
Tidak ada
Anda dapat meneruskan opsi koneksi kustom. Misalnya, untuk menonaktifkan SSL, atur 'jdbc.properties.useSSL' = 'false'.
Untuk informasi selengkapnya tentang opsi koneksi yang didukung, lihat MySQL Configuration Properties.
debezium.*
Opsi Debezium kustom untuk membaca binlog.
Tidak
STRING
Tidak ada
Anda dapat meneruskan opsi Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan cara menangani error penguraian.
heartbeat.interval
Interval di mana sumber menggunakan event heartbeat untuk memajukan offset binlog.
Tidak
DURATION
30s
Event heartbeat memajukan offset binlog di sumber. Ini berguna untuk tabel MySQL yang diperbarui secara lambat. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat mendorong offset binlog maju, mencegah masalah di mana offset binlog yang kedaluwarsa menyebabkan kegagalan pekerjaan dan memerlukan restart tanpa status.
scan.incremental.snapshot.chunk.key-column
Kolom yang digunakan untuk membagi chunk selama fase snapshot.
Lihat Keterangan.
STRING
Tidak ada
Wajib untuk tabel tanpa primary key. Kolom yang dipilih harus non-null (NOT NULL).
Opsional untuk tabel dengan primary key. Anda hanya dapat memilih satu kolom dari primary key.
rds.region-id
ID wilayah instans Alibaba Cloud RDS MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
Tidak ada
Untuk ID wilayah, lihat Wilayah dan zona.
rds.access-key-id
ID AccessKey untuk akun Alibaba Cloud RDS MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.
PentingUntuk mencegah kebocoran informasi AccessKey Anda, kelola ID AccessKey Anda menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Manajemen variabel.
rds.access-key-secret
Rahasia AccessKey untuk akun Alibaba Cloud RDS MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.
PentingUntuk mencegah kebocoran informasi AccessKey Anda, kelola rahasia AccessKey Anda menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Manajemen variabel.
rds.db-instance-id
ID instans Alibaba Cloud RDS MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
Tidak ada
Tidak ada.
rds.main-db-id
ID database utama instans Alibaba Cloud RDS MySQL.
Tidak
STRING
Tidak ada
Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Cadangan log RDS MySQL.
Hanya didukung di VVR 8.0.7 dan versi selanjutnya.
rds.download.timeout
Timeout untuk mengunduh satu log arsip dari OSS.
Tidak
DURATION
60s
Tidak ada.
rds.endpoint
Titik akhir layanan untuk mengambil informasi binlog OSS.
Tidak
STRING
Tidak ada
Untuk nilai yang valid, lihat Titik akhir layanan.
Hanya didukung di VVR 8.0.8 dan versi selanjutnya.
scan.incremental.close-idle-reader.enabled
Menentukan apakah pembaca idle ditutup setelah fase snapshot berakhir.
Tidak
BOOLEAN
false
Hanya didukung di VVR 8.0.1 dan versi selanjutnya.
Konfigurasi ini hanya berlaku saat execution.checkpointing.checkpoints-after-tasks-finish.enabled diatur ke true.
scan.read-changelog-as-append-only.enabled
Menentukan apakah aliran changelog diubah menjadi aliran append-only.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengubah semua jenis pesan (termasuk INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) menjadi pesan INSERT. Aktifkan hanya dalam kasus khusus, seperti mempertahankan pesan penghapusan tabel upstream.
false (default): Semua jenis pesan diteruskan tanpa perubahan.
CatatanHanya didukung di VVR 8.0.8 dan versi selanjutnya.
scan.only.deserialize.captured.tables.changelog.enabled
Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.
Tidak
BOOLEAN
Nilai default adalah false di VVR 8.x.
Nilai default adalah true di VVR 11.1 dan versi selanjutnya.
Nilai yang valid:
true: Hanya mendeserialisasi data perubahan untuk tabel target guna mempercepat pembacaan binlog.
false (default): Mendeserialisasi data perubahan untuk semua tabel.
CatatanHanya didukung di VVR 8.0.7 dan versi selanjutnya.
Di VVR 8.0.8 dan versi sebelumnya, ubah nama parameter ini menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
Selama fase inkremental, menentukan apakah mencoba mengurai event DDL lockless RDS.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengurai event DDL lockless RDS.
false (default): Tidak mengurai event DDL lockless RDS.
Ini adalah fitur eksperimental. Sebelum melakukan perubahan lockless online, ambil snapshot pekerjaan Flink untuk pemulihan.
CatatanHanya didukung di VVR 11.1 dan versi selanjutnya.
scan.incremental.snapshot.backfill.skip
Menentukan apakah melewati backfill selama fase pembacaan snapshot.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Melewati backfill selama fase pembacaan snapshot.
false (default): Tidak melewati backfill selama fase pembacaan snapshot.
Jika backfill dilewati, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.
PentingMelewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.
CatatanHanya didukung di VVR 11.1 dan versi selanjutnya.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Tidak
BOOELEAN
false
Nilai yang valid:
true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Ini adalah fitur eksperimental. Mengaktifkannya mengurangi risiko error OOM saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan menambahkan ini sebelum startup pertama pekerjaan.
CatatanHanya didukung di VVR 11.1 dan versi selanjutnya.
binlog.session.network.timeout
Timeout jaringan untuk operasi baca/tulis koneksi binlog.
Tidak
DURATION
10m
Mengatur ini ke 0s menggunakan timeout default server MySQL.
CatatanHanya didukung di VVR 11.5 dan versi selanjutnya.
scan.rate-limit.records-per-second
Membatasi jumlah maksimum record yang dipancarkan per detik oleh sumber.
Tidak
LONG
Tidak ada
Berguna untuk membatasi pembacaan data. Batasan ini berlaku untuk fase lengkap dan inkremental.
Metrik
numRecordsOutPerSecondmencerminkan jumlah record yang dipancarkan per detik di seluruh aliran data. Sesuaikan parameter ini berdasarkan metrik tersebut.Selama pembacaan lengkap, kurangi jumlah baris per batch dengan menurunkan nilai
scan.incremental.snapshot.chunk.size.CatatanHanya didukung di VVR 11.5 dan versi selanjutnya.
Khusus tabel dimensi
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
url
URL JDBC MySQL.
Tidak
STRING
Tidak ada
Format URL adalah
jdbc:mysql://<endpoint>:<port>/<database_name>.lookup.max-retries
Jumlah maksimum percobaan ulang setelah pembacaan data gagal.
Tidak
INTEGER
3
Hanya didukung di VVR 6.0.7 dan versi selanjutnya.
lookup.cache.strategy
Kebijakan cache.
Tidak
STRING
Tidak ada
Mendukung tiga kebijakan cache: None, LRU, dan ALL. Untuk informasi selengkapnya, lihat Informasi latar belakang.
CatatanSaat menggunakan kebijakan cache LRU, Anda juga harus mengonfigurasi opsi lookup.cache.max-rows.
lookup.cache.max-rows
Jumlah maksimum baris yang dicache.
Tidak
INTEGER
100000
Jika Anda memilih kebijakan cache least recently used, Anda harus menentukan ukuran cache.
Opsional saat memilih kebijakan cache ALL.
lookup.cache.ttl
Waktu hidup (TTL) cache.
Tidak
DURATION
10 s
Konfigurasi lookup.cache.ttl bergantung pada lookup.cache.strategy:
Jika lookup.cache.strategy diatur ke None, lookup.cache.ttl opsional dan berarti cache tidak pernah kedaluwarsa.
Jika lookup.cache.strategy diatur ke LRU, lookup.cache.ttl adalah TTL cache. Secara default, tidak pernah kedaluwarsa.
Jika lookup.cache.strategy diatur ke ALL, lookup.cache.ttl adalah waktu reload cache. Secara default, tidak direload.
Tentukan waktu dalam format seperti 1min atau 10s.
lookup.max-join-rows
Jumlah maksimum hasil yang dikembalikan saat mengkueri tabel dimensi untuk setiap baris di tabel utama.
Tidak
INTEGER
1024
Tidak ada.
lookup.filter-push-down.enabled
Menentukan apakah mengaktifkan filter pushdown untuk tabel dimensi.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memfilter data terlebih dahulu berdasarkan kondisi yang ditetapkan dalam pekerjaan SQL.
false (default): Menonaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memuat semua data.
CatatanHanya didukung di VVR 8.0.7 dan versi selanjutnya.
PentingFilter pushdown harus diaktifkan hanya saat tabel Flink digunakan sebagai tabel dimensi. Tabel sumber MySQL tidak mendukung pengaktifan filter pushdown. Jika tabel Flink digunakan sebagai tabel sumber dan dimensi, dan filter pushdown diaktifkan untuk tabel dimensi, atur secara eksplisit opsi ini ke false untuk tabel sumber menggunakan SQL hints. Jika tidak, pekerjaan mungkin berjalan tidak normal.
Khusus sink
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
url
URL JDBC MySQL.
Tidak
STRING
Tidak ada
Format URL adalah
jdbc:mysql://<endpoint>:<port>/<database_name>.sink.max-retries
Jumlah maksimum percobaan ulang setelah penulisan data gagal.
Tidak
INTEGER
3
Tidak ada.
sink.buffer-flush.batch-size
Jumlah record yang ditulis dalam satu batch.
Tidak
INTEGER
4096
Tidak ada.
sink.buffer-flush.max-rows
Jumlah record data yang dibuffer di memori.
Tidak
INTEGER
10000
Opsi ini hanya berlaku setelah primary key ditentukan.
sink.buffer-flush.interval
Interval waktu untuk flushing buffer. Jika data dalam buffer tidak memenuhi kondisi output setelah menunggu waktu yang ditentukan, sistem secara otomatis mengeluarkan semua data yang dibuffer.
Tidak
DURATION
1s
Tidak ada.
sink.ignore-delete
Menentukan apakah mengabaikan operasi DELETE.
Tidak
BOOLEAN
false
Saat aliran yang dihasilkan oleh Flink SQL mencakup record delete atau update-before, pembaruan simultan ke bidang berbeda dari tabel yang sama oleh beberapa tugas output dapat menyebabkan ketidakkonsistenan data.
Misalnya, setelah record dihapus, tugas lain hanya memperbarui beberapa bidang. Bidang yang tidak diperbarui menjadi null atau nilai default-nya, menyebabkan error data.
Atur sink.ignore-delete ke true untuk mengabaikan operasi DELETE dan UPDATE_BEFORE upstream dan menghindari masalah tersebut.
CatatanUPDATE_BEFORE adalah bagian dari mekanisme retraksi Flink, digunakan untuk "menarik kembali" nilai lama dalam operasi pembaruan.
Saat ignoreDelete = true, semua record DELETE dan UPDATE_BEFORE dilewati. Hanya record INSERT dan UPDATE_AFTER yang diproses.
sink.ignore-null-when-update
Saat memperbarui data, menentukan apakah mengatur bidang yang sesuai ke null atau melewati pembaruan jika nilai bidang input adalah null.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Melewati pembaruan bidang. Hanya didukung saat tabel Flink memiliki primary key. Saat diatur ke true:
Di VVR 8.0.6 dan versi sebelumnya, eksekusi batch tidak didukung untuk menulis data ke tabel sink.
Di VVR 8.0.7 dan versi selanjutnya, eksekusi batch didukung untuk menulis data ke tabel sink.
Meskipun penulisan batch meningkatkan efisiensi penulisan dan throughput keseluruhan, hal ini memperkenalkan latensi data dan risiko OOM. Seimbangkan pertimbangan ini berdasarkan skenario bisnis Anda.
false: Mengatur bidang ke null.
CatatanHanya didukung di VVR 8.0.5 dan versi selanjutnya.
Pemetaan tipe
Tabel sumber CDC
Tipe bidang MySQL CDC
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
PentingKami menyarankan agar Anda tidak menggunakan tipe TINYINT(1) di MySQL untuk menyimpan nilai selain 0 dan 1. Saat property-version=0, tabel sumber MySQL CDC memetakan TINYINT(1) ke tipe BOOLEAN Flink secara default. Hal ini dapat menyebabkan ketidakakuratan data. Untuk menggunakan TINYINT(1) guna menyimpan nilai selain 0 dan 1, lihat opsi konfigurasi catalog.table.treat-tinyint1-as-boolean.
Tabel dimensi dan tabel sink
Tipe bidang MySQL
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
Catatanp harus ≤ 38.
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
PentingFlink mendukung record BLOB MySQL dengan ukuran maksimum 2.147.483.647 (2^31 - 1).
BLOB
MEDIUMBLOB
LONGBLOB
Data ingestion
Anda dapat menggunakan konektor MySQL sebagai sumber data dalam pekerjaan YAML data ingestion.
Sintaks
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxxItem Konfigurasi
Parameter | Deskripsi | Wajib | Tipe data | Nilai default | Keterangan |
type | Jenis sumber data. | Ya | STRING | Tidak ada | Atur opsi ini ke mysql. |
name | Nama sumber data. | Tidak | STRING | Tidak ada | Tidak ada. |
hostname | Alamat IP atau hostname database MySQL. | Ya | STRING | Tidak ada | Kami menyarankan memasukkan alamat Virtual Private Cloud (VPC). Catatan Jika database MySQL dan Realtime Compute for Apache Flink Anda tidak berada dalam VPC yang sama, buat koneksi jaringan cross-VPC atau gunakan Internet untuk akses. Untuk informasi selengkapnya, lihat Kelola dan operasikan ruang kerja dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?. |
username | Username untuk layanan database MySQL. | Ya | STRING | Tidak ada | Tidak ada. |
password | Password untuk layanan database MySQL. | Ya | STRING | Tidak ada | Tidak ada. |
tables | Tabel data MySQL yang akan disinkronkan. | Ya | STRING | Tidak ada |
Catatan
|
tables.exclude | Tabel yang dikecualikan dari sinkronisasi. | Tidak | STRING | Tidak ada |
Catatan Titik memisahkan nama database dan nama tabel. Untuk mencocokkan karakter apa pun dengan titik, escape dengan backslash. Contoh: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*. |
port | Nomor port layanan database MySQL. | Tidak | INTEGER | 3306 | Tidak ada. |
schema-change.enabled | Menentukan apakah mengirim event perubahan skema. | Tidak | BOOLEAN | true | Tidak ada. |
server-id | ID numerik atau rentang yang digunakan oleh klien database untuk sinkronisasi. | Tidak | STRING | Nilai acak antara 5400 dan 6400 dihasilkan. | ID ini harus unik secara global dalam kluster MySQL. Kami menyarankan memberikan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama. Opsi ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren didukung. Dalam kasus ini, kami menyarankan menentukan rentang ID agar setiap pembaca konkuren menggunakan ID berbeda. |
jdbc.properties.* | Parameter koneksi kustom dalam URL JDBC. | Tidak | STRING | Tidak ada | Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk menonaktifkan SSL, atur 'jdbc.properties.useSSL' = 'false'. Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties. |
debezium.* | Parameter Debezium kustom untuk membaca log biner. | Tidak | STRING | Tidak ada | Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan cara menangani error penguraian. |
scan.incremental.snapshot.chunk.size | Ukuran setiap chunk dalam baris. | Tidak | INTEGER | 8096 | Tabel MySQL dibagi menjadi beberapa chunk untuk dibaca. Data chunk disimpan dalam memori hingga sepenuhnya dibaca. Semakin sedikit baris dalam setiap chunk, semakin banyak total chunk dalam tabel. Meskipun hal ini mengurangi granularitas pemulihan kesalahan, hal ini dapat menyebabkan masalah kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Oleh karena itu, Anda perlu menyeimbangkan faktor-faktor ini dan mengatur ukuran chunk yang sesuai. |
scan.snapshot.fetch.size | Jumlah maksimum record yang diambil sekaligus saat membaca data tabel lengkap. | Tidak | INTEGER | 1024 | Tidak ada. |
scan.startup.mode | Mode startup untuk konsumsi data. | Tidak | STRING | initial | Nilai yang valid:
Penting Untuk earliest-offset, specific-offset, dan timestamp, jika skema tabel berbeda antara waktu startup dan waktu offset awal yang ditentukan, pekerjaan gagal karena ketidakcocokan skema. Dengan kata lain, saat menggunakan ketiga mode startup ini, pastikan skema tabel tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan. |
scan.startup.specific-offset.file | Nama file binlog untuk offset awal saat menggunakan mode startup offset spesifik. | Tidak | STRING | Tidak ada | Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. Contoh nama file: |
scan.startup.specific-offset.pos | Offset dalam file binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset spesifik. | Tidak | INTEGER | Tidak ada | Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. |
scan.startup.specific-offset.gtid-set | Set GTID untuk offset awal saat menggunakan mode startup offset spesifik. | Tidak | STRING | Tidak ada | Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. Contoh set GTID: |
scan.startup.timestamp-millis | Offset awal sebagai timestamp milidetik saat menggunakan mode startup timestamp. | Tidak | LONG | Tidak ada | Saat menggunakan konfigurasi ini, atur scan.startup.mode ke timestamp. Timestamp dalam milidetik. Penting Saat menggunakan timestamp, MySQL CDC mencoba membaca event awal setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai. Pastikan file binlog untuk timestamp yang ditentukan belum dihapus dari database dan masih dapat dibaca. |
server-time-zone | Zona waktu sesi yang digunakan oleh database. | Tidak | STRING | Jika Anda tidak menentukan opsi ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database—zona waktu wilayah yang Anda pilih. | Contoh: Asia/Shanghai. Opsi ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values. |
scan.startup.specific-offset.skip-events | Jumlah event binlog yang dilewati saat membaca dari offset tertentu. | Tidak | INTEGER | Tidak ada | Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. |
scan.startup.specific-offset.skip-rows | Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event binlog mungkin sesuai dengan beberapa perubahan baris. | Tidak | INTEGER | Tidak ada | Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. |
connect.timeout | Waktu maksimum menunggu koneksi ke server database MySQL hingga timeout sebelum mencoba lagi. | Tidak | DURATION | 30s | Tidak ada. |
connect.max-retries | Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal. | Tidak | INTEGER | 3 | Tidak ada. |
connection.pool.size | Ukuran kolam koneksi database. | Tidak | INTEGER | 20 | Kolam koneksi database menggunakan kembali koneksi untuk mengurangi jumlah koneksi database. |
heartbeat.interval | Interval di mana sumber menggunakan event heartbeat untuk memajukan offset binlog. | Tidak | DURATION | 30s | Event heartbeat memajukan offset binlog di sumber. Ini berguna untuk tabel MySQL yang diperbarui secara lambat. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat mendorong offset binlog maju, mencegah masalah di mana offset binlog yang kedaluwarsa menyebabkan kegagalan pekerjaan dan memerlukan restart tanpa status. |
scan.incremental.snapshot.chunk.key-column | Kolom yang digunakan untuk membagi chunk selama fase snapshot. | Tidak. | STRING | Tidak ada | Anda hanya dapat memilih satu kolom dari primary key. |
rds.region-id | ID wilayah instans Alibaba Cloud RDS MySQL. | Wajib saat membaca log arsip dari OSS. | STRING | Tidak ada | Untuk ID wilayah, lihat Wilayah dan zona. |
rds.access-key-id | ID AccessKey untuk akun Alibaba Cloud RDS MySQL. | Wajib saat membaca log arsip dari OSS. | STRING | Tidak ada | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?. Penting Untuk mencegah kebocoran informasi AccessKey Anda, kelola ID AccessKey Anda menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Manajemen variabel. |
rds.access-key-secret | Rahasia AccessKey untuk akun Alibaba Cloud RDS MySQL. | Wajib saat membaca log arsip dari OSS. | STRING | Tidak ada | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?. Penting Untuk mencegah kebocoran informasi AccessKey Anda, kelola rahasia AccessKey Anda menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Manajemen variabel. |
rds.db-instance-id | ID instans Alibaba Cloud RDS MySQL. | Wajib saat membaca log arsip dari OSS. | STRING | Tidak ada | Tidak ada. |
rds.main-db-id | ID database utama instans Alibaba Cloud RDS MySQL. | Tidak | STRING | Tidak ada | Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Cadangan log RDS MySQL. |
rds.download.timeout | Timeout untuk mengunduh satu log arsip dari OSS. | Tidak | DURATION | 60s | Tidak ada. |
rds.endpoint | Titik akhir layanan untuk mengambil informasi binlog OSS. | Tidak | STRING | Tidak ada | Untuk nilai yang valid, lihat Titik akhir layanan. |
rds.binlog-directory-prefix | Awalan direktori untuk menyimpan file binlog. | Tidak | STRING | rds-binlog- | Tidak ada. |
rds.use-intranet-link | Menentukan apakah menggunakan jaringan internal untuk mengunduh file binlog. | Tidak | BOOLEAN | true | Tidak ada. |
rds.binlog-directories-parent-path | Jalur mutlak direktori induk untuk menyimpan file binlog. | Tidak | STRING | Tidak ada | Tidak ada. |
chunk-meta.group.size | Ukuran metadata chunk. | Tidak | INTEGER | 1000 | Jika metadata melebihi ukuran ini, metadata dikirimkan dalam beberapa bagian. |
chunk-key.even-distribution.factor.lower-bound | Batas bawah faktor distribusi chunk untuk sharding merata. | Tidak | DOUBLE | 0.05 | Faktor distribusi chunk yang kurang dari nilai ini menghasilkan sharding tidak merata. Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris. |
chunk-key.even-distribution.factor.upper-bound | Batas atas faktor distribusi chunk untuk sharding merata. | Tidak | DOUBLE | 1000.0 | Faktor distribusi chunk yang lebih besar dari nilai ini menghasilkan sharding tidak merata. Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah pembaca idle ditutup setelah fase snapshot berakhir. | Tidak | BOOLEAN | false | Agar konfigurasi ini berlaku, atur |
scan.only.deserialize.captured.tables.changelog.enabled | Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan. | Tidak | BOOLEAN |
| Nilai yang valid:
|
scan.parallel-deserialize-changelog.enabled | Selama fase inkremental, menentukan apakah menggunakan multithreading untuk mengurai event perubahan. | Tidak | BOOLEAN | false | Nilai yang valid:
Catatan Hanya didukung di VVR 8.0.11 dan versi selanjutnya. |
scan.parallel-deserialize-changelog.handler.size | Jumlah handler event saat menggunakan multithreading untuk mengurai event perubahan. | Tidak | INTEGER | 2 | Catatan Hanya didukung di VVR 8.0.11 dan versi selanjutnya. |
metadata-column.include-list | Kolom metadata yang diteruskan ke downstream. | Tidak | STRING | Tidak ada | Metadata yang tersedia meliputi Catatan Konektor MySQL CDC YAML tidak perlu dan tidak mendukung penambahan kolom metadata nama database, nama tabel, dan Penting Kolom metadata
|
scan.newly-added-table.enabled | Saat memulai ulang dari checkpoint, menentukan apakah menyinkronkan tabel yang baru ditambahkan yang tidak cocok pada proses sebelumnya atau menghapus tabel yang saat ini tidak cocok yang disimpan dalam state. | Tidak | BOOLEAN | false | Ini berlaku saat memulai ulang dari checkpoint atau titik simpan. |
scan.binlog.newly-added-table.enabled | Selama fase inkremental, menentukan apakah mengirim data dari tabel yang baru ditambahkan yang cocok dengan pola. | Tidak | BOOLEAN | false | Ini tidak dapat diaktifkan secara bersamaan dengan |
scan.incremental.snapshot.chunk.key-column | Tentukan kolom untuk tabel tertentu yang akan digunakan sebagai kunci pemisahan chunk selama fase snapshot. | Tidak | STRING | Tidak ada |
|
scan.parse.online.schema.changes.enabled | Selama fase inkremental, menentukan apakah mencoba mengurai event DDL lockless RDS. | Tidak | BOOLEAN | false | Nilai yang valid:
Ini adalah fitur eksperimental. Sebelum melakukan perubahan lockless online, ambil snapshot pekerjaan Flink untuk pemulihan. Catatan Hanya didukung di VVR 11.0 dan versi selanjutnya. |
scan.incremental.snapshot.backfill.skip | Menentukan apakah melewati backfill selama fase pembacaan snapshot. | Tidak | BOOLEAN | false | Nilai yang valid:
Jika backfill dilewati, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya, bukan digabungkan ke dalam snapshot. Penting Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin. Catatan Hanya didukung di VVR 11.1 dan versi selanjutnya. |
treat-tinyint1-as-boolean.enabled | Menentukan apakah memperlakukan tipe TINYINT(1) sebagai tipe Boolean. | Tidak | BOOLEAN | true | Nilai yang valid:
|
treat-timestamp-as-datetime-enabled | Menentukan apakah memproses TIMESTAMP sebagai DATETIME. | Tidak | BOOLEAN | false | Nilai yang valid:
TIMESTAMP MySQL menyimpan waktu UTC dan dipengaruhi oleh zona waktu. DATETIME MySQL menyimpan waktu literal dan tidak dipengaruhi oleh zona waktu. Mengaktifkan ini mengonversi data TIMESTAMP MySQL ke DATETIME berdasarkan server-time-zone. |
include-comments.enabled | Menentukan apakah menyinkronkan komentar tabel dan kolom. | Tidak | BOOELEAN | false | Nilai yang valid:
Mengaktifkan ini meningkatkan penggunaan memori pekerjaan. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot. | Tidak | BOOELEAN | false | Nilai yang valid:
Ini adalah fitur eksperimental. Mengaktifkannya mengurangi risiko error OOM saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan menambahkan ini sebelum startup pertama pekerjaan. Catatan Hanya didukung di VVR 11.1 dan versi selanjutnya. |
binlog.session.network.timeout | Timeout jaringan untuk koneksi binlog. | Tidak | DURATION | 10m | Mengatur ini ke 0s menggunakan timeout default server MySQL. Catatan Hanya didukung di VVR 11.5 dan versi selanjutnya. |
scan.rate-limit.records-per-second | Membatasi jumlah maksimum record yang dipancarkan per detik oleh sumber. | Tidak | LONG | Tidak ada | Berguna untuk membatasi pembacaan data. Batasan ini berlaku untuk fase lengkap dan inkremental. Metrik Selama pembacaan lengkap, kurangi jumlah baris per batch dengan menurunkan nilai Catatan Hanya didukung di VVR 11.5 dan versi selanjutnya. |
Pemetaan tipe
Tabel berikut menunjukkan pemetaan tipe untuk data ingestion.
Tipe bidang MySQL CDC | Tipe bidang CDC |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | Pemetaan bidang bergantung pada opsi
|
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65 | STRING Catatan Di MySQL, presisi desimal dapat mencapai hingga 65. Flink membatasi presisi desimal hingga 38. Jika Anda mendefinisikan kolom desimal dengan presisi > 38, petakan ke string untuk menghindari kehilangan presisi. |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING Catatan Tipe data JSON dikonversi ke string berformat JSON di Flink. |
GEOMETRY | STRING Catatan Tipe data spasial MySQL dikonversi ke string dengan format JSON tetap. Untuk informasi selengkapnya, lihat pemetaan tipe data spasial MySQL. |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES Catatan MySQL mendukung BLOB dengan panjang maksimum 2.147.483.647 (2**31-1) byte. |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
Contoh
Tabel sumber CDC
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;Tabel sink
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;Sumber data ingestion
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
Tentang tabel sumber MySQL CDC
Cara kerja
Tabel sumber MySQL CDC memindai tabel lengkap saat startup dan membaginya menjadi beberapa chunk berdasarkan primary key. Sistem mencatat offset binlog saat ini lalu menggunakan algoritma snapshot inkremental untuk membaca data setiap chunk menggunakan pernyataan SELECT. Pekerjaan melakukan checkpoint berkala untuk mencatat chunk yang telah selesai. Jika terjadi failover, pekerjaan melanjutkan membaca hanya chunk yang belum selesai. Setelah semua chunk terbaca, pekerjaan membaca record perubahan inkremental dari offset binlog yang sebelumnya dicatat. Pekerjaan Flink terus melakukan checkpoint berkala untuk mencatat offset binlog. Jika terjadi failover, pekerjaan melanjutkan pemrosesan dari offset binlog terakhir yang dicatat. Proses ini mencapai semantik tepat-sekali (exactly-once semantics).
Untuk detail lebih lanjut tentang algoritma snapshot inkremental, lihat Konektor MySQL CDC.
Metadata
Metadata sangat berguna dalam skenario penggabungan database dan tabel yang di-shard. Setelah digabung, bisnis sering kali masih perlu mengidentifikasi database dan tabel sumber untuk setiap baris data. Kolom metadata memungkinkan Anda mengakses informasi ini. Oleh karena itu, Anda dapat dengan mudah menggabungkan beberapa tabel yang di-shard menjadi satu tabel tujuan menggunakan kolom metadata.
Sumber MySQL CDC mendukung sintaks kolom metadata. Anda dapat mengakses metadata berikut melalui kolom metadata.
Kunci metadata
Tipe metadata
Deskripsi
database_name
STRING NOT NULL
Nama database yang berisi baris tersebut.
table_name
STRING NOT NULL
Nama tabel yang berisi baris tersebut.
op_ts
TIMESTAMP_LTZ(3) NOT NULL
Waktu saat baris tersebut diubah di database. Jika record berasal dari data historis yang sudah ada di tabel, bukan dari binlog, nilai ini selalu 0.
op_type
STRING NOT NULL
Jenis perubahan baris tersebut.
+I: Pesan INSERT
-D: Pesan DELETE
-U: Pesan UPDATE_BEFORE
+U: Pesan UPDATE_AFTER
CatatanHanya didukung di Ververica Runtime (VVR) 8.0.7 dan versi selanjutnya.
query_log
STRING NOT NULL
Baca record log kueri MySQL yang sesuai dengan baris ini.
CatatanMySQL harus memiliki parameter binlog_rows_query_log_events diaktifkan untuk mencatat log kueri.
Contoh berikut menunjukkan cara menggabungkan beberapa tabel orders dari berbagai database yang di-shard dalam instans MySQL dan menyinkronkannya ke tabel holo_orders di Hologres.
CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- Baca nama database. table_name STRING METADATA FROM 'table_name' VIRTUAL, -- Baca nama tabel. operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Baca timestamp perubahan. op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Baca jenis perubahan. order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- Cocokkan beberapa database yang di-shard menggunakan ekspresi reguler. 'table-name' = 'orders_.*' -- Cocokkan beberapa tabel yang di-shard menggunakan ekspresi reguler. ); INSERT INTO holo_orders SELECT * FROM mysql_orders;Berdasarkan kode di atas, jika Anda mengatur scan.read-changelog-as-append-only.enabled ke true dalam klausa WITH, output bervariasi berdasarkan konfigurasi primary key tabel downstream:
Jika primary key tabel downstream adalah order_id, output hanya berisi perubahan terakhir untuk setiap primary key di tabel upstream. Misalnya, jika perubahan terakhir untuk primary key adalah operasi penghapusan, Anda akan melihat record di tabel downstream dengan primary key yang sama dan op_type -D.
Jika primary key tabel downstream adalah order_id, operation_ts, dan op_type, output berisi riwayat perubahan lengkap untuk setiap primary key di tabel upstream.
Dukungan ekspresi reguler
Tabel sumber MySQL CDC mendukung penggunaan ekspresi reguler dalam nama tabel atau nama database untuk mencocokkan beberapa tabel atau database. Contoh berikut menunjukkan cara menentukan beberapa tabel menggunakan ekspresi reguler.
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Cocokkan beberapa database menggunakan ekspresi reguler. 'table-name' = '(t[5-8]|tt)' -- Cocokkan beberapa tabel menggunakan ekspresi reguler. );Penjelasan ekspresi reguler dalam contoh di atas:
^(test).* adalah contoh pencocokan awalan. Ekspresi ini mencocokkan nama database yang dimulai dengan test, seperti test1 atau test2.
.*[p$] adalah contoh pencocokan akhiran. Ekspresi ini mencocokkan nama database yang diakhiri dengan p, seperti cdcp atau edcp.
txc adalah pencocokan eksak. Ini mencocokkan nama database spesifik txc.
Saat mencocokkan nama tabel yang memenuhi syarat penuh, MySQL CDC menggunakan nama database dan nama tabel untuk mengidentifikasi tabel secara unik. Sistem menggunakan pola database-name.table-name untuk pencocokan. Misalnya, pola (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) mencocokkan tabel seperti txc.tt dan test2.test5 dalam database.
PentingDalam konfigurasi pekerjaan SQL, opsi table-name dan database-name tidak mendukung penggunaan koma (,) untuk menentukan beberapa tabel atau database.
Untuk mencocokkan beberapa tabel atau menggunakan beberapa ekspresi reguler, Anda dapat memisahkannya dengan GARIS TEGAK (|) dan membungkusnya dalam tanda kurung. Misalnya, untuk membaca tabel user dan product, Anda dapat mengonfigurasi table-name sebagai
(user|product).Jika ekspresi reguler berisi koma, Anda harus menulis ulang menggunakan operator GARIS TEGAK (|). Misalnya, ekspresi reguler
mytable_\d{1, 2}harus ditulis ulang menjadi yang setara(mytable_\d{1}|mytable_\d{2})untuk menghindari penggunaan koma.
Kontrol konkurensi
Konektor MySQL mendukung pembacaan data lengkap secara konkuren, yang meningkatkan efisiensi pemuatan data. Saat dikombinasikan dengan Autopilot di konsol Realtime Compute for Apache Flink, konektor secara otomatis mengurangi skala selama fase inkremental setelah pembacaan konkuren selesai. Hal ini menghemat sumber daya komputasi.
Di Konsol pengembangan Realtime Compute, Anda dapat mengatur paralelisme pekerjaan di halaman Konfigurasi Sumber Daya baik dalam mode Dasar maupun mode Ahli. Perbedaannya sebagai berikut:
Paralelisme yang diatur dalam mode Dasar adalah paralelisme global untuk seluruh pekerjaan.

Mode Ahli memungkinkan Anda mengatur paralelisme untuk VERTEX tertentu sesuai kebutuhan.

Untuk informasi selengkapnya tentang konfigurasi sumber daya, lihat Konfigurasi informasi penerapan pekerjaan.
PentingTerlepas dari apakah Anda menggunakan mode Dasar atau mode Ahli, rentang server-id yang dideklarasikan dalam tabel harus lebih besar dari atau sama dengan paralelisme pekerjaan. Misalnya, jika rentang server-id adalah 5404-5412, terdapat delapan ID server unik. Oleh karena itu, pekerjaan dapat memiliki maksimal delapan tugas paralel. Pekerjaan berbeda untuk instans MySQL yang sama harus memiliki rentang server-id yang tidak tumpang tindih. Setiap pekerjaan harus secara eksplisit mengonfigurasi server-id yang berbeda.
Autopilot Auto Scale-in
Fase data lengkap mengakumulasi banyak data historis. Untuk meningkatkan efisiensi pembacaan, data historis biasanya dibaca secara konkuren. Namun, dalam fase binlog inkremental, satu konkurensi biasanya sudah cukup karena volume data binlog kecil dan urutan global harus dipertahankan. Autopilot secara otomatis menyeimbangkan kinerja dan sumber daya untuk memenuhi persyaratan berbeda antara fase lengkap dan inkremental.
Autopilot memantau trafik untuk setiap tugas di Sumber CDC MySQL. Saat memasuki fase binlog, jika hanya satu tugas yang menangani pembacaan binlog sementara yang lain menganggur, Autopilot secara otomatis mengurangi jumlah CU dan paralelisme Sumber. Untuk mengaktifkan Autopilot, atur mode Autopilot ke Aktif di halaman Operasi dan Pemeliharaan pekerjaan.
CatatanInterval pemicu minimum default untuk penskalaan turun paralelisme adalah 24 jam. Untuk informasi selengkapnya tentang parameter dan detail Autopilot, lihat Konfigurasi Autopilot.
Startup Mode
Anda dapat menggunakan opsi scan.startup.mode untuk menentukan mode startup untuk tabel sumber MySQL CDC. Nilai yang valid dijelaskan sebagai berikut:
initial (default): Melakukan pembacaan lengkap tabel database saat startup pertama kali, lalu beralih ke mode inkremental untuk membaca binlog.
earliest-offset: Melewati fase snapshot dan mulai membaca dari offset binlog paling awal yang tersedia.
latest-offset: Melewati fase snapshot dan mulai membaca dari akhir binlog. Dalam mode ini, tabel sumber hanya membaca perubahan yang terjadi setelah pekerjaan dimulai.
specific-offset: Melewati fase snapshot dan mulai membaca dari offset binlog tertentu. Anda dapat menentukan offset berdasarkan nama file binlog dan posisi atau berdasarkan set GTID.
timestamp: Melewati fase snapshot dan mulai membaca event binlog dari timestamp tertentu.
Contoh:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- Mulai dari offset paling awal. 'scan.startup.mode' = 'latest-offset', -- Mulai dari offset terbaru. 'scan.startup.mode' = 'specific-offset', -- Mulai dari offset tertentu. 'scan.startup.mode' = 'timestamp', -- Mulai dari timestamp tertentu. 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Tentukan nama file binlog untuk mode specific-offset. 'scan.startup.specific-offset.pos' = '4', -- Tentukan posisi binlog untuk mode specific-offset. 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Tentukan set GTID untuk mode specific-offset. 'scan.startup.timestamp-millis' = '1667232000000' -- Tentukan timestamp startup untuk mode timestamp. ... )PentingSumber MySQL mencatat posisi saat ini pada level INFO selama checkpoint. Awalan log adalah
Binlog offset on checkpoint {checkpoint-id}. Log ini membantu Anda memulai ulang pekerjaan dari posisi checkpoint tertentu.Jika skema tabel yang sedang dibaca telah berubah di masa lalu, memulai dari earliest-offset, specific-offset, atau timestamp dapat menyebabkan error. Hal ini karena pembaca Debezium secara internal menyimpan skema terbaru, dan data lama dengan skema yang tidak cocok tidak dapat diurai dengan benar.
Tabel sumber CDC tanpa kunci
Untuk menggunakan tabel tanpa kunci, Anda harus mengatur scan.incremental.snapshot.chunk.key-column dan memilih hanya bidang non-null.
Semantik pemrosesan tabel sumber CDC tanpa kunci bergantung pada perilaku kolom yang ditentukan dalam scan.incremental.snapshot.chunk.key-column:
Jika kolom yang ditentukan tidak pernah diperbarui, semantik tepat-sekali dijamin.
Jika kolom yang ditentukan diperbarui, hanya semantik at-least-once yang dijamin. Namun, Anda dapat memastikan kebenaran data dengan menggabungkannya dengan sistem downstream, menentukan primary key downstream, dan menggunakan operasi idempoten.
Baca log cadangan dari RDS MySQL
Tabel sumber MySQL CDC mendukung pembacaan log cadangan dari Alibaba Cloud RDS MySQL. Fitur ini sangat berguna ketika fase snapshot lengkap memakan waktu lama. Dalam kasus ini, file binlog lokal mungkin secara otomatis dibersihkan, sedangkan file cadangan yang diunggah secara manual atau otomatis masih ada.
Contoh:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )Aktifkan penggunaan ulang sumber CDC
Satu pekerjaan dengan beberapa tabel sumber MySQL CDC meluncurkan beberapa klien binlog. Jika semua tabel sumber membaca dari instans MySQL yang sama, praktik ini meningkatkan tekanan pada database. Untuk informasi selengkapnya, lihat FAQ MySQL CDC.
Solusi
VVR 8.0.7 dan versi selanjutnya mendukung penggunaan ulang sumber MySQL CDC. Penggunaan ulang menggabungkan tabel sumber MySQL CDC yang kompatibel. Penggabungan terjadi ketika tabel sumber memiliki konfigurasi identik kecuali nama database, nama tabel, dan
server-id. Mesin secara otomatis menggabungkan sumber MySQL CDC dalam pekerjaan yang sama.Prosedur
Anda dapat menggunakan perintah
SETdalam pekerjaan SQL:SET 'table.optimizer.source-merge.enabled' = 'true'; # (Versi VVR 8.0.8 dan 8.0.9) Atur juga ini: SET 'sql-gateway.exec-plan.enabled' = 'false';VVR 11.1 dan versi selanjutnya mengaktifkan penggunaan ulang secara default.
Jalankan pekerjaan tanpa status. Memodifikasi konfigurasi penggunaan ulang mengubah topologi pekerjaan. Anda harus menjalankan pekerjaan tanpa status. Jika tidak, pekerjaan mungkin gagal dijalankan atau kehilangan data. Jika sumber digabung, Anda dapat melihat node
MergetableSourceScandalam topologi pekerjaan.
PentingSetelah Anda mengaktifkan penggunaan ulang, jangan atur
pipeline.operator-chainingkefalse. Menonaktifkan operator chaining menambahkan overhead serialisasi dan deserialisasi. Semakin banyak sumber yang digabung, semakin besar overhead-nya.Di VVR 8.0.7, menonaktifkan operator chaining menyebabkan masalah serialisasi.
Percepat pembacaan binlog
Saat Anda menggunakan konektor MySQL sebagai tabel sumber atau sumber data ingestion, sistem mengurai file binlog untuk menghasilkan berbagai pesan perubahan selama fase inkremental. File binlog mencatat semua perubahan tabel dalam format biner. Anda dapat mempercepat penguraian file binlog dengan cara berikut:
Aktifkan konfigurasi filter penguraian
Gunakan opsi
scan.only.deserialize.captured.tables.changelog.enableduntuk mengurai event perubahan hanya untuk tabel yang ditentukan.
Optimalkan opsi Debezium
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size: Jumlah maksimum record yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, sistem menempatkan event dalam antrian pemblokiran sebelum menuliskannya ke downstream. Nilai default adalah 8192.debezium.max.batch.size: Jumlah maksimum event yang diproses per iterasi. Nilai default adalah 2048.debezium.poll.interval.ms: Jumlah milidetik yang ditunggu konektor sebelum meminta event perubahan baru. Nilai default adalah 1000 ms (1 detik).
Contoh:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Konfigurasi Debezium
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- Aktifkan filter penguraian
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Urai event perubahan hanya untuk tabel yang ditentukan.
...
)source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Konfigurasi Debezium
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# Aktifkan filter penguraian
scan.only.deserialize.captured.tables.changelog.enabled: trueVersi enterprise MySQL CDC mengonsumsi binlog dengan kecepatan 85 MB/detik, sekitar dua kali lipat kecepatan versi komunitas open-source. Jika laju pembuatan binlog melebihi 85 MB/detik (setara dengan satu file 512 MB setiap 6 detik), latensi pekerjaan Flink terus meningkat. Latensi secara bertahap berkurang setelah laju pembuatan binlog melambat. Saat file binlog berisi transaksi besar, latensi pemrosesan mungkin sementara meningkat lalu berkurang setelah log transaksi terbaca.
API DataStream MySQL CDC
Untuk membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk petunjuk cara menyiapkan konektor DataStream, lihat Penggunaan konektor DataStream.
Contoh berikut menunjukkan cara membuat program API DataStream dan menggunakan MySqlSource, termasuk dependensi pom yang diperlukan.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // Atur database yang ditangkap.
.tableList("yourDatabaseName.yourTableName") // Atur tabel yang ditangkap.
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // Mengonversi SourceRecord ke String JSON.
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Aktifkan checkpointing.
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// Atur 4 tugas sumber paralel.
.setParallelism(4)
.print().setParallelism(1); // Gunakan paralelisme 1 untuk sink guna mempertahankan urutan pesan.
env.execute("Print MySQL Snapshot + Binlog");
}
}<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>Saat membangun MySqlSource, Anda harus menentukan parameter berikut dalam kode Anda:
Parameter | Deskripsi |
hostname | Alamat IP atau hostname database MySQL. |
port | Nomor port layanan database MySQL. |
databaseList | Nama database MySQL. Catatan Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Gunakan |
username | Username untuk layanan database MySQL. |
password | Password untuk layanan database MySQL. |
deserializer | Deserializer yang mengonversi objek SourceRecord ke tipe tertentu. Nilai yang valid:
|
Dependensi pom Anda harus menentukan parameter berikut:
${vvr.version} | Versi mesin Alibaba Cloud Realtime Compute for Apache Flink—misalnya, Catatan Gunakan nomor versi yang ditampilkan di Maven. Kami secara berkala merilis versi hotfix, dan pembaruan ini mungkin tidak diumumkan melalui saluran lain. |
${flink.version} | Versi Apache Flink—misalnya, Penting Gunakan versi Apache Flink yang sesuai dengan versi mesin Realtime Compute for Apache Flink Anda untuk menghindari masalah kompatibilitas selama runtime pekerjaan. Untuk detail pemetaan versi, lihat Mesin. |
FAQ
Untuk informasi tentang masalah yang mungkin Anda temui saat menggunakan tabel sumber CDC, lihat Masalah CDC.