Topik ini menjelaskan cara menggunakan konektor MySQL.
Informasi latar belakang
Konektor MySQL mendukung semua database yang kompatibel dengan protokol MySQL, termasuk ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (mode MySQL), dan self-managed MySQL.
Saat Anda menggunakan konektor MySQL untuk membaca dari OceanBase, pastikan binary logging (binlog) untuk OceanBase diaktifkan dan dikonfigurasi dengan benar. Untuk informasi selengkapnya, lihat Operasi terkait. Fitur ini berada dalam pratinjau publik. Kami menyarankan agar Anda mengevaluasinya secara menyeluruh dan menggunakannya dengan hati-hati.
Konektor MySQL mendukung hal-hal berikut.
Kategori | Rincian |
Jenis yang didukung | Tabel sumber, tabel dimensi, tabel sink, dan sumber ingesti data |
Mode runtime | Hanya mode streaming |
Format data | Tidak berlaku |
Metrik pemantauan spesifik | |
Jenis API | DataStream, SQL, dan YAML ingesti data |
Apakah saya dapat memperbarui atau menghapus data di tabel sink? | Ya |
Fitur
Tabel sumber MySQL Change Data Capture (CDC) adalah tabel sumber streaming yang pertama-tama membaca seluruh data historis dari database, lalu secara mulus beralih ke pembacaan binlog untuk memastikan data tidak dibaca lebih dari sekali atau terlewat. Bahkan jika terjadi kegagalan, data diproses dengan semantik tepat-sekali. Tabel sumber CDC MySQL mendukung pembacaan konkuren data lengkap dan menggunakan algoritma snapshot inkremental untuk mencapai pembacaan tanpa penguncian serta transfer data yang dapat dilanjutkan. Untuk informasi selengkapnya, lihat Tentang tabel sumber CDC MySQL.
Pemrosesan batch dan stream terpadu: Membaca data lengkap dan inkremental, menghilangkan kebutuhan akan pipeline terpisah.
Pembacaan data lengkap secara konkuren: Meningkatkan kinerja secara horizontal.
Peralihan mulus dari pembacaan lengkap ke inkremental: Secara otomatis mengurangi penggunaan sumber daya komputasi.
Transfer data yang dapat dilanjutkan: Mendukung pemulihan transfer data selama fase pembacaan data lengkap guna meningkatkan stabilitas.
Pembacaan tanpa penguncian: Membaca data lengkap tanpa memengaruhi operasi bisnis online.
Mendukung pembacaan log cadangan dari ApsaraDB RDS for MySQL.
Penguraian file binlog secara paralel mengurangi latensi baca.
Prasyarat
Anda harus mengonfigurasi database MySQL seperti yang dijelaskan dalam Konfigurasi MySQL sebelum dapat menggunakan tabel sumber CDC MySQL.
ApsaraDB RDS for MySQL
Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL: 5.6, 5.7, dan 8.0.x.
Aktifkan binlog. Ini diaktifkan secara default.
Atur format binlog ke ROW. Ini adalah format default.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Pengguna MySQL telah dibuat dengan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk ApsaraDB RDS for MySQL. Gunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasional akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk ApsaraDB RDS for MySQL.
PolarDB for MySQL
Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL: 5.6, 5.7, dan 8.0.x.
Aktifkan binlog. Ini dinonaktifkan secara default.
Atur format binlog ke ROW. Ini adalah format default.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Pengguna MySQL telah dibuat dengan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk PolarDB for MySQL. Gunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasional akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk kluster PolarDB for MySQL.
Self-managed MySQL
Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL: 5.6, 5.7, dan 8.0.x.
Aktifkan binlog. Ini dinonaktifkan secara default.
Atur format binlog ke ROW. Format default adalah STATEMENT.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Pengguna MySQL telah dibuat dengan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk instans MySQL yang dikelola sendiri. Gunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasional akibat izin yang tidak mencukupi.
Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans MySQL yang dikelola sendiri.
Batasan
Batasan umum
Tabel sumber CDC MySQL tidak mendukung pendefinisian watermark.
Pada pekerjaan CREATE TABLE AS SELECT (CTAS) dan CREATE DATABASE AS SELECT (CDAS), tabel sumber CDC MySQL mendukung sinkronisasi perubahan skema parsial. Untuk informasi tentang jenis perubahan yang didukung, lihat Kebijakan sinkronisasi evolusi skema.
Konektor CDC MySQL tidak mendukung Binary Log Transaction Compression. Oleh karena itu, saat menggunakan konektor CDC MySQL untuk mengonsumsi data inkremental, pastikan fitur ini dinonaktifkan. Jika tidak, pengambilan data inkremental mungkin gagal.
Batasan untuk ApsaraDB RDS for MySQL
Untuk ApsaraDB RDS for MySQL, kami tidak merekomendasikan membaca data dari database secondary atau replika read-only. Periode retensi binlog untuk instans ini singkat secara default. Jika binlog kedaluwarsa dan dihapus, pekerjaan mungkin gagal mengonsumsi data binlog, menyebabkan error.
Secara default, ApsaraDB RDS for MySQL mengaktifkan replikasi paralel antara database primary dan secondary serta tidak menjamin konsistensi urutan transaksi. Hal ini dapat menyebabkan kehilangan data selama alih bencana primary-secondary dan pemulihan checkpoint. Anda dapat mengaktifkan opsi slave_preserve_commit_order secara manual di ApsaraDB RDS for MySQL untuk mengatasi masalah ini.
Batasan untuk PolarDB for MySQL
Tabel sumber CDC MySQL tidak mendukung pembacaan dari Kluster Multi-master PolarDB for MySQL versi 1.0.19 atau lebih lama. Untuk informasi selengkapnya, lihat Apa itu Kluster Multi-master?. Binlog yang dihasilkan oleh kluster ini mungkin berisi ID tabel duplikat, yang dapat menyebabkan kesalahan pemetaan skema pada tabel sumber CDC, sehingga mengakibatkan kesalahan saat parsing data binlog.
Batasan untuk MySQL open source
Secara default, MySQL mempertahankan urutan transaksi selama replikasi Binlog primary-replika. Jika replika MySQL mengaktifkan replikasi paralel (slave_parallel_workers > 1) tetapi tidak memiliki slave_preserve_commit_order = ON, urutan commit transaksinya mungkin tidak konsisten dengan database primary. Saat Flink CDC pulih dari checkpoint, data mungkin terlewat akibat ketidakkonsistenan urutan ini. Kami menyarankan agar Anda mengatur slave_preserve_commit_order = ON pada replika MySQL, atau mengatur slave_parallel_workers = 1. Opsi terakhir ini mungkin mengorbankan performa replikasi.
Catatan
Tabel sink
Jangan deklarasikan primary key auto-increment dalam pernyataan DDL. MySQL akan mengisi bidang ini secara otomatis saat menulis data.
Deklarasikan setidaknya satu bidang non-primary key. Jika tidak, error akan dilaporkan.
NOT ENFORCED dalam pernyataan DDL menunjukkan bahwa Flink tidak melakukan pemeriksaan validitas pada primary key. Anda harus memastikan kebenaran dan integritas primary key. Untuk informasi selengkapnya, lihat Pemeriksaan Validitas.
Tabel dimensi
Untuk menggunakan indeks guna mempercepat kueri, urutan bidang dalam klausa JOIN harus sesuai dengan urutan definisi indeks. Ini dikenal sebagai aturan prefiks paling kiri (leftmost prefix rule). Misalnya, jika indeks berada pada (a, b, c), kondisi JOIN harus berupa
ON t.a = x AND t.b = y.SQL yang dihasilkan Flink mungkin ditulis ulang oleh pengoptimal, yang dapat mencegah penggunaan indeks selama kueri database aktual. Untuk memverifikasi apakah indeks digunakan, periksa rencana eksekusi (EXPLAIN) atau log kueri lambat di MySQL untuk melihat pernyataan SELECT aktual yang dieksekusi.
SQL
Anda dapat menggunakan konektor MySQL dalam pekerjaan SQL sebagai tabel sumber, tabel dimensi, atau tabel sink.
Sintaks
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);Cara konektor menulis ke tabel sink: Untuk setiap catatan yang diterima, konektor membuat dan mengeksekusi satu pernyataan SQL. Pernyataan SQL spesifik tergantung pada skema tabel:
Untuk tabel sink tanpa primary key, konektor mengeksekusi pernyataan
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);.Untuk tabel sink dengan primary key, konektor mengeksekusi pernyataan
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;. Catatan: Jika tabel fisik memiliki batasan indeks unik selain primary key, memasukkan dua catatan dengan primary key berbeda tetapi nilai indeks unik yang sama dapat menyebabkan konflik indeks unik di database downstream, mengakibatkan penimpaan data dan potensi kehilangan data.
Jika Anda mendefinisikan primary key auto-increment di database MySQL, jangan deklarasikan kolom auto-increment dalam pernyataan DDL Flink. Database akan mengisi bidang ini secara otomatis selama penyisipan data. Konektor hanya mendukung penulisan dan penghapusan data dengan kolom auto-increment dan tidak mendukung pembaruan.
Parameter WITH
Umum
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
connector
Jenis tabel.
Ya
STRING
None
Saat digunakan sebagai tabel sumber, atur opsi ini ke
mysql-cdcataumysql. Keduanya setara. Saat digunakan sebagai tabel dimensi atau sink, atur opsi ini kemysql.hostname
Alamat IP atau hostname database MySQL.
Ya
STRING
None
Kami menyarankan memasukkan alamat VPC.
CatatanJika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan jaringan publik untuk akses. Untuk informasi selengkapnya, lihat Kelola dan operasikan ruang kerja dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses jaringan publik?.
username
Username untuk layanan database MySQL.
Ya
STRING
None
None.
password
Password untuk layanan database MySQL.
Ya
STRING
None
None.
database-name
Nama database MySQL.
Ya
STRING
None
Saat Anda menggunakan database sebagai tabel sumber, Anda dapat menggunakan ekspresi reguler untuk nama database guna membaca data dari beberapa database.
Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir string. Untuk informasi selengkapnya, lihat kolom Keterangan untuk opsi table-name.
table-name
Nama tabel MySQL.
Ya
STRING
None
Anda dapat menggunakan ekspresi reguler untuk nama tabel sumber guna membaca data dari beberapa tabel.
Saat Anda membaca data dari beberapa tabel MySQL, kirimkan beberapa pernyataan CTAS sebagai satu Pekerjaan. Ini menghindari pengaktifan beberapa Pendengar log biner dan meningkatkan kinerja dan efisiensi. Untuk informasi selengkapnya, lihat Beberapa pernyataan CTAS: Kirim sebagai satu Pekerjaan.
Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir string. Untuk informasi selengkapnya, lihat catatan berikut.
CatatanSaat tabel sumber CDC MySQL mencocokkan nama tabel, sistem menggabungkan database-name dan table-name yang Anda tentukan menjadi ekspresi reguler jalur lengkap menggunakan string \\. (karakter . digunakan dalam versi VVR sebelum 8.0.1). Sistem kemudian menggunakan ekspresi reguler ini untuk mencocokkan nama tabel lengkap di database MySQL.
Misalnya, jika Anda mengatur 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (db_.*.tb_.+ dalam versi sebelum 8.0.1) untuk mencocokkan nama tabel lengkap guna menentukan tabel mana yang akan dibaca.
port
Nomor port layanan database MySQL.
Tidak
INTEGER
3306
None.
Khusus sumber
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
server-id
ID numerik untuk klien database.
Tidak
STRING
Nilai acak antara 5400 dan 6400 dihasilkan.
ID ini harus unik secara global dalam kluster MySQL. Kami merekomendasikan agar Anda menetapkan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.
Opsi ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren didukung. Dalam hal ini, kami merekomendasikan agar Anda menetapkan rentang ID sehingga setiap pembaca konkuren menggunakan ID berbeda. Untuk informasi selengkapnya, lihat Penggunaan Server ID.
scan.incremental.snapshot.enabled
Menentukan apakah snapshot inkremental diaktifkan.
Tidak
BOOLEAN
true
Snapshot inkremental diaktifkan secara default. Snapshot inkremental adalah mekanisme baru untuk membaca snapshot data lengkap. Dibandingkan dengan metode pembacaan snapshot lama, snapshot inkremental menawarkan beberapa keunggulan:
Sumber dapat membaca data lengkap secara paralel.
Sumber mendukung checkpoint tingkat chunk saat membaca data lengkap.
Sumber tidak perlu mengambil kunci baca global (FLUSH TABLES WITH read lock) saat membaca data lengkap.
Jika Anda ingin sumber mendukung pembacaan konkuren, setiap pembaca konkuren memerlukan server ID unik. Oleh karena itu, server-id harus berupa rentang, seperti 5400-6400, dan rentang tersebut harus lebih besar dari atau sama dengan tingkat paralelisme.
CatatanItem konfigurasi ini dihapus di Ververica Runtime (VVR) 11.1 dan versi selanjutnya.
scan.incremental.snapshot.chunk.size
Ukuran setiap chunk dalam jumlah baris.
Tidak
INTEGER
8096
Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data chunk disimpan dalam memori sebelum sepenuhnya dibaca.
Semakin sedikit baris dalam chunk, semakin banyak jumlah chunk total dalam tabel. Meskipun ini mengurangi granularitas pemulihan kesalahan, hal ini dapat menyebabkan masalah kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Oleh karena itu, Anda perlu mempertimbangkan trade-off dan menetapkan ukuran chunk yang wajar.
scan.snapshot.fetch.size
Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel.
Tidak
INTEGER
1024
None.
scan.startup.mode
Mode startup untuk konsumsi data.
Tidak
STRING
initial
Nilai yang valid:
initial (Default): Memindai data historis lengkap terlebih dahulu, lalu membaca data binlog terbaru saat startup pertama kali.
latest-offset: Tidak memindai data historis saat startup pertama kali. Mulai membaca dari akhir binlog, artinya hanya membaca perubahan terbaru setelah konektor dimulai.
earliest-offset: Tidak memindai data historis. Mulai membaca dari posisi binlog paling awal yang tersedia.
specific-offset: Tidak memindai data historis. Mulai dari offset binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengonfigurasi scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.
timestamp: Tidak memindai data historis. Mulai membaca binlog dari timestamp tertentu. Timestamp ditentukan oleh scan.startup.timestamp-millis dalam milidetik.
PentingSaat Anda menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel yang sesuai tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan untuk menghindari error akibat ketidakcocokan skema.
scan.startup.specific-offset.file
Nama file binlog dari offset awal saat menggunakan mode startup offset tertentu.
Tidak
STRING
None
Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. Format nama file contohnya
mysql-bin.000003.scan.startup.specific-offset.pos
Offset dalam file binlog tertentu untuk memulai saat menggunakan mode startup offset tertentu.
Tidak
INTEGER
None
Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset.
scan.startup.specific-offset.gtid-set
Set GTID dari offset awal saat menggunakan mode startup offset tertentu.
Tidak
STRING
None
Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. Format set GTID contohnya
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.scan.startup.timestamp-millis
Offset awal sebagai timestamp milidetik saat menggunakan mode startup timestamp.
Tidak
LONG
None
Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke timestamp. Timestamp dalam milidetik.
PentingSaat Anda menentukan waktu, CDC MySQL mencoba membaca event awal setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai dengan waktu yang ditentukan. Pastikan file binlog untuk timestamp yang ditentukan belum dihapus dari database dan dapat dibaca.
server-time-zone
Zona waktu sesi yang digunakan oleh database.
Tidak
STRING
Jika Anda tidak menentukan opsi ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu wilayah yang Anda pilih.
Contoh: Asia/Shanghai. Opsi ini mengontrol bagaimana tipe TIMESTAMP di MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.
debezium.min.row.count.to.stream.results
Saat jumlah baris dalam tabel melebihi nilai ini, mode pembacaan batch digunakan.
Tidak
INTEGER
1000
Flink membaca data dari tabel sumber MySQL dengan cara berikut:
Pembacaan lengkap: Membaca seluruh data tabel langsung ke memori. Metode ini cepat tetapi mengonsumsi memori yang sesuai. Jika tabel sumber sangat besar, hal ini dapat menimbulkan risiko OOM.
Pembacaan batch: Membaca data dalam beberapa batch, dengan jumlah baris tertentu per batch, hingga semua data terbaca. Metode ini menghindari risiko OOM saat membaca tabel besar tetapi relatif lebih lambat.
connect.timeout
Waktu maksimum menunggu koneksi ke server database MySQL hingga timeout sebelum mencoba lagi.
Tidak
DURATION
30s
None.
connect.max-retries
Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.
Tidak
INTEGER
3
None.
connection.pool.size
Ukuran kolam koneksi database.
Tidak
INTEGER
20
Kolam koneksi database digunakan untuk menggunakan ulang koneksi, yang dapat mengurangi jumlah koneksi database.
jdbc.properties.*
Opsi koneksi kustom dalam URL JDBC.
Tidak
STRING
None
Anda dapat meneruskan opsi koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengatur 'jdbc.properties.useSSL' = 'false'.
Untuk informasi selengkapnya tentang opsi koneksi yang didukung, lihat MySQL Configuration Properties.
debezium.*
Opsi kustom untuk Debezium guna membaca binlog.
Tidak
STRING
None
Anda dapat meneruskan opsi Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan kesalahan penguraian.
heartbeat.interval
Interval sumber menggunakan event heartbeat untuk memajukan offset binlog.
Tidak
DURATION
30s
Event heartbeat digunakan untuk memajukan offset binlog di sumber, yang sangat berguna untuk tabel MySQL yang diperbarui secara perlahan. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset binlog maju, mencegah masalah di mana offset binlog kedaluwarsa menyebabkan pekerjaan gagal dan memerlukan restart tanpa status.
scan.incremental.snapshot.chunk.key-column
Kolom yang digunakan untuk membagi chunk selama fase snapshot.
Lihat Keterangan.
STRING
None
Wajib untuk tabel tanpa primary key. Kolom yang dipilih harus bertipe non-null (NOT NULL).
Opsional untuk tabel dengan primary key. Anda hanya dapat memilih satu kolom dari primary key.
rds.region-id
ID wilayah instans ApsaraDB RDS for MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
None
Untuk informasi selengkapnya tentang ID wilayah, lihat Wilayah dan zona.
rds.access-key-id
ID AccessKey akun untuk instans ApsaraDB RDS for MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
None
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?.
PentingUntuk mencegah kebocoran informasi AccessKey Anda, kami merekomendasikan agar Anda mengelola ID AccessKey menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Kelola variabel.
rds.access-key-secret
Rahasia AccessKey akun untuk instans ApsaraDB RDS for MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
None
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?
PentingUntuk mencegah kebocoran informasi AccessKey Anda, kami merekomendasikan agar Anda mengelola Rahasia AccessKey menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Kelola variabel.
rds.db-instance-id
ID instans instans ApsaraDB RDS for MySQL.
Wajib saat membaca log arsip dari OSS.
STRING
None
None.
rds.main-db-id
ID database utama instans ApsaraDB RDS for MySQL.
Tidak
STRING
None
Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Backup log untuk ApsaraDB RDS for MySQL.
Didukung hanya di VVR 8.0.7 dan versi selanjutnya.
rds.download.timeout
Waktu tunggu untuk mengunduh satu log arsip dari OSS.
Tidak
DURATION
60s
None.
rds.endpoint
Titik akhir layanan untuk mendapatkan informasi binlog OSS.
Tidak
STRING
None
Untuk informasi selengkapnya tentang nilai yang valid, lihat Titik akhir.
Didukung hanya di VVR 8.0.8 dan versi selanjutnya.
scan.incremental.close-idle-reader.enabled
Menentukan apakah pembaca idle ditutup setelah fase snapshot berakhir.
Tidak
BOOLEAN
false
Didukung hanya di VVR 8.0.1 dan versi selanjutnya.
Agar konfigurasi ini berlaku, Anda harus mengatur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.
scan.read-changelog-as-append-only.enabled
Menentukan apakah aliran changelog diubah menjadi aliran append-only.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Semua jenis pesan, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER, diubah menjadi pesan INSERT. Aktifkan ini hanya dalam skenario khusus, seperti saat Anda perlu menyimpan pesan hapus dari tabel upstream.
false (Default): Semua jenis pesan dikirim downstream apa adanya.
CatatanDidukung hanya di VVR 8.0.8 dan versi selanjutnya.
scan.only.deserialize.captured.tables.changelog.enabled
Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.
Tidak
BOOLEAN
Nilai default adalah false di versi VVR 8.x.
Nilai default adalah true di VVR 11.1 dan versi selanjutnya.
Nilai yang valid:
true: Mendeserialisasi data perubahan hanya untuk tabel target guna mempercepat pembacaan binlog.
false (Default): Mendeserialisasi data perubahan untuk semua tabel.
CatatanDidukung hanya di VVR 8.0.7 dan versi selanjutnya.
Saat digunakan di VVR 8.0.8 dan versi sebelumnya, nama opsi harus diubah menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
Selama fase inkremental, menentukan apakah mencoba mengurai event DDL perubahan tanpa penguncian RDS.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengurai event DDL perubahan tanpa penguncian RDS.
false (Default): Tidak mengurai event DDL perubahan tanpa penguncian RDS.
Ini adalah fitur eksperimental. Kami merekomendasikan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan online tanpa penguncian.
CatatanDidukung hanya di VVR 11.1 dan versi selanjutnya.
scan.incremental.snapshot.backfill.skip
Menentukan apakah melewati backfill selama fase pembacaan snapshot.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Melewati backfill selama fase pembacaan snapshot.
false (Default): Tidak melewati backfill selama fase pembacaan snapshot.
Jika Anda melewati backfill, perubahan pada tabel selama fase snapshot dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.
PentingMelewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.
CatatanDidukung hanya di VVR 11.1 dan versi selanjutnya.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
false (Default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Ini adalah fitur eksperimental. Mengaktifkannya dapat mengurangi risiko error kehabisan memori (OOM) saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan agar Anda menambahkan ini sebelum startup pekerjaan pertama kali.
CatatanDidukung hanya di VVR 11.1 dan versi selanjutnya.
binlog.session.network.timeout
Timeout baca/tulis jaringan untuk koneksi binlog.
Tidak
DURATION
10m
Jika Anda mengatur opsi ini ke 0s, timeout default server MySQL digunakan.
CatatanDidukung hanya di VVR 11.5 dan versi selanjutnya.
scan.rate-limit.records-per-second
Jumlah maksimum catatan yang dapat dikirim sumber per detik.
Tidak
LONG
None
Opsi ini berlaku untuk skenario di mana Anda perlu membatasi pembacaan data. Batasan berlaku di kedua fase, lengkap dan inkremental.
Metrik
numRecordsOutPerSeconddari sumber mencerminkan jumlah catatan yang dikeluarkan aliran data per detik. Anda dapat menyesuaikan opsi ini berdasarkan metrik tersebut.Pada fase pembacaan data lengkap, Anda biasanya perlu mengurangi jumlah catatan data yang dibaca dalam setiap batch. Anda dapat mengurangi nilai opsi
scan.incremental.snapshot.chunk.size.CatatanDidukung hanya di VVR 11.5 dan versi selanjutnya.
Khusus tabel dimensi
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
url
URL JDBC MySQL.
Tidak
STRING
None
Format URL adalah:
jdbc:mysql://<endpoint>:<port>/<database_name>.lookup.max-retries
Jumlah maksimum percobaan ulang setelah pembacaan data gagal.
Tidak
INTEGER
3
Didukung hanya di VVR 6.0.7 dan versi selanjutnya.
lookup.cache.strategy
Kebijakan cache.
Tidak
STRING
None
Mendukung tiga kebijakan cache: None, LRU, dan ALL. Untuk informasi selengkapnya tentang nilai-nilai tersebut, lihat Informasi latar belakang.
CatatanSaat Anda menggunakan kebijakan cache LRU, Anda juga harus mengonfigurasi opsi lookup.cache.max-rows.
lookup.cache.max-rows
Jumlah maksimum baris yang dicache.
Tidak
INTEGER
100000
Jika Anda memilih kebijakan cache LRU, Anda harus mengatur ukuran cache.
Jika Anda memilih kebijakan cache ALL, Anda tidak perlu mengatur ukuran cache.
lookup.cache.ttl
Waktu hidup (TTL) cache.
Tidak
DURATION
10 s
Konfigurasi lookup.cache.ttl tergantung pada lookup.cache.strategy:
Jika lookup.cache.strategy diatur ke None, Anda tidak perlu mengonfigurasi lookup.cache.ttl. Artinya cache tidak kedaluwarsa.
Jika lookup.cache.strategy diatur ke LRU, lookup.cache.ttl adalah TTL cache. Secara default, tidak kedaluwarsa.
Jika lookup.cache.strategy diatur ke ALL, lookup.cache.ttl adalah waktu reload cache. Secara default, tidak direload.
Gunakan format waktu, seperti 1min atau 10s.
lookup.max-join-rows
Jumlah maksimum hasil yang dikembalikan saat mengkueri tabel dimensi untuk setiap catatan di tabel utama.
Tidak
INTEGER
1024
None.
lookup.filter-push-down.enabled
Menentukan apakah mengaktifkan filter pushdown untuk tabel dimensi.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memfilter data terlebih dahulu berdasarkan kondisi yang ditetapkan dalam pekerjaan SQL.
false (Default): Menonaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memuat semua data.
CatatanDidukung hanya di VVR 8.0.7 dan versi selanjutnya.
PentingFilter pushdown untuk tabel dimensi hanya boleh diaktifkan saat tabel Flink digunakan sebagai tabel dimensi. Tabel sumber MySQL tidak mendukung pengaktifan filter pushdown. Jika tabel Flink digunakan sebagai tabel sumber dan tabel dimensi, dan filter pushdown diaktifkan untuk tabel dimensi, Anda harus secara eksplisit mengatur item konfigurasi ini ke false untuk tabel sumber menggunakan SQL Hints. Jika tidak, pekerjaan mungkin berjalan abnormal.
Khusus sink
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
url
URL JDBC MySQL.
Tidak
STRING
None
Format URL adalah:
jdbc:mysql://<endpoint>:<port>/<database_name>.sink.max-retries
Jumlah maksimum percobaan ulang setelah penulisan data gagal.
Tidak
INTEGER
3
None.
sink.buffer-flush.batch-size
Jumlah catatan dalam satu batch penulisan.
Tidak
INTEGER
4096
None.
sink.buffer-flush.max-rows
Jumlah catatan data yang disimpan dalam memori.
Tidak
INTEGER
10000
Opsi ini hanya berlaku setelah primary key ditentukan.
sink.buffer-flush.interval
Interval waktu untuk mengosongkan buffer. Jika data dalam buffer tidak memenuhi kondisi output setelah waktu tunggu yang ditentukan, sistem secara otomatis mengeluarkan semua data dalam buffer.
Tidak
DURATION
1s
None.
sink.ignore-delete
Menentukan apakah mengabaikan operasi DELETE.
Tidak
BOOLEAN
false
Saat aliran yang dihasilkan Flink SQL berisi catatan delete atau update-before, jika beberapa tugas output memperbarui bidang berbeda dari tabel yang sama secara bersamaan, ketidakkonsistenan data dapat terjadi.
Misalnya, setelah catatan dihapus, tugas lain hanya memperbarui beberapa bidang. Bidang yang tidak diperbarui akan menjadi null atau nilai default-nya, menyebabkan kesalahan data.
Dengan mengatur sink.ignore-delete ke true, Anda dapat mengabaikan operasi DELETE dan UPDATE_BEFORE upstream untuk menghindari masalah tersebut.
CatatanUPDATE_BEFORE adalah bagian dari mekanisme retraksi Flink, digunakan untuk "menarik kembali" nilai lama dalam operasi pembaruan.
Saat ignoreDelete = true, semua catatan DELETE dan UPDATE_BEFORE dilewati. Hanya catatan INSERT dan UPDATE_AFTER yang diproses.
sink.ignore-null-when-update
Saat memperbarui data, menentukan apakah memperbarui bidang yang sesuai menjadi null atau melewati pembaruan untuk bidang tersebut jika bidang data masuk adalah null.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Tidak memperbarui bidang. Opsi ini dapat diatur ke true hanya saat primary key ditetapkan untuk tabel Flink. Saat diatur ke true:
Di VVR 8.0.6 dan versi sebelumnya, eksekusi batch tidak didukung untuk menulis data ke tabel sink.
Di VVR 8.0.7 dan versi selanjutnya, eksekusi batch didukung untuk menulis data ke tabel sink.
Meskipun penulisan batch dapat meningkatkan efisiensi penulisan dan throughput keseluruhan secara signifikan, hal ini dapat menyebabkan latensi data dan risiko OOM. Oleh karena itu, pertimbangkan trade-off berdasarkan skenario bisnis aktual Anda.
false: Memperbarui bidang menjadi null.
CatatanOpsi ini didukung hanya di VVR 8.0.5 dan versi selanjutnya.
Pemetaan tipe
Tabel sumber CDC
Tipe bidang CDC MySQL
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
PentingKami menyarankan agar Anda tidak menggunakan tipe TINYINT(1) di MySQL untuk menyimpan nilai selain 0 dan 1. Saat property-version = 0, tabel sumber CDC MySQL memetakan TINYINT(1) ke tipe BOOLEAN Flink secara default, yang dapat menyebabkan ketidakakuratan data. Untuk menggunakan tipe TINYINT(1) guna menyimpan nilai selain 0 dan 1, lihat opsi konfigurasi catalog.table.treat-tinyint1-as-boolean.
Tabel dimensi dan tabel sink
Tipe bidang MySQL
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
Catatanp harus kurang dari atau sama dengan 38.
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
PentingFlink hanya mendukung catatan tipe BLOB MySQL hingga 2.147.483.647 (2^31 - 1) byte.
BLOB
MEDIUMBLOB
LONGBLOB
Ingesti Data
Anda dapat menggunakan konektor MySQL sebagai sumber data dalam pekerjaan YAML ingesti data.
Sintaks
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxxItem konfigurasi
Parameter | Deskripsi | Wajib | Tipe data | Nilai default | Keterangan |
type | Jenis sumber data. | Ya | STRING | None | Bidang statis diatur ke mysql. |
name | Nama sumber data. | Tidak | STRING | None | None. |
hostname | Alamat IP atau hostname database MySQL. | Ya | STRING | None | Kami menyarankan memasukkan alamat Virtual Private Cloud (VPC). Catatan Jika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan jaringan publik untuk akses. Untuk informasi selengkapnya, lihat Kelola dan operasikan ruang kerja dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses jaringan publik?. |
username | Username untuk layanan database MySQL. | Ya | STRING | None | None. |
password | Password untuk layanan database MySQL. | Ya | STRING | None | None. |
tables | Tabel data MySQL yang akan disinkronkan. | Ya | STRING | None |
Catatan
|
tables.exclude | Tabel yang dikecualikan dari sinkronisasi. | Tidak | STRING | None |
Catatan Titik digunakan untuk memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, Anda harus meng-escape titik dengan backslash. Contoh: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*. |
port | Nomor port layanan database MySQL. | Tidak | INTEGER | 3306 | None. |
schema-change.enabled | Menentukan apakah mengirim event perubahan skema. | Tidak | BOOLEAN | true | None. |
server-id | ID numerik atau rentang ID untuk klien database yang digunakan untuk sinkronisasi. | Tidak | STRING | Nilai acak antara 5400 dan 6400 dihasilkan. | ID ini harus unik secara global dalam kluster MySQL. Kami merekomendasikan agar Anda menetapkan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama. Opsi ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren didukung. Dalam hal ini, kami merekomendasikan agar Anda menetapkan rentang ID sehingga setiap pembaca konkuren menggunakan ID berbeda. |
jdbc.properties.* | Opsi koneksi kustom dalam URL JDBC. | Tidak | STRING | None | Anda dapat meneruskan opsi koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengatur 'jdbc.properties.useSSL' = 'false'. Untuk informasi selengkapnya tentang opsi koneksi yang didukung, lihat MySQL Configuration Properties. |
debezium.* | Opsi kustom untuk Debezium guna membaca binlog. | Tidak | STRING | None | Anda dapat meneruskan opsi Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan kesalahan penguraian. |
scan.incremental.snapshot.chunk.size | Ukuran setiap chunk dalam jumlah baris. | Tidak | INTEGER | 8096 | Tabel MySQL dibagi menjadi beberapa chunk untuk dibaca. Data chunk disimpan dalam memori sebelum sepenuhnya dibaca. Meskipun menggunakan lebih sedikit baris per chunk memberikan granularitas lebih baik untuk pemulihan kesalahan, hal ini juga dapat menyebabkan error kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Oleh karena itu, Anda harus menetapkan ukuran chunk yang wajar yang menyeimbangkan trade-off ini. |
scan.snapshot.fetch.size | Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel. | Tidak | INTEGER | 1024 | None. |
scan.startup.mode | Mode startup untuk konsumsi data. | Tidak | STRING | initial | Nilai yang valid:
Penting Untuk mode startup earliest-offset, specific-offset, dan timestamp, jika skema tabel saat startup berbeda dengan skema pada waktu offset awal yang ditentukan, pekerjaan akan gagal akibat ketidakcocokan skema. Dengan kata lain, saat Anda menggunakan ketiga mode startup ini, Anda harus memastikan bahwa skema tabel yang sesuai tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan. |
scan.startup.specific-offset.file | Nama file binlog dari offset awal saat menggunakan mode startup offset tertentu. | Tidak | STRING | None | Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. Format nama file contohnya |
scan.startup.specific-offset.pos | Offset dalam file binlog tertentu untuk memulai saat menggunakan mode startup offset tertentu. | Tidak | INTEGER | None | Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. |
scan.startup.specific-offset.gtid-set | Set GTID dari offset awal saat menggunakan mode startup offset tertentu. | Tidak | STRING | None | Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. Format set GTID contohnya |
scan.startup.timestamp-millis | Offset awal sebagai timestamp milidetik saat menggunakan mode startup timestamp. | Tidak | LONG | None | Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke timestamp. Timestamp dalam milidetik. Penting Saat Anda menentukan waktu, CDC MySQL mencoba membaca event awal setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai dengan waktu yang ditentukan. Pastikan file binlog untuk timestamp yang ditentukan belum dihapus dari database dan dapat dibaca. |
server-time-zone | Zona waktu sesi yang digunakan oleh database. | Tidak | STRING | Jika Anda tidak menentukan opsi ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu wilayah yang Anda pilih. | Contoh: Asia/Shanghai. Opsi ini mengontrol bagaimana tipe TIMESTAMP di MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values. |
scan.startup.specific-offset.skip-events | Jumlah event binlog yang dilewati saat membaca dari offset tertentu. | Tidak | INTEGER | None | Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. |
scan.startup.specific-offset.skip-rows | Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event binlog mungkin sesuai dengan beberapa perubahan baris. | Tidak | INTEGER | None | Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. |
connect.timeout | Waktu maksimum menunggu koneksi ke server database MySQL hingga timeout sebelum mencoba lagi. | Tidak | DURATION | 30s | None. |
connect.max-retries | Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal. | Tidak | INTEGER | 3 | None. |
connection.pool.size | Ukuran kolam koneksi database. | Tidak | INTEGER | 20 | Kolam koneksi database digunakan untuk menggunakan ulang koneksi, yang dapat mengurangi jumlah koneksi database. |
heartbeat.interval | Interval sumber menggunakan event heartbeat untuk memajukan offset binlog. | Tidak | DURATION | 30s | Event heartbeat digunakan untuk memajukan offset binlog di sumber, yang sangat berguna untuk tabel MySQL yang diperbarui secara perlahan. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset binlog maju, mencegah masalah di mana offset binlog kedaluwarsa menyebabkan pekerjaan gagal dan memerlukan restart tanpa status. |
scan.incremental.snapshot.chunk.key-column | Kolom yang digunakan untuk membagi chunk selama fase snapshot. | Tidak. | STRING | None | Anda hanya dapat memilih satu kolom dari primary key. |
rds.region-id | ID wilayah instans ApsaraDB RDS for MySQL. | Wajib saat membaca log arsip dari OSS. | STRING | None | Untuk informasi selengkapnya tentang ID wilayah, lihat Wilayah dan zona. |
rds.access-key-id | ID AccessKey akun untuk instans ApsaraDB RDS for MySQL. | Wajib saat membaca log arsip dari OSS. | STRING | None | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey? Penting Untuk mencegah kebocoran informasi AccessKey Anda, kami merekomendasikan agar Anda mengelola ID AccessKey menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Kelola variabel. |
rds.access-key-secret | Rahasia AccessKey akun untuk instans ApsaraDB RDS for MySQL. | Wajib saat membaca log arsip dari OSS. | STRING | None | Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey? Penting Untuk mencegah kebocoran informasi AccessKey Anda, kami merekomendasikan agar Anda mengelola Rahasia AccessKey menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Kelola variabel. |
rds.db-instance-id | ID instans dari instans ApsaraDB RDS untuk MySQL. | Wajib saat membaca log arsip dari OSS. | STRING | None | None. |
rds.main-db-id | ID database utama instans ApsaraDB RDS for MySQL. | Tidak | STRING | None | Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Backup log untuk ApsaraDB RDS for MySQL. |
rds.download.timeout | Waktu tunggu untuk mengunduh satu log arsip dari OSS. | Tidak | DURATION | 60s | None. |
rds.endpoint | Titik akhir layanan untuk mendapatkan informasi binlog OSS. | Tidak | STRING | None | Untuk informasi selengkapnya tentang nilai yang valid, lihat Titik akhir. |
rds.binlog-directory-prefix | Awalan direktori untuk menyimpan file binlog. | Tidak | STRING | rds-binlog- | None. |
rds.use-intranet-link | Menentukan apakah menggunakan jaringan internal untuk mengunduh file binlog. | Tidak | BOOLEAN | true | None. |
rds.binlog-directories-parent-path | Jalur mutlak direktori induk tempat file binlog disimpan. | Tidak | STRING | None | None. |
chunk-meta.group.size | Ukuran metadata chunk. | Tidak | INTEGER | 1000 | Jika metadata lebih besar dari nilai ini, metadata dikirim dalam beberapa bagian. |
chunk-key.even-distribution.factor.lower-bound | Batas bawah faktor distribusi chunk untuk pemisahan merata. | Tidak | DOUBLE | 0.05 | Jika faktor distribusi kurang dari nilai parameter ini, chunk tidak didistribusikan secara merata. Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / jumlah total baris data. |
chunk-key.even-distribution.factor.upper-bound | Batas atas faktor distribusi chunk untuk pemisahan merata. | Tidak | DOUBLE | 1000.0 | Jika faktor distribusi lebih besar dari nilai ini, pemisahan tidak merata digunakan. Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah pembaca idle ditutup setelah fase snapshot berakhir. | Tidak | BOOLEAN | false | Agar konfigurasi ini berlaku, Anda harus mengatur |
scan.only.deserialize.captured.tables.changelog.enabled | Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan. | Tidak | BOOLEAN |
| Nilai yang valid:
|
scan.parallel-deserialize-changelog.enabled | Selama fase inkremental, menentukan apakah menggunakan beberapa thread untuk mengurai event perubahan. | Tidak | BOOLEAN | false | Nilai yang valid:
Catatan Didukung hanya di VVR 8.0.11 dan versi selanjutnya. |
scan.parallel-deserialize-changelog.handler.size | Jumlah handler event saat menggunakan beberapa thread untuk mengurai event perubahan. | Tidak | INTEGER | 2 | Catatan Didukung hanya di VVR 8.0.11 dan versi selanjutnya. |
metadata-column.include-list | Kolom metadata yang diteruskan ke downstream. | Tidak | STRING | None | Metadata yang tersedia meliputi Catatan Konektor YAML CDC MySQL tidak perlu dan tidak mendukung penambahan kolom metadata Penting Kolom metadata Kolom metadata |
scan.newly-added-table.enabled | Saat memulai ulang dari checkpoint, menentukan apakah menyinkronkan tabel yang baru ditambahkan yang tidak cocok pada proses sebelumnya atau menghapus tabel yang saat ini tidak cocok yang disimpan dalam state. | Tidak | BOOLEAN | false | Ini berlaku saat memulai ulang dari checkpoint atau titik simpan. |
scan.binlog.newly-added-table.enabled | Selama fase inkremental, menentukan apakah mengirim data dari tabel yang baru ditambahkan yang cocok. | Tidak | BOOLEAN | false | Tidak dapat diaktifkan secara bersamaan dengan |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom untuk tabel tertentu yang akan digunakan sebagai kunci pemisahan chunk selama fase snapshot. | Tidak | STRING | None |
|
scan.parse.online.schema.changes.enabled | Selama fase inkremental, menentukan apakah mencoba mengurai event DDL perubahan tanpa penguncian RDS. | Tidak | BOOLEAN | false | Nilai yang valid:
Ini adalah fitur eksperimental. Kami merekomendasikan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan online tanpa penguncian. Catatan Didukung hanya di VVR 11.0 dan versi selanjutnya. |
scan.incremental.snapshot.backfill.skip | Menentukan apakah melewati backfill selama fase pembacaan snapshot. | Tidak | BOOLEAN | false | Nilai yang valid:
Jika backfill dilewati, perubahan pada tabel selama fase snapshot dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot. Penting Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang, yang hanya menjamin semantik at-least-once. Catatan Fitur ini hanya tersedia untuk mesin komputasi Flink Ververica Runtime (VVR) 11.1 dan versi selanjutnya. |
treat-tinyint1-as-boolean.enabled | Menentukan apakah memperlakukan tipe TINYINT(1) sebagai tipe Boolean. | Tidak | BOOLEAN | true | Nilai yang valid:
|
treat-timestamp-as-datetime-enabled | Menentukan apakah memperlakukan tipe TIMESTAMP sebagai tipe DATETIME. | Tidak | BOOLEAN | false | Nilai yang valid:
Tipe TIMESTAMP MySQL menyimpan waktu UTC dan dipengaruhi oleh zona waktu. Tipe DATETIME MySQL menyimpan waktu literal dan tidak dipengaruhi oleh zona waktu. Saat diaktifkan, fitur ini mengubah data TIMESTAMP MySQL ke tipe DATETIME berdasarkan server-time-zone. |
include-comments.enabled | Menentukan apakah menyinkronkan komentar tabel dan komentar bidang. | Tidak | BOOLEAN | false | Nilai yang valid:
Mengaktifkan fitur ini meningkatkan penggunaan memori pekerjaan. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah mendistribusikan shard tak terbatas terlebih dahulu selama fase pembacaan snapshot. | Tidak | BOOLEAN | false | Nilai yang valid:
Ini adalah fitur eksperimental. Mengaktifkannya dapat mengurangi ancaman Pengelola Tugas mengalami error kehabisan memori (OOM) saat menyinkronkan shard terakhir selama fase snapshot. Tambahkan parameter ini sebelum memulai pekerjaan untuk pertama kalinya. Catatan Fitur ini hanya tersedia untuk mesin komputasi Flink VVR 11.1 dan versi selanjutnya. |
binlog.session.network.timeout | Durasi timeout jaringan untuk koneksi pencatatan biner. | Tidak | DURATION | 10m | Jika parameter ini diatur ke 0 s, durasi timeout default sisi server MySQL digunakan. Catatan Fitur ini hanya tersedia untuk mesin komputasi Flink VVR 11.5 dan versi selanjutnya. |
scan.rate-limit.records-per-second | Jumlah maksimum catatan yang dapat dikirim sumber per detik. | Tidak | LONG | None | Parameter ini berlaku untuk skenario di mana pembacaan data harus dibatasi. Batasan ini berlaku baik pada fase lengkap maupun inkremental. Metrik Selama fase baca lengkap, juga kurangi jumlah catatan yang dibaca dalam setiap batch. Untuk melakukan ini, kurangi nilai parameter Catatan Fitur ini hanya tersedia untuk mesin komputasi Flink VVR 11.5 dan versi selanjutnya. |
Pemetaan tipe
Tabel berikut menunjukkan pemetaan tipe untuk ingesti data.
Tipe bidang CDC MySQL | Tipe bidang CDC |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | Bergantung pada nilai parameter
|
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | STRING Catatan MySQL mendukung presisi desimal hingga 65. Flink membatasi presisi desimal hingga 38. Jika kolom desimal memiliki presisi lebih dari 38, petakan ke string untuk mencegah kehilangan presisi. |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING Catatan Tipe data JSON diubah menjadi string berformat JSON di Flink. |
GEOMETRY | STRING Catatan Tipe data spasial MySQL diubah menjadi string dengan format JSON tetap. Untuk informasi selengkapnya, lihat pemetaan tipe data spasial MySQL. |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES Catatan Untuk tipe data BLOB MySQL, panjang maksimum yang didukung adalah 2.147.483.647 (2**31 - 1). |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
Contoh
Tabel sumber CDC
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;Tabel sink
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;Sumber ingesti data
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
Tentang tabel sumber CDC MySQL
Cara kerja
Saat tabel sumber CDC MySQL dimulai, sistem memindai seluruh tabel, membaginya menjadi beberapa chunk berdasarkan primary key, dan mencatat posisi binlog saat ini. Sistem kemudian menggunakan algoritma snapshot inkremental untuk membaca data dari setiap chunk menggunakan pernyataan SELECT. Pekerjaan secara berkala melakukan checkpoint untuk mencatat chunk yang telah selesai. Jika terjadi failover, pekerjaan melanjutkan pembacaan dari chunk yang belum selesai. Setelah semua chunk terbaca, pekerjaan mulai membaca catatan perubahan inkremental dari posisi binlog yang sebelumnya dicatat. Pekerjaan Flink terus melakukan checkpoint berkala untuk mencatat posisi binlog. Jika pekerjaan gagal, sistem melanjutkan pemrosesan dari posisi binlog terakhir yang dicatat untuk mencapai semantik tepat-sekali.
Untuk penjelasan lebih rinci tentang algoritma snapshot inkremental, lihat Konektor CDC MySQL.
Metadata
Metadata berguna untuk menggabungkan database dan tabel yang terpisah (sharded). Setelah penggabungan, Anda mungkin perlu mengidentifikasi database dan tabel sumber untuk setiap baris data. Kolom metadata memungkinkan Anda mengakses informasi ini, sehingga memudahkan penggabungan beberapa tabel sharded menjadi satu tabel tujuan.
Sumber CDC MySQL mendukung sintaks kolom metadata. Anda dapat mengakses metadata berikut melalui kolom metadata.
Kunci metadata
Tipe metadata
Deskripsi
database_name
STRING NOT NULL
Nama database yang berisi baris tersebut.
table_name
STRING NOT NULL
Nama tabel yang berisi baris tersebut.
op_ts
TIMESTAMP_LTZ(3) NOT NULL
Waktu saat baris tersebut diubah di database. Jika catatan berasal dari data historis yang sudah ada di tabel, bukan dari binlog, nilai ini selalu 0.
op_type
STRING NOT NULL
Jenis perubahan baris tersebut.
+I: Pesan INSERT
-D: Pesan DELETE
-U: Pesan UPDATE_BEFORE
+U: Pesan UPDATE_AFTER
CatatanDidukung hanya di VVR 8.0.7 dan versi selanjutnya.
query_log
STRING NOT NULL
Catatan log kueri MySQL yang sesuai dengan baris yang dibaca.
CatatanUntuk mencatat kueri, MySQL memerlukan pengaktifan parameter binlog_rows_query_log_events.
Contoh berikut menunjukkan cara menggabungkan beberapa tabel orders dari berbagai database terpisah dalam instans MySQL dan menyinkronkannya ke tabel holo_orders di Hologres.
CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- Membaca nama database. table_name STRING METADATA FROM 'table_name' VIRTUAL, -- Membaca nama tabel. operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Membaca timestamp perubahan. op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Membaca jenis perubahan. order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- Mencocokkan beberapa database terpisah menggunakan ekspresi reguler. 'table-name' = 'orders_.*' -- Mencocokkan beberapa tabel terpisah menggunakan ekspresi reguler. ); INSERT INTO holo_orders SELECT * FROM mysql_orders;Berdasarkan kode di atas, jika Anda mengatur opsi scan.read-changelog-as-append-only.enabled ke true dalam klausa WITH, output bervariasi berdasarkan pengaturan primary key tabel downstream:
Jika primary key tabel downstream adalah order_id, output hanya mencakup perubahan terakhir untuk setiap primary key di tabel sumber. Misalnya, jika perubahan terakhir untuk primary key adalah operasi hapus, catatan dengan primary key yang sama dan op_type -D muncul di tabel downstream.
Jika primary key tabel downstream adalah gabungan dari order_id, operation_ts, dan op_type, output mencakup riwayat perubahan lengkap untuk setiap primary key di tabel sumber.
Dukungan ekspresi reguler
Tabel sumber CDC MySQL mendukung penggunaan ekspresi reguler dalam nama tabel atau nama database untuk mencocokkan beberapa tabel atau database. Contoh berikut menunjukkan cara menentukan beberapa tabel menggunakan ekspresi reguler.
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Mencocokkan beberapa database menggunakan ekspresi reguler. 'table-name' = '(t[5-8]|tt)' -- Mencocokkan beberapa tabel menggunakan ekspresi reguler. );Penjelasan ekspresi reguler dalam contoh di atas:
^(test).* adalah contoh pencocokan awalan. Ekspresi ini dapat mencocokkan nama database yang dimulai dengan test, seperti test1 atau test2.
.*[p$] adalah contoh pencocokan akhiran. Ekspresi ini dapat mencocokkan nama database yang diakhiri dengan p, seperti cdcp atau edcp.
txc adalah pencocokan spesifik. Ekspresi ini dapat mencocokkan nama database tertentu, seperti txc.
Saat mencocokkan nama tabel lengkap, CDC MySQL menggunakan nama database dan nama tabel untuk mengidentifikasi tabel secara unik. Sistem menggunakan pola database-name.table-name untuk pencocokan. Misalnya, pola (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) dapat mencocokkan tabel seperti txc.tt dan test2.test5 dalam database.
PentingDalam konfigurasi pekerjaan SQL, opsi table-name dan database-name tidak mendukung penggunaan koma (,) untuk menentukan beberapa tabel atau database.
Untuk mencocokkan beberapa tabel atau menggunakan beberapa ekspresi reguler, hubungkan dengan garis vertikal (|) dan masukkan dalam tanda kurung. Misalnya, untuk membaca tabel user dan product, Anda dapat mengonfigurasi table-name sebagai
(user|product).Jika ekspresi reguler berisi koma, Anda harus menulis ulang menggunakan operator garis vertikal (|). Misalnya, ekspresi reguler
mytable_\d{1, 2}perlu ditulis ulang menjadi setara(mytable_\d{1}|mytable_\d{2})untuk menghindari penggunaan koma.
Kontrol konkurensi
Konektor MySQL mendukung pembacaan data lengkap secara konkuren, yang meningkatkan efisiensi pemuatan data. Saat dikombinasikan dengan Autopilot di konsol Realtime Compute for Apache Flink, sistem dapat secara otomatis mengurangi sumber daya komputasi selama fase inkremental setelah pembacaan konkuren selesai, sehingga menghemat sumber daya komputasi.
Di Konsol pengembangan Realtime Compute for Apache Flink, Anda dapat mengatur paralelisme pekerjaan di halaman Konfigurasi Sumber Daya baik dalam mode Dasar maupun mode Ahli. Perbedaannya sebagai berikut:
Paralelisme yang diatur dalam mode Dasar adalah paralelisme global untuk seluruh pekerjaan.

Mode Ahli memungkinkan Anda mengatur paralelisme untuk VERTEX tertentu sesuai kebutuhan.

Untuk informasi selengkapnya tentang konfigurasi sumber daya, lihat Konfigurasi penerapan pekerjaan.
PentingTerlepas dari apakah Anda menggunakan mode Dasar atau mode Ahli, rentang server-id yang dideklarasikan dalam tabel harus lebih besar dari atau sama dengan paralelisme pekerjaan. Misalnya, jika rentang server-id adalah 5404-5412, rentang tersebut berisi 9 ID server unik, memungkinkan paralelisme pekerjaan maksimum 9. Pekerjaan berbeda untuk instans MySQL yang sama harus memiliki rentang server-id yang tidak tumpang tindih, artinya setiap pekerjaan harus memiliki server-id yang dikonfigurasi secara eksplisit dan unik.
Autopilot Autoscaling
Fase data lengkap mengakumulasi banyak data historis. Untuk meningkatkan efisiensi pembacaan, data historis biasanya dibaca secara konkuren. Namun, dalam fase binlog inkremental, karena volume data binlog kecil dan urutan global harus dipertahankan, paralelisme tunggal biasanya cukup. Autopilot dapat secara otomatis menyeimbangkan performa dan sumber daya untuk memenuhi persyaratan berbeda dari fase lengkap dan inkremental ini.
Autopilot memantau trafik setiap tugas di Sumber CDC MySQL. Saat pekerjaan memasuki fase binlog, jika hanya satu tugas yang bertanggung jawab membaca binlog dan tugas lainnya idle, Autopilot akan secara otomatis mengurangi jumlah CU dan paralelisme Sumber. Untuk mengaktifkan Autopilot, atur mode Autopilot ke Aktif di halaman O&M pekerjaan.
CatatanInterval pemicu minimum untuk mengurangi paralelisme adalah 24 jam secara default. Untuk informasi selengkapnya tentang opsi dan detail Autopilot, lihat Konfigurasi Autopilot.
Mode mulai
Anda dapat menggunakan opsi scan.startup.mode untuk menentukan mode startup untuk tabel sumber CDC MySQL. Opsi tersebut meliputi:
initial (default): Melakukan pembacaan lengkap tabel database saat startup pertama kali, lalu beralih ke mode inkremental untuk membaca binlog.
earliest-offset: Melewati fase snapshot dan mulai membaca dari posisi binlog paling awal yang tersedia.
latest-offset: Melewati fase snapshot dan mulai membaca dari akhir binlog. Dalam mode ini, tabel sumber hanya dapat membaca perubahan data yang terjadi setelah pekerjaan dimulai.
specific-offset: Melewati fase snapshot dan mulai membaca dari posisi binlog tertentu. Posisi dapat ditentukan oleh nama file binlog dan posisi, atau menggunakan set GTID.
timestamp: Melewati fase snapshot dan mulai membaca event binlog dari timestamp tertentu.
Contoh:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- Mulai dari offset paling awal. 'scan.startup.mode' = 'latest-offset', -- Mulai dari offset terbaru. 'scan.startup.mode' = 'specific-offset', -- Mulai dari offset tertentu. 'scan.startup.mode' = 'timestamp', -- Mulai dari timestamp tertentu. 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Tentukan nama file binlog untuk mode specific-offset. 'scan.startup.specific-offset.pos' = '4', -- Tentukan posisi binlog untuk mode specific-offset. 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Tentukan set GTID untuk mode specific-offset. 'scan.startup.timestamp-millis' = '1667232000000' -- Tentukan timestamp startup untuk mode timestamp. ... )PentingSumber MySQL mencatat posisi saat ini pada level INFO selama checkpoint. Awalan log adalah
Binlog offset on checkpoint {checkpoint-id}. Log ini dapat membantu Anda memulai ulang pekerjaan dari posisi checkpoint tertentu.Jika skema tabel yang dibaca telah berubah di masa lalu, memulai dari earliest-offset, specific-offset, atau timestamp dapat menyebabkan error. Hal ini karena pembaca Debezium secara internal menyimpan skema terbaru, dan data lama dengan skema yang tidak cocok tidak dapat diurai dengan benar.
Tentang tabel sumber CDC tanpa kunci
Untuk menggunakan tabel tanpa kunci, Anda harus mengatur scan.incremental.snapshot.chunk.key-column dan menentukan kolom non-null.
Semantik pemrosesan tabel sumber CDC tanpa kunci ditentukan oleh perilaku kolom yang ditentukan dalam scan.incremental.snapshot.chunk.key-column:
Jika kolom yang ditentukan tidak diperbarui, semantik tepat-sekali dapat dijamin.
Jika kolom yang ditentukan diperbarui, hanya semantik at-least-once yang dapat dijamin. Namun, Anda dapat memastikan kebenaran data dengan menggabungkannya dengan sistem downstream, menentukan primary key downstream, dan menggunakan operasi idempoten.
Baca log cadangan dari ApsaraDB RDS for MySQL
Tabel sumber CDC MySQL mendukung pembacaan log cadangan dari ApsaraDB RDS for MySQL. Ini berguna ketika fase snapshot lengkap memakan waktu lama, yang dapat menyebabkan file binlog lokal dibersihkan sebelum sempat dibaca. Jika file cadangan tersedia, konektor dapat membacanya sebagai gantinya.
Contoh:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )Aktifkan penggunaan ulang Sumber CDC
Dalam pekerjaan yang sama, beberapa tabel sumber CDC MySQL memulai beberapa klien binlog. Jika semua tabel sumber berada dalam instans yang sama, hal ini meningkatkan tekanan pada database. Untuk informasi selengkapnya, lihat FAQ CDC MySQL.
Solusi
VVR 8.0.7 dan versi selanjutnya mendukung penggunaan ulang Sumber CDC MySQL. Fitur ini menggabungkan tabel sumber CDC MySQL yang memenuhi syarat. Tabel sumber memenuhi syarat untuk digabungkan jika item konfigurasinya identik, kecuali untuk nama database, nama tabel, dan
server-id. Mesin secara otomatis menggabungkan sumber CDC MySQL dalam pekerjaan yang sama.Prosedur
Gunakan perintah
SETdalam pekerjaan SQL:SET 'table.optimizer.source-merge.enabled' = 'true'; # (Untuk VVR 8.0.8 dan 8.0.9) Atur juga item ini: SET 'sql-gateway.exec-plan.enabled' = 'false';Penggunaan ulang diaktifkan secara default di VVR 11.1 dan versi selanjutnya.
Jalankan pekerjaan tanpa state. Memodifikasi item konfigurasi penggunaan ulang Sumber mengubah topologi pekerjaan. Anda harus menjalankan pekerjaan tanpa state. Jika tidak, pekerjaan mungkin gagal dijalankan atau kehilangan data. Jika Sumber digabung, node
MergetableSourceScanmuncul.
PentingSetelah Anda mengaktifkan penggunaan ulang, kami tidak merekomendasikan menonaktifkan operator chaining. Jika Anda mengatur
pipeline.operator-chainingkefalse, hal ini meningkatkan overhead serialisasi dan deserialisasi data. Semakin banyak Sumber yang digabung, semakin besar overhead-nya.Di VVR 8.0.7, menonaktifkan operator chaining menyebabkan masalah serialisasi.
Percepat pembacaan binlog
Saat Anda menggunakan konektor MySQL sebagai tabel sumber atau sumber ingesti data, sistem mengurai file binlog untuk menghasilkan berbagai pesan perubahan selama fase inkremental. File binlog mencatat semua perubahan tabel dalam format biner. Anda dapat mempercepat penguraian file binlog dengan cara berikut.
Aktifkan filter penguraian
Gunakan opsi
scan.only.deserialize.captured.tables.changelog.enableduntuk mengurai event perubahan hanya untuk tabel yang ditentukan.
Optimalkan opsi Debezium
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size: Jumlah maksimum catatan yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, sistem menempatkan event dalam antrian pemblokiran sebelum menuliskannya ke downstream. Nilai default adalah 8192.debezium.max.batch.size: Jumlah maksimum event yang diproses konektor dalam setiap iterasi. Nilai default adalah 2048.debezium.poll.interval.ms: Jumlah milidetik yang harus ditunggu konektor sebelum meminta event perubahan baru. Nilai default adalah 1000 milidetik, atau 1 detik.
Contoh:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Konfigurasi Debezium
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- Aktifkan filter penguraian
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Mengurai event perubahan hanya untuk tabel yang ditentukan.
...
)source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Konfigurasi Debezium
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# Aktifkan filter penguraian
scan.only.deserialize.captured.tables.changelog.enabled: trueEdisi Perusahaan CDC MySQL memiliki kapasitas konsumsi binlog 85 MB/detik, sekitar dua kali lipat dari versi komunitas open source. Jika kecepatan pembuatan binlog melebihi 85 MB/detik (setara dengan satu file 512 MB setiap 6 detik), latensi pekerjaan Flink akan terus meningkat. Latensi pemrosesan akan berangsur-angsur berkurang setelah kecepatan pembuatan binlog melambat. Saat file binlog berisi transaksi besar, latensi pemrosesan mungkin sementara meningkat lalu berkurang setelah log transaksi dibaca.
API DataStream CDC MySQL
Untuk membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi tentang cara mengatur konektor DataStream, lihat Integrasikan dan gunakan konektor dalam program DataStream.
Contoh berikut menunjukkan cara membuat program API DataStream dan menggunakan MySqlSource, termasuk dependensi pom yang diperlukan.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // Atur database yang akan ditangkap.
.tableList("yourDatabaseName.yourTableName") // Atur tabel yang akan ditangkap.
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // Mengonversi SourceRecord ke string JSON.
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Aktifkan checkpointing.
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// Atur 4 tugas sumber paralel.
.setParallelism(4)
.print().setParallelism(1); // Gunakan paralelisme 1 untuk sink guna mempertahankan urutan pesan.
env.execute("Print MySQL Snapshot + Binlog");
}
}<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>Saat membangun MySqlSource, Anda harus menentukan parameter berikut dalam kode Anda:
Parameter | Deskripsi |
hostname | Alamat IP atau hostname database MySQL. |
port | Nomor port layanan database MySQL. |
databaseList | Nama database MySQL. Catatan Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Gunakan |
username | Username untuk layanan database MySQL. |
password | Password untuk layanan database MySQL. |
deserializer | Deserializer yang mendeserialisasi catatan tipe SourceRecord ke tipe yang ditentukan. Parameter dapat diatur ke salah satu nilai berikut:
|
Anda harus menentukan parameter berikut dalam dependensi pom Anda:
${vvr.version} | Versi mesin Realtime Compute for Apache Flink. Misalnya, Catatan Gunakan nomor versi yang ditampilkan di Maven. Kami secara berkala merilis versi hotfix, dan pembaruan ini mungkin tidak diumumkan melalui saluran lain. |
${flink.version} | Versi Apache Flink. Misalnya: Penting Pastikan versi Apache Flink Anda sesuai dengan versi mesin Realtime Compute for Apache Flink untuk menghindari masalah ketidakcocokan selama runtime pekerjaan. Untuk informasi selengkapnya tentang pemetaan versi, lihat Mesin. |
FAQ
Untuk informasi tentang masalah yang mungkin Anda temui saat menggunakan tabel sumber CDC, lihat Masalah CDC.