Tema ini menjelaskan cara menggunakan konektor MySQL.
Informasi latar belakang
Konektor MySQL mendukung semua database yang kompatibel dengan protokol MySQL, seperti ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (mode MySQL), dan self-managed MySQL.
Saat menggunakan konektor MySQL untuk membaca data dari OceanBase, pastikan binary logging (Binlog) diaktifkan dan dikonfigurasi dengan benar. Untuk informasi selengkapnya, lihat Operasi terkait Binlog. Fitur ini berada dalam pratinjau publik. Kami menyarankan agar Anda mengevaluasinya secara menyeluruh sebelum menggunakannya.
Konektor MySQL mendukung fitur-fitur berikut.
Kategori | Rincian |
Tipe yang didukung | Tabel sumber, tabel dimensi, tabel sink, dan sumber data Ingesti Data |
Mode runtime | Hanya mode streaming yang didukung. |
Format data | Tidak berlaku |
Metrik pemantauan spesifik | |
Tipe API | DataStream, SQL, dan YAML ingesti data |
Mendukung pembaruan atau penghapusan data di tabel sink | Ya |
Fitur
Tabel sumber change data capture (CDC) MySQL adalah tabel sumber streaming yang pertama-tama membaca seluruh data historis dari database, lalu secara mulus beralih ke pembacaan log biner (Binlog) untuk memastikan tidak ada data yang terlewat atau duplikat. Semantik tepat-sekali dijamin bahkan jika terjadi kegagalan. Tabel sumber CDC MySQL mendukung pembacaan konkuren data penuh dan menggunakan algoritma snapshot inkremental untuk menerapkan pembacaan tanpa lock serta transfer data yang dapat dilanjutkan. Untuk informasi selengkapnya, lihat Tentang tabel sumber CDC MySQL.
Pemrosesan batch dan stream terpadu: Konektor mendukung pembacaan data penuh dan inkremental, sehingga menghilangkan kebutuhan akan pipeline terpisah.
Mendukung pembacaan konkuren data penuh untuk penskalaan kinerja horizontal.
Beralih mulus dari pembacaan data penuh ke pembacaan data inkremental dan secara otomatis melakukan skala-masuk untuk menghemat sumber daya komputasi.
Mendukung transfer data yang dapat dilanjutkan selama fase pembacaan data penuh guna meningkatkan stabilitas.
Pembacaan data penuh tanpa lock tidak memengaruhi operasi bisnis online.
Mendukung pembacaan log backup dari ApsaraDB RDS for MySQL.
Mengurai file log biner secara paralel untuk mengurangi latensi data.
Prasyarat
Sebelum menggunakan tabel sumber CDC MySQL, Anda harus mengonfigurasi MySQL sesuai petunjuk dalam Konfigurasi MySQL untuk memenuhi prasyarat.
RDS for MySQL
Anda dapat melakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.
Binary logging (Binlog) harus diaktifkan. Secara default sudah diaktifkan.
Format log biner harus ROW. Ini adalah format default.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Akun pengguna MySQL harus dibuat dan diberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Database dan tabel MySQL harus dibuat. Untuk informasi selengkapnya, lihat Buat database dan akun untuk instans ApsaraDB RDS for MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasi akibat izin yang tidak mencukupi.
Daftar putih IP harus dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans ApsaraDB RDS for MySQL.
PolarDB untuk MySQL
Anda dapat melakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.
Binary logging (Binlog) harus diaktifkan. Secara default dinonaktifkan.
Format log biner harus ROW. Ini adalah format default.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Akun pengguna MySQL harus dibuat dan diberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Database dan tabel MySQL harus dibuat. Untuk informasi selengkapnya, lihat Buat database dan akun untuk kluster PolarDB for MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasi akibat izin yang tidak mencukupi.
Daftar putih IP harus dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk kluster PolarDB for MySQL.
MySQL yang dikelola sendiri
Anda dapat melakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.
Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.
Binary logging (Binlog) harus diaktifkan. Secara default dinonaktifkan.
Format log biner harus ROW. Format default adalah STATEMENT.
Atur binlog_row_image ke FULL. Ini adalah pengaturan default.
Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.
Akun pengguna MySQL harus dibuat dan diberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.
Database dan tabel MySQL harus dibuat. Untuk informasi selengkapnya, lihat Buat database dan akun untuk instans self-managed MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasi akibat izin yang tidak mencukupi.
Daftar putih IP harus dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans self-managed MySQL.
Batasan
Batasan umum
Tabel sumber CDC MySQL tidak mendukung definisi watermark.
Pada pekerjaan Create Table As Select (CTAS) dan Create Database As Select (CDAS), tabel sumber CDC MySQL dapat menyinkronkan beberapa perubahan skema. Untuk informasi selengkapnya tentang tipe perubahan yang didukung, lihat Kebijakan sinkronisasi evolusi skema.
Konektor CDC MySQL tidak mendukung fitur Binary Log Transaction Compression. Oleh karena itu, saat menggunakan konektor CDC MySQL untuk mengonsumsi data inkremental, pastikan fitur ini dinonaktifkan. Jika tidak, Anda mungkin gagal mengambil data inkremental.
RDS for MySQL batasan
Untuk RDS for MySQL, kami tidak menyarankan membaca data dari database secondary atau replica read-only. Hal ini karena periode retensi default untuk log biner pada instans tersebut sangat singkat. Jika log biner kedaluwarsa dan dibersihkan, pekerjaan gagal mengonsumsi data log biner dan melaporkan error.
RDS for MySQL secara default mengaktifkan replikasi paralel primary/secondary tetapi tidak menjamin urutan transaksi yang konsisten antara database primary dan secondary. Hal ini dapat menyebabkan kehilangan data selama pemulihan data dari checkpoint setelah alih bencana primary/secondary. Untuk menghindari masalah ini, Anda dapat secara manual mengaktifkan opsi slave_preserve_commit_order untuk RDS for MySQL.
PolarDB untuk MySQL batasan
Tabel sumber CDC MySQL tidak mendukung pembacaan data dari arsitektur Kluster Multi-master (untuk informasi selengkapnya, lihat Apa itu Kluster Multi-master?) pada PolarDB for MySQL versi 1.0.19 dan sebelumnya. Log biner yang dihasilkan oleh versi kluster ini mungkin berisi ID tabel duplikat, yang dapat menyebabkan kesalahan pemetaan skema pada tabel sumber CDC dan mengakibatkan error saat mengurai data log biner.
Open source MySQL batasan
Dengan konfigurasi default, MySQL mempertahankan urutan transaksi selama replikasi log biner primary-secondary. Jika replica MySQL memiliki replikasi paralel diaktifkan (slave_parallel_workers > 1) tetapi tidak memiliki slave_preserve_commit_order=ON, urutan commit transaksinya mungkin tidak konsisten dengan database primary. Saat Flink CDC pulih dari checkpoint, data mungkin terlewat karena urutan yang salah. Kami menyarankan mengatur slave_preserve_commit_order = ON pada replica MySQL. Atau, Anda dapat mengatur slave_parallel_workers = 1, yang akan mengorbankan kinerja replikasi.
Catatan
Tabel sink
Jangan deklarasikan primary key auto-increment dalam DDL. MySQL akan mengisinya secara otomatis saat menulis data.
Anda harus mendeklarasikan setidaknya satu bidang non-primary key. Jika tidak, error akan dilaporkan.
Dalam DDL, NOT ENFORCED berarti Flink tidak menegakkan kendala primary key. Anda harus memastikan kebenaran dan integritas primary key. Untuk informasi selengkapnya, lihat Validity Check.
Tabel dimensi
Jika Anda ingin menggunakan indeks untuk mempercepat kueri, urutan bidang dalam kondisi JOIN harus sesuai dengan urutan yang ditentukan dalam indeks (aturan prefiks paling kiri). Misalnya, jika indeks adalah (a, b, c), kondisi JOIN-nya adalah
ON t.a = x AND t.b = y.SQL yang dihasilkan oleh Flink mungkin ditulis ulang oleh pengoptimal, yang mencegah penggunaan indeks selama kueri database aktual. Untuk memastikan apakah indeks digunakan, Anda dapat memeriksa rencana eksekusi (EXPLAIN) atau Redis Slow Log di MySQL untuk melihat pernyataan SELECT aktual yang dieksekusi.
SQL
Anda dapat menggunakan konektor MySQL dalam pekerjaan SQL sebagai tabel sumber, tabel dimensi, atau tabel sink.
Sintaksis
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);Cara konektor menulis ke tabel sink: Saat menulis ke tabel sink, konektor membuat dan mengeksekusi pernyataan SQL untuk setiap catatan data yang diterima. Pernyataan SQL spesifik yang dieksekusi adalah sebagai berikut:
Untuk tabel sink tanpa primary key, pernyataan
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);dieksekusi.Untuk tabel sink dengan primary key, pernyataan
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;dieksekusi. Catatan: Jika tabel fisik memiliki kendala indeks unik selain primary key, memasukkan dua catatan dengan primary key berbeda tetapi nilai indeks unik yang sama menyebabkan konflik indeks unik. Konflik ini dapat menyebabkan data ditimpa dan hilang.
Jika primary key auto-increment didefinisikan dalam database MySQL, jangan deklarasikan kolom auto-increment dalam DDL Flink. Database akan mengisi kolom ini secara otomatis selama penulisan data. Konektor hanya mendukung penulisan dan penghapusan data dengan kolom auto-increment, bukan pembaruan.
Parameter WITH
Umum
Parameter
Deskripsi
Required
Tipe data
Nilai default
Catatan
connector
Tipe tabel.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, Anda dapat mengatur ini ke
mysql-cdcataumysql. Kedua nilai ini setara. Saat digunakan sebagai tabel dimensi atau sink, nilainya harusmysql.hostname
Alamat IP atau nama host database MySQL.
Ya
STRING
Tidak ada
Kami menyarankan agar Anda memasukkan alamat virtual private cloud (VPC).
CatatanJika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan titik akhir publik untuk akses. Untuk informasi selengkapnya, lihat Manajemen dan operasi penyimpanan dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.
username
Nama pengguna layanan database MySQL.
Ya
STRING
Tidak ada
Tidak ada.
password
Kata sandi layanan database MySQL.
Ya
STRING
Tidak ada
Tidak ada.
database-name
Nama database MySQL.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, nama database mendukung ekspresi reguler untuk membaca data dari beberapa database.
Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir string. Untuk alasan spesifiknya, lihat keterangan untuk table-name.
table-name
Nama tabel MySQL.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.
Saat membaca dari beberapa tabel MySQL, kirimkan beberapa pernyataan CTAS sebagai satu pekerjaan untuk menghindari pengaktifan beberapa listener Binlog, yang meningkatkan kinerja dan efisiensi. Untuk informasi selengkapnya, lihat Beberapa pernyataan CTAS: Kirim sebagai satu pekerjaan.
Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir string. Alasannya dijelaskan di bawah ini.
CatatanSaat mencocokkan nama tabel dengan ekspresi reguler, tabel sumber CDC MySQL menggabungkan database-name dan table-name yang Anda berikan dengan string \\. (atau karakter . sebelum Ververica Runtime (VVR) 8.0.1) untuk membentuk ekspresi reguler yang memenuhi syarat sepenuhnya. Ekspresi ini kemudian digunakan untuk mencocokkan nama tabel yang memenuhi syarat sepenuhnya dalam database MySQL.
Misalnya, jika Anda mengonfigurasi 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor akan menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ sebelum versi 8.0.1) untuk mencocokkan nama tabel yang memenuhi syarat sepenuhnya guna menentukan tabel mana yang akan dibaca.
port
Nomor port layanan database MySQL.
Tidak
INTEGER
3306
Tidak ada.
Khusus tabel sumber
Parameter
Deskripsi
Diperlukan
Tipe data
Nilai default
Catatan
server-id
ID numerik untuk klien database.
Tidak
STRING
Nilai acak antara 5400 dan 6400 dihasilkan.
ID ini harus unik secara global dalam kluster MySQL. Kami menyarankan agar Anda menetapkan ID yang berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.
Parameter ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, beberapa pembaca konkuren didukung. Dalam kasus ini, kami menyarankan agar Anda menetapkan rentang ID sehingga setiap pembaca konkuren menggunakan ID yang berbeda. Untuk informasi selengkapnya, lihat Penggunaan server ID.
scan.incremental.snapshot.enabled
Menentukan apakah akan mengaktifkan snapshot inkremental.
Tidak
BOOLEAN
true
Snapshot inkremental diaktifkan secara default. Ini adalah mekanisme baru untuk membaca snapshot data penuh. Dibandingkan dengan metode pembacaan snapshot lama, snapshot inkremental menawarkan beberapa keunggulan:
Sumber dapat membaca data penuh secara paralel.
Sumber mendukung checkpoint tingkat chunk saat membaca data penuh.
Sumber tidak perlu memperoleh kunci baca global (FLUSH TABLES WITH read lock) saat membaca data penuh.
Jika Anda ingin sumber mendukung pembacaan konkuren, setiap pembaca konkuren memerlukan server ID yang unik. Oleh karena itu, server-id harus berupa rentang, seperti 5400-6400, dan ukuran rentang harus lebih besar dari atau sama dengan tingkat paralelisme.
CatatanItem konfigurasi ini dihapus di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.
scan.incremental.snapshot.chunk.size
Ukuran setiap chunk (jumlah baris).
Tidak
INTEGER
8096
Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data dalam chunk di-cache dalam memori sebelum sepenuhnya dibaca.
Jumlah baris per chunk yang lebih kecil menghasilkan total jumlah chunk yang lebih besar di tabel. Meskipun ini meningkatkan granularitas pemulihan kesalahan, ini dapat menyebabkan kesalahan kehabisan memori (OOM) dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda perlu menemukan keseimbangan dan menetapkan ukuran chunk yang masuk akal.
scan.snapshot.fetch.size
Jumlah maksimum record yang diambil sekaligus saat membaca data penuh dari tabel.
Tidak
INTEGER
1024
Tidak ada.
scan.startup.mode
Mode startup untuk konsumsi data.
Tidak
STRING
initial
Nilai valid:
initial (default): Saat startup pertama, memindai data historis penuh dan kemudian membaca data Binlog terbaru.
latest-offset: Saat startup pertama, tidak memindai data historis dan mulai membaca dari akhir Binlog (posisi terbaru). Hanya membaca perubahan yang terjadi setelah konektor dimulai.
earliest-offset: Tidak memindai data historis dan mulai membaca dari Binlog yang tersedia paling awal.
specific-offset: Tidak memindai data historis dan mulai dari offset Binlog tertentu. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau Anda dapat menentukan set GTID dengan hanya mengonfigurasi scan.startup.specific-offset.gtid-set.
timestamp: Tidak memindai data historis dan mulai membaca Binlog dari timestamp tertentu. Timestamp ditentukan oleh scan.startup.timestamp-millis dalam milidetik.
PentingSaat menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel tidak berubah antara posisi konsumsi Binlog yang ditentukan dan waktu startup pekerjaan untuk menghindari error akibat ketidakcocokan skema.
scan.startup.specific-offset.file
Nama file Binlog untuk offset awal saat menggunakan mode startup offset tertentu.
Tidak
STRING
Tidak ada
Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh format nama file:
mysql-bin.000003.scan.startup.specific-offset.pos
Offset dalam file Binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset tertentu.
Tidak
INTEGER
Tidak ada
Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.
scan.startup.specific-offset.gtid-set
Set GTID untuk offset awal saat menggunakan mode startup offset tertentu.
Tidak
STRING
Tidak ada
Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh format set GTID:
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.scan.startup.timestamp-millis
Timestamp offset awal dalam milidetik saat menggunakan mode startup timestamp.
Tidak
LONG
Tidak ada
Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke timestamp. Timestamp dalam milidetik.
PentingSaat menggunakan waktu tertentu, CDC MySQL mencoba membaca event awal setiap file Binlog untuk menentukan timestamp-nya, akhirnya menemukan file Binlog yang sesuai dengan waktu yang ditentukan. Pastikan file Binlog untuk timestamp yang ditentukan belum dibersihkan dari database dan dapat dibaca.
server-time-zone
Zona waktu sesi yang digunakan oleh database.
Tidak
STRING
Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database, yaitu zona waktu wilayah yang Anda pilih.
Misalnya, Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.
debezium.min.row.count.to.stream.results
Ketika jumlah baris dalam tabel melebihi nilai ini, mode pembacaan batch digunakan.
Tidak
INTEGER
1.000
Flink membaca data dari tabel sumber MySQL dengan salah satu cara berikut:
Baca penuh: Membaca seluruh data tabel langsung ke memori. Ini cepat tetapi mengonsumsi memori yang sesuai. Jika tabel sumber sangat besar, ada risiko error OOM.
Baca batch: Membaca data dalam beberapa batch, mengambil sejumlah baris tertentu setiap kali hingga semua data dibaca. Ini menghindari risiko OOM untuk tabel besar tetapi relatif lebih lambat.
connect.timeout
Jangka waktu maksimum untuk menunggu koneksi ke server database MySQL mengalami timeout sebelum mencoba kembali.
Tidak
DURATION
30s
Tidak ada.
connect.max-retries
Jumlah maksimum percobaan ulang setelah koneksi gagal ke layanan database MySQL.
Tidak
INTEGER
3
Tidak ada.
connection.pool.size
Ukuran kolam koneksi database.
Tidak
INTEGER
20
Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi ke database.
jdbc.properties.*
Parameter koneksi kustom untuk URL JDBC.
Tidak
STRING
Tidak ada
Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengonfigurasi 'jdbc.properties.useSSL' = 'false'.
Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties.
debezium.*
Parameter kustom untuk Debezium guna membaca Binlog.
Tidak
STRING
Tidak ada
Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan error parsing.
heartbeat.interval
Interval waktu sumber memajukan offset Binlog menggunakan event heartbeat.
Tidak
DURATION
30s
Event heartbeat digunakan untuk memajukan offset Binlog di sumber, yang sangat berguna untuk tabel MySQL yang diperbarui secara perlahan. Untuk tabel tersebut, offset Binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset Binlog maju, mencegahnya kedaluwarsa. Offset Binlog yang kedaluwarsa akan menyebabkan pekerjaan gagal dan memerlukan restart tanpa status.
scan.incremental.snapshot.chunk.key-column
Menentukan kolom yang akan digunakan sebagai kunci pemisahan untuk sharding selama fase snapshot.
Lihat Keterangan.
STRING
Tidak ada
Diperlukan untuk tabel tanpa kunci utama. Kolom yang dipilih harus bertipe non-null (NOT NULL).
Opsional untuk tabel dengan primary key. Hanya satu kolom dari primary key yang dapat dipilih.
rds.region-id
ID wilayah instans Alibaba Cloud RDS for MySQL.
Wajib saat menggunakan fitur membaca log arsip dari OSS.
STRING
Tidak ada
Untuk informasi lebih lanjut tentang ID wilayah, lihat Wilayah dan zona.
rds.access-key-id
ID AccessKey akun Alibaba Cloud RDS for MySQL.
Wajib saat menggunakan fitur membaca log arsip dari OSS.
STRING
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat Informasi AccessKey?.
PentingUntuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda mengelola rahasia untuk menentukan ID AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.
rds.access-key-secret
Rahasia AccessKey akun Alibaba Cloud RDS for MySQL.
Wajib saat menggunakan fitur membaca log arsip dari OSS.
STRING
Tidak ada
Untuk informasi selengkapnya, lihat Bagaimana cara melihat Informasi AccessKey?
PentingUntuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda mengelola rahasia untuk menentukan Rahasia AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.
rds.db-instance-id
ID instans Alibaba Cloud RDS for MySQL.
Wajib saat menggunakan fitur membaca log arsip dari OSS.
STRING
Tidak ada
Tidak ada.
rds.main-db-id
ID database utama instans Alibaba Cloud RDS for MySQL.
Tidak
STRING
Tidak ada
Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Cadangan log RDS for MySQL.
Hanya didukung di mesin komputasi Flink VVR 8.0.7 dan versi yang lebih baru.
rds.download.timeout
Periode waktu habis untuk mengunduh satu log arsip dari OSS.
Tidak
DURASI
60s
Tidak ada.
rds.endpoint
Titik akhir untuk mendapatkan informasi Binlog OSS.
Tidak
STRING
Tidak ada
Untuk informasi lebih lanjut tentang nilai yang valid, lihat Endpoints.
Hanya didukung di mesin komputasi Flink VVR 8.0.8 dan versi yang lebih baru.
scan.incremental.close-idle-reader.enabled
Menentukan apakah akan menutup pembaca tidak aktif setelah fase Snapshot berakhir.
Tidak
BOOLEAN
false
Hanya didukung di mesin komputasi Flink VVR 8.0.1 dan versi yang lebih baru.
Agar konfigurasi ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.
scan.read-changelog-as-append-only.enabled
Menentukan apakah aliran data changelog diubah menjadi aliran data append-only.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Semua jenis pesan (termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER) dikonversi menjadi pesan INSERT. Aktifkan ini hanya dalam skenario khusus, seperti ketika Anda perlu menyimpan pesan penghapusan dari tabel upstream.
false (default): Semua jenis pesan dikirim ke hilir apa adanya.
CatatanHanya didukung di mesin komputasi Flink VVR 8.0.8 dan versi yang lebih baru.
scan.only.deserialize.captured.tables.changelog.enabled
Pada fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan dari tabel yang ditentukan.
Tidak
BOOLEAN
Nilai default adalah false di versi VVR 8.x.
Nilai default adalah true di VVR 11.1 dan versi yang lebih baru.
Nilai yang valid:
true: Hanya mendeserialisasi data perubahan dari tabel target, yang mempercepat pembacaan Binlog.
false (default): Mendeserialisasi data perubahan dari semua tabel.
CatatanHanya didukung di mesin komputasi Flink VVR 8.0.7 dan versi yang lebih baru.
Saat menggunakan ini di mesin komputasi Flink VVR 8.0.8 dan sebelumnya, Anda harus mengubah nama parameter menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
Pada fase inkremental, menentukan apakah mencoba mengurai event DDL perubahan tanpa lock RDS.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengurai event DDL perubahan tanpa lock RDS.
false (default): Tidak mengurai event DDL perubahan tanpa lock RDS.
Ini adalah fitur eksperimen. Kami menyarankan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan online tanpa lock.
CatatanHanya didukung di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.
scan.incremental.snapshot.backfill.skip
Menentukan apakah akan melewati backfill selama fase pembacaan snapshot.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Melewati backfill selama fase pembacaan snapshot.
false (default): Tidak melewati backfill selama fase pembacaan snapshot.
Jika backfill dilewati, perubahan pada tabel selama fase snapshot akan dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.
PentingMelewati backfill dapat menyebabkan ketidaksesuaian data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya jaminan semantik setidaknya sekali yang diberikan.
CatatanHanya didukung di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Tidak
BOOELEAN
false
Nilai yang valid:
true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Ini adalah fitur eksperimen. Mengaktifkannya dapat mengurangi risiko error OOM pada Pengelola Tugas saat menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan menambahkan ini sebelum startup pertama pekerjaan.
CatatanHanya didukung di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.
Khusus tabel dimensi
Parameter
Deskripsi
Diperlukan
Tipe data
Nilai default
Catatan
url
URL JDBC MySQL.
Tidak
STRING
Tidak ada
Format URL adalah:
jdbc:mysql://<endpoint>:<port>/<database_name>.lookup.max-retries
Jumlah maksimum percobaan ulang setelah pembacaan data gagal.
Tidak
BILANGAN BULAT
3
Hanya didukung di mesin komputasi Flink VVR 6.0.7 dan versi yang lebih baru.
lookup.cache.strategy
Kebijakan cache.
Tidak
STRING
Tidak ada
Mendukung tiga kebijakan cache: None, LRU, dan ALL. Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat Informasi latar belakang.
CatatanSaat menggunakan kebijakan cache least recently used (LRU), Anda juga harus mengonfigurasi parameter lookup.cache.max-rows.
lookup.cache.max-rows
Jumlah maksimum baris yang di-cache.
Tidak
INTEGER
100.000
Jika Anda memilih kebijakan cache LRU, Anda harus menetapkan ukuran cache.
Jika Anda memilih kebijakan cache ALL, Anda tidak perlu menetapkan ukuran cache.
lookup.cache.ttl
Waktu hidup cache (TTL).
Tidak
DURASI
10 s
Konfigurasi lookup.cache.ttl bergantung pada lookup.cache.strategy:
Jika lookup.cache.strategy diatur ke None, Anda tidak perlu mengonfigurasi lookup.cache.ttl. Ini berarti cache tidak akan kedaluwarsa.
Jika lookup.cache.strategy diatur ke LRU, lookup.cache.ttl adalah waktu kedaluwarsa cache. Secara default, tidak kedaluwarsa.
Jika lookup.cache.strategy diatur ke ALL, lookup.cache.ttl adalah waktu reload cache. Secara default, tidak reload.
Gunakan format waktu, seperti 1min atau 10s.
lookup.max-join-rows
Jumlah maksimum hasil yang dikembalikan saat catatan dari tabel utama cocok dengan catatan di tabel dimensi.
Tidak
INTEGER
1.024
Tidak ada.
lookup.filter-push-down.enabled
Menentukan apakah mengaktifkan filter pushdown untuk tabel dimensi.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memfilter data terlebih dahulu berdasarkan kondisi yang ditetapkan dalam pekerjaan SQL.
false (default): Menonaktifkan filter pushdown untuk tabel dimensi. Tabel dimensi memuat semua data saat memuat dari tabel database MySQL.
CatatanHanya didukung di mesin komputasi Flink VVR 8.0.7 dan versi yang lebih baru.
PentingPushdown tabel dimensi hanya boleh diaktifkan saat tabel Flink digunakan sebagai tabel dimensi. Tabel sumber MySQL tidak mendukung pengaktifan filter pushdown. Jika tabel Flink digunakan sebagai tabel sumber dan tabel dimensi, dan filter pushdown diaktifkan untuk tabel dimensi, Anda harus secara eksplisit mengatur item konfigurasi ini ke false menggunakan SQL Hints saat menggunakannya sebagai tabel sumber. Jika tidak, pekerjaan mungkin berjalan tidak normal.
Khusus tabel sink
Parameter
Deskripsi
Diperlukan
Tipe data
Nilai default
Catatan
url
URL JDBC MySQL.
Tidak
STRING
Tidak ada
Format URL adalah:
jdbc:mysql://<endpoint>:<port>/<database_name>.sink.max-retries
Jumlah maksimum percobaan ulang setelah penulisan data gagal.
Tidak
INTEGER
3
Tidak ada.
sink.buffer-flush.batch-size
Jumlah catatan yang ditulis dalam satu batch.
Tidak
INTEGER
4096
Tidak ada.
sink.buffer-flush.max-rows
Jumlah catatan data yang di-cache dalam memori.
Tidak
INTEGER
10.000
Parameter ini hanya berlaku setelah primary key ditentukan.
sink.buffer-flush.interval
Interval waktu untuk membersihkan cache. Jika data yang di-cache tidak memenuhi kondisi output setelah waktu tunggu yang ditentukan, sistem secara otomatis mengeluarkan semua data dalam cache.
Tidak
DURATION
1s
Tidak ada.
sink.ignore-delete
Menentukan apakah mengabaikan operasi Delete data.
Tidak
BOOLEAN
false
Saat aliran yang dihasilkan oleh Flink SQL mencakup catatan delete atau update-before, jika beberapa tugas output memperbarui bidang berbeda dari tabel yang sama secara bersamaan, ketidaksesuaian data dapat terjadi.
Misalnya, setelah catatan dihapus, tugas lain hanya memperbarui beberapa bidang. Bidang yang tidak diperbarui akan menjadi null atau nilai default-nya, menyebabkan error data.
Dengan menyetel sink.ignore-delete ke true, Anda dapat mengabaikan operasi DELETE dan UPDATE_BEFORE dari upstream untuk menghindari masalah tersebut.
CatatanUPDATE_BEFORE adalah bagian dari mekanisme retraksi Flink, digunakan untuk "menarik kembali" nilai lama dalam operasi pembaruan.
Saat ignoreDelete = true, semua catatan tipe DELETE dan UPDATE_BEFORE dilewati. Hanya catatan INSERT dan UPDATE_AFTER yang diproses.
sink.ignore-null-when-update
Saat memperbarui data, menentukan apakah memperbarui bidang yang sesuai ke null atau melewati pembaruan untuk bidang tersebut jika nilai bidang data yang masuk adalah null.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Tidak memperbarui bidang. Parameter ini dapat diatur ke true hanya saat primary key ditetapkan untuk tabel Flink. Saat diatur ke true:
Untuk VVR 8.0.6 dan sebelumnya, penulisan data ke tabel sink tidak mendukung eksekusi batch.
Untuk VVR 8.0.7 dan yang lebih baru, penulisan data ke tabel sink mendukung eksekusi batch.
Meskipun penulisan batch dapat secara signifikan meningkatkan efisiensi penulisan dan throughput keseluruhan, hal ini dapat menyebabkan latensi data dan risiko error OOM. Oleh karena itu, buat pertimbangan berdasarkan skenario bisnis aktual Anda.
false: Memperbarui bidang menjadi null.
CatatanParameter ini hanya didukung di mesin komputasi waktu nyata VVR 8.0.5 dan versi yang lebih baru.
Pemetaan tipe data
Tabel sumber CDC
Tipe bidang MySQL CDC
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
PentingKami menyarankan agar Anda tidak menggunakan tipe TINYINT(1) di MySQL untuk menyimpan nilai selain 0 dan 1. Saat property-version diatur ke 0, tabel sumber CDC MySQL memetakan TINYINT(1) ke tipe BOOLEAN Flink secara default, yang dapat menyebabkan ketidakakuratan data. Untuk menggunakan TINYINT(1) guna menyimpan nilai selain 0 dan 1, lihat parameter konfigurasi catalog.table.treat-tinyint1-as-boolean.
Tabel dimensi dan sink
Tipe Bidang MySQL
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
Catatandengan p <= 38.
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
PentingFlink hanya mendukung catatan tipe BLOB MySQL yang kurang dari atau sama dengan 2.147.483.647 (2^31 - 1) byte.
BLOB
MEDIUMBLOB
LONGBLOB
Ingesti Data
Anda dapat menggunakan konektor MySQL sebagai sumber data dalam pekerjaan YAML ingesti data.
Sintaksis
source:
type: mysql
name: Sumber MySQL
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxxParameter
Parameter | Deskripsi | Diperlukan | Tipe data | Nilai default | Catatan |
type | Tipe sumber data. | Ya | STRING | Tidak ada | Nilainya harus mysql. |
name | Nama sumber data. | Tidak | STRING | Tidak ada | Tidak ada. |
hostname | Alamat IP atau nama host database MySQL. | Ya | STRING | Tidak ada | Kami menyarankan agar Anda memasukkan alamat VPC. Catatan Jika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan titik akhir publik untuk akses. Untuk informasi selengkapnya, lihat Manajemen dan operasi penyimpanan dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?. |
username | Nama pengguna layanan database MySQL. | Ya | STRING | Tidak ada | Tidak ada. |
password | Kata sandi layanan database MySQL. | Ya | STRING | Tidak ada | Tidak ada. |
tables | Tabel data MySQL untuk disinkronkan. | Ya | STRING | Tidak ada |
Catatan
|
tables.exclude | Tabel yang akan dikecualikan dari sinkronisasi. | Tidak | STRING | Tidak ada |
Catatan Titik digunakan untuk memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, Anda harus meng-escape-nya dengan backslash. Misalnya: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*. |
port | Nomor port layanan database MySQL. | Tidak | INTEGER | 3306 | Tidak ada. |
schema-change.enabled | Menentukan apakah akan mengirim acara perubahan skema. | Tidak | BOOLEAN | true | Tidak ada. |
server-id | ID numerik atau rentang untuk klien database yang digunakan untuk sinkronisasi. | Tidak | STRING | Nilai acak antara 5400 dan 6400 dihasilkan. | ID ini harus unik secara global dalam kluster MySQL. Kami menyarankan agar Anda menetapkan ID yang berbeda untuk setiap pekerjaan yang terhubung ke database yang sama. Parameter ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, beberapa pembaca konkuren didukung. Dalam kasus ini, kami menyarankan agar Anda menetapkan rentang ID sehingga setiap pembaca konkuren menggunakan ID yang berbeda. |
jdbc.properties.* | Parameter koneksi kustom untuk URL JDBC. | Tidak | STRING | Tidak ada | Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengonfigurasi 'jdbc.properties.useSSL' = 'false'. Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties. |
debezium.* | Parameter kustom untuk Debezium guna membaca Binlog. | Tidak | STRING | Tidak ada | Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan error parsing. |
scan.incremental.snapshot.chunk.size | Ukuran setiap chunk (jumlah baris). | Tidak | INTEGER | 8096 | Tabel MySQL dibagi menjadi beberapa chunk untuk dibaca. Data dalam chunk di-cache dalam memori sebelum sepenuhnya dibaca. Jumlah baris per chunk yang lebih kecil menghasilkan total jumlah chunk yang lebih besar di tabel. Meskipun ini meningkatkan granularitas pemulihan kesalahan, ini dapat menyebabkan kesalahan OOM dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda perlu menemukan keseimbangan dan menetapkan ukuran chunk yang masuk akal. |
scan.snapshot.fetch.size | Jumlah maksimum record yang diambil sekaligus saat membaca data penuh dari tabel. | Tidak | INTEGER | 1024 | Tidak ada. |
scan.startup.mode | Mode startup untuk konsumsi data. | Tidak | STRING | initial | Nilai valid:
Penting Untuk mode startup earliest-offset, specific-offset, dan timestamp, jika skema tabel pada waktu startup berbeda dari skema pada waktu offset awal yang ditentukan, pekerjaan akan gagal. Dengan kata lain, saat menggunakan ketiga mode ini, Anda harus memastikan bahwa skema tabel tidak berubah antara posisi konsumsi Binlog yang ditentukan dan waktu startup pekerjaan. |
scan.startup.specific-offset.file | Nama file Binlog untuk offset awal saat menggunakan mode startup offset tertentu. | Tidak | STRING | Tidak ada | Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh format nama file: |
scan.startup.specific-offset.pos | Offset dalam file Binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset tertentu. | Tidak | INTEGER | Tidak ada | Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. |
scan.startup.specific-offset.gtid-set | Set GTID untuk offset awal saat menggunakan mode startup offset tertentu. | Tidak | STRING | Tidak ada | Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh format set GTID: |
scan.startup.timestamp-millis | Timestamp offset awal dalam milidetik saat menggunakan mode startup timestamp. | Tidak | PANJANG | Tidak ada | Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke timestamp. Timestamp dalam milidetik. Penting Saat menggunakan waktu tertentu, CDC MySQL mencoba membaca event awal setiap file Binlog untuk menentukan timestamp-nya, akhirnya menemukan file Binlog yang sesuai dengan waktu yang ditentukan. Pastikan file Binlog untuk timestamp yang ditentukan belum dibersihkan dari database dan dapat dibaca. |
server-time-zone | Zona waktu sesi yang digunakan oleh database. | Tidak | STRING | Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database, yaitu zona waktu wilayah yang Anda pilih. | Misalnya, Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values. |
scan.startup.specific-offset.skip-events | Jumlah event Binlog yang dilewati saat membaca dari offset tertentu. | Tidak | INTEGER | Tidak ada | Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. |
scan.startup.specific-offset.skip-rows | Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event Binlog mungkin sesuai dengan beberapa perubahan baris. | Tidak | INTEGER | Tidak ada | Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. |
connect.timeout | Waktu maksimum untuk menunggu koneksi ke server database MySQL habis sebelum mencoba lagi. | Tidak | DURATION | 30s | Tidak ada. |
connect.max-retries | Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal. | Tidak | INTEGER | 3 | Tidak ada. |
connection.pool.size | Ukuran kolam koneksi database. | Tidak | INTEGER | 20 | Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi database. |
heartbeat.interval | Interval waktu sumber memajukan offset Binlog menggunakan event heartbeat. | Tidak | DURATION | 30s | Event heartbeat digunakan untuk memajukan offset Binlog di sumber, yang sangat berguna untuk tabel MySQL yang diperbarui secara perlahan. Untuk tabel tersebut, offset Binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset Binlog maju, mencegahnya kedaluwarsa. Offset Binlog yang kedaluwarsa akan menyebabkan pekerjaan gagal dan memerlukan restart tanpa status. |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom yang akan digunakan sebagai kunci pemisahan untuk sharding selama fase snapshot. | Nomor. | STRING | Tidak ada | Hanya satu kolom dari primary key yang dapat dipilih. |
rds.region-id | ID wilayah instans Alibaba Cloud RDS for MySQL. | Wajib saat menggunakan fitur membaca log arsip dari OSS. | STRING | Tidak ada | Untuk informasi lebih lanjut tentang ID wilayah, lihat Wilayah dan zona. |
rds.access-key-id | ID AccessKey akun Alibaba Cloud RDS for MySQL. | Wajib saat menggunakan fitur membaca log arsip dari OSS. | STRING | Tidak ada | Untuk informasi selengkapnya, lihat Bagaimana cara melihat Informasi AccessKey? Penting Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda mengelola rahasia untuk menentukan ID AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel. |
rds.access-key-secret | Rahasia AccessKey akun Alibaba Cloud RDS for MySQL. | Wajib saat menggunakan fitur membaca log arsip dari OSS. | STRING | Tidak ada | Untuk informasi selengkapnya, lihat Bagaimana cara melihat Informasi AccessKey? Penting Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda mengelola rahasia untuk menentukan Rahasia AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel. |
rds.db-instance-id | ID instans Alibaba Cloud RDS for MySQL. | Wajib saat menggunakan fitur membaca log arsip dari OSS. | STRING | Tidak ada | Tidak ada. |
rds.main-db-id | ID database utama instans Alibaba Cloud RDS for MySQL. | Tidak | STRING | Tidak ada | Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Cadangan log RDS for MySQL. |
rds.download.timeout | Periode timeout untuk mengunduh satu log arsip dari OSS. | Tidak | DURATION | 60s | Tidak ada. |
rds.endpoint | Titik akhir untuk mendapatkan informasi Binlog OSS. | Tidak | STRING | Tidak ada | Untuk informasi lebih lanjut tentang nilai valid, lihat Titik akhir. |
rds.binlog-directory-prefix | Awalan direktori untuk menyimpan file Binlog. | Tidak | STRING | rds-binlog- | Tidak ada. |
rds.use-intranet-link | Menentukan apakah menggunakan tautan jaringan internal untuk mengunduh file Binlog. | Tidak | BOOLEAN | true | Tidak ada. |
rds.binlog-directories-parent-path | Jalur absolut direktori induk tempat file Binlog disimpan. | Tidak | STRING | Tidak ada | Tidak ada. |
chunk-meta.group.size | Ukuran metadata chunk. | Tidak | INTEGER | 1000 | Jika metadata lebih besar dari nilai ini, metadata akan dibagi menjadi beberapa bagian untuk transmisi. |
chunk-key.even-distribution.factor.lower-bound | Batas bawah faktor distribusi chunk untuk sharding merata. | Tidak | DOUBLE | 0.05 | Jika faktor distribusi kurang dari nilai ini, sharding tidak merata digunakan. Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris data. |
chunk-key.even-distribution.factor.upper-bound | Batas atas faktor distribusi chunk untuk sharding merata. | Tidak | DOUBLE | 1000.0 | Jika faktor distribusi lebih besar dari nilai ini, sharding tidak merata digunakan. Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris data. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah akan menutup pembaca tidak aktif setelah fase Snapshot berakhir. | Tidak | BOOLEAN | false | Agar konfigurasi ini berlaku, atur |
scan.only.deserialize.captured.tables.changelog.enabled | Pada fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan dari tabel yang ditentukan. | Tidak | BOOLEAN |
| Nilai valid:
|
scan.parallel-deserialize-changelog.enabled | Pada fase inkremental, menentukan apakah menggunakan beberapa thread untuk mengurai event perubahan. | Tidak | BOOLEAN | false | Nilai valid:
Catatan Hanya didukung di mesin komputasi Flink VVR 8.0.11 dan versi yang lebih baru. |
scan.parallel-deserialize-changelog.handler.size | Jumlah handler acara saat menggunakan beberapa thread untuk mengurai acara perubahan. | Tidak | INTEGER | 2 | Catatan Hanya didukung di mesin komputasi Flink VVR 8.0.11 dan versi yang lebih baru. |
metadata-column.include-list | Kolom metadata yang diteruskan ke sink hilir. | Tidak | STRING | Tidak ada | Metadata yang tersedia meliputi Catatan Konektor YAML CDC MySQL tidak memerlukan atau mendukung penambahan kolom metadata Penting Kolom metadata |
scan.newly-added-table.enabled | Saat memulai ulang dari checkpoint, menentukan apakah menyinkronkan tabel yang baru ditambahkan yang tidak cocok selama startup sebelumnya atau menghapus tabel yang saat ini tidak cocok yang disimpan dalam status. | Tidak | BOOLEAN | false | Berlaku saat memulai ulang dari checkpoint atau titik simpan. |
scan.binlog.newly-added-table.enabled | Pada fase inkremental, menentukan apakah mengirim data dari tabel yang baru ditambahkan yang cocok. | Tidak | BOOLEAN | false | Tidak dapat diaktifkan secara bersamaan dengan |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom untuk tabel tertentu yang akan digunakan sebagai kunci pemisahan untuk sharding selama fase snapshot. | Tidak | STRING | Tidak ada |
|
scan.parse.online.schema.changes.enabled | Pada fase inkremental, menentukan apakah mencoba mengurai event DDL perubahan tanpa lock RDS. | Tidak | BOOLEAN | false | Nilai valid:
Ini adalah fitur eksperimen. Kami menyarankan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan online tanpa lock. Catatan Hanya didukung di mesin komputasi Flink VVR 11.0 dan versi yang lebih baru. |
scan.incremental.snapshot.backfill.skip | Menentukan apakah melewati backfill selama fase pembacaan snapshot. | Tidak | BOOLEAN | false | Nilai valid:
Jika backfill dilewati, perubahan pada tabel selama fase snapshot akan dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot. Penting Melewati backfill dapat menyebabkan ketidaksesuaian data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik setidaknya-sekali yang dijamin. Catatan Hanya didukung di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru. |
treat-tinyint1-as-boolean.enabled | Menentukan apakah memperlakukan tipe TINYINT(1) sebagai tipe Boolean. | Tidak | BOOLEAN | true | Nilai yang valid:
|
treat-timestamp-as-datetime-enabled | Menentukan apakah memperlakukan tipe TIMESTAMP sebagai tipe DATETIME. | Tidak | BOOLEAN | false | Nilai yang valid:
Tipe TIMESTAMP MySQL menyimpan waktu UTC dan dipengaruhi oleh zona waktu. Tipe DATETIME MySQL menyimpan waktu literal dan tidak dipengaruhi oleh zona waktu. Saat diaktifkan, ini mengonversi data tipe TIMESTAMP MySQL ke tipe DATETIME berdasarkan server-time-zone. |
include-comments.enabled | Menentukan apakah menyinkronkan komentar tabel dan bidang. | Tidak | BOOELEAN | false | Nilai valid:
Mengaktifkan ini meningkatkan penggunaan memori pekerjaan. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot. | Tidak | BOOELEAN | false | Parameter ini mendukung nilai-nilai berikut:
Ini adalah fitur eksperimental. Mengaktifkan fitur ini mengurangi risiko error kehabisan memori (OOM) saat Pengelola Tugas menyinkronkan shard terakhir selama fase snapshot. Tambahkan parameter ini sebelum pekerjaan dimulai untuk pertama kalinya. Catatan Hanya didukung di Flink compute engine Ververica Runtime (VVR) 11.1 dan versi yang lebih baru. |
Pemetaan tipe
Tabel berikut menunjukkan pemetaan tipe data untuk Ingesti Data.
Tipe bidang CDC MySQL | Tipe bidang CDC |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | WAKTU [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | Berdasarkan nilai parameter
|
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | STRING Catatan Dalam MySQL, tipe data desimal memiliki presisi hingga 65, tetapi dalam Flink, presisi tipe data desimal dibatasi hingga 38. Oleh karena itu, jika Anda mendefinisikan kolom desimal dengan presisi lebih dari 38, Anda harus memetakannya ke string untuk menghindari kehilangan presisi. |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING Catatan Tipe data JSON akan dikonversi ke string berformat JSON dalam Flink. |
GEOMETRY | STRING Catatan Tipe data spasial dalam MySQL akan dikonversi ke string dengan format JSON tetap. Untuk informasi selengkapnya, lihat Pemetaan Tipe Data Spasial MySQL. |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES Catatan Untuk tipe data BLOB dalam MySQL, hanya blob dengan panjang tidak lebih dari 2.147.483.647 (2**31-1) yang didukung. |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
Contoh penggunaan
Tabel sumber CDC
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;Tabel sink
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;Sumber data Ingesti Data
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
Tentang tabel sumber CDC MySQL
Prinsip implementasi
Saat tabel sumber CDC MySQL dimulai, tabel tersebut memindai seluruh tabel, membaginya menjadi beberapa chunk berdasarkan primary key, dan mencatat offset log biner saat ini. Kemudian, menggunakan algoritma snapshot inkremental untuk membaca data dari setiap chunk menggunakan pernyataan SELECT. Pekerjaan secara berkala melakukan checkpoint untuk mencatat chunk yang telah selesai. Jika terjadi failover, pekerjaan hanya perlu membaca chunk yang belum selesai. Setelah semua chunk dibaca, pekerjaan mulai membaca catatan perubahan inkremental dari offset log biner yang sebelumnya dicatat. Pekerjaan Flink terus melakukan checkpoint berkala dan mencatat offset log biner. Jika pekerjaan gagal, pekerjaan melanjutkan pemrosesan dari offset log biner terakhir yang dicatat. Proses ini mencapai semantik tepat-sekali.
Untuk penjelasan lebih rinci tentang algoritma snapshot inkremental, lihat Konektor CDC MySQL.
Metadata
Metadata berguna dalam skenario di mana database dan tabel yang di-shard digabung dan disinkronkan. Hal ini karena setelah penggabungan, bisnis sering ingin membedakan database dan tabel sumber untuk setiap data. Kolom metadata dapat digunakan untuk mengakses informasi nama database dan tabel dari tabel sumber. Oleh karena itu, Anda dapat dengan mudah menggabungkan beberapa tabel yang di-shard menjadi satu tabel tujuan menggunakan kolom metadata.
Sumber CDC MySQL mendukung sintaks kolom metadata. Anda dapat mengakses metadata berikut menggunakan kolom metadata.
Kunci metadata
Tipe metadata
Deskripsi
database_name
STRING NOT NULL
Nama database yang berisi baris tersebut.
table_name
STRING NOT NULL
Nama tabel yang berisi baris tersebut.
op_ts
TIMESTAMP_LTZ(3) NOT NULL
Waktu perubahan dilakukan di database. Jika catatan berasal dari data historis tabel dan bukan dari Binlog, nilai ini selalu 0.
op_type
STRING NOT NULL
Jenis perubahan pada baris tersebut.
+I: Pesan INSERT
-D: Pesan DELETE
-U: Pesan UPDATE_BEFORE
+U: Pesan UPDATE_AFTER
CatatanHanya didukung di Realtime Compute for Apache Flink VVR 8.0.7 dan versi yang lebih baru.
query_log
STRING NOT NULL
Catatan log kueri MySQL yang sesuai dengan baris yang dibaca.
CatatanMySQL perlu mengaktifkan parameter binlog_rows_query_log_events untuk mencatat log kueri.
Contoh kode berikut menunjukkan cara menggabungkan dan menyinkronkan beberapa tabel pesanan dari beberapa database yang di-shard dalam instans MySQL ke tabel holo_orders di instans Hologres hilir.
CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- Membaca nama database. table_name STRING METADATA FROM 'table_name' VIRTUAL, -- Membaca nama tabel. operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Membaca timestamp perubahan. op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Membaca tipe perubahan. order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- Ekspresi reguler untuk mencocokkan beberapa database yang di-shard. 'table-name' = 'orders_.*' -- Ekspresi reguler untuk mencocokkan beberapa tabel yang di-shard. ); INSERT INTO holo_orders SELECT * FROM mysql_orders;Berdasarkan kode di atas, jika parameter scan.read-changelog-as-append-only.enabled diatur ke true dalam klausa WITH, hasil output bervariasi berdasarkan pengaturan primary key tabel hilir:
Jika primary key tabel hilir adalah order_id, hasil output hanya berisi perubahan terakhir untuk setiap primary key di tabel hulu. Misalnya, untuk primary key yang perubahan terakhirnya adalah operasi hapus, Anda dapat melihat catatan di tabel hilir dengan primary key yang sama dan op_type -D.
Jika primary key tabel hilir adalah gabungan dari order_id, operation_ts, dan op_type, hasil output berisi riwayat perubahan lengkap untuk setiap primary key di tabel hulu.
Dukungan ekspresi reguler
Tabel sumber CDC MySQL mendukung penggunaan ekspresi reguler dalam nama tabel atau nama database untuk mencocokkan beberapa tabel atau database. Contoh kode berikut menunjukkan cara menentukan beberapa tabel menggunakan ekspresi reguler.
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Ekspresi reguler untuk mencocokkan beberapa database. 'table-name' = '(t[5-8]|tt)' -- Ekspresi reguler untuk mencocokkan beberapa tabel. );Tabel berikut menjelaskan ekspresi reguler dalam contoh di atas:
^(test).* adalah contoh pencocokan awalan. Ekspresi ini dapat mencocokkan nama database yang dimulai dengan "test", seperti "test1" dan "test2".
.*[p$] adalah contoh pencocokan akhiran. Ekspresi ini dapat mencocokkan nama database yang diakhiri dengan "p", seperti "cdcp" dan "edcp".
txc adalah pencocokan spesifik. Ekspresi ini dapat mencocokkan nama database yang persis "txc".
Saat mencocokkan nama tabel yang memenuhi syarat sepenuhnya, CDC MySQL menggunakan nama database dan nama tabel untuk mengidentifikasi tabel secara unik. Polanya adalah database-name.table-name. Misalnya, pola pencocokan (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) dapat mencocokkan tabel dalam database, seperti txc.tt dan test2.test5.
PentingDalam konfigurasi pekerjaan SQL, table-name dan database-name tidak mendukung penggunaan koma (,) untuk menentukan beberapa tabel atau database.
Untuk mencocokkan beberapa tabel atau menggunakan beberapa ekspresi reguler, Anda dapat menghubungkannya dengan bilah vertikal (|) dan membungkusnya dalam tanda kurung. Misalnya, untuk membaca tabel "user" dan "product", Anda dapat mengonfigurasi table-name sebagai
(user|product).Jika ekspresi reguler berisi koma, Anda harus menulis ulang menggunakan operator bilah vertikal (|). Misalnya, ekspresi reguler
mytable_\d{1, 2}harus ditulis ulang sebagai(mytable_\d{1}|mytable_\d{2})yang setara untuk menghindari penggunaan koma.
Kontrol konkurensi
Konektor MySQL mendukung pembacaan data penuh dengan beberapa thread konkuren, yang dapat meningkatkan efisiensi pemuatan data. Bersamaan dengan fitur penyetelan otomatis Autopilot di konsol Realtime Compute for Apache Flink, konektor dapat secara otomatis melakukan skala-masuk setelah pembacaan multi-thread selesai dan selama fase inkremental untuk menghemat sumber daya komputasi.
Di Konsol pengembangan Realtime Compute for Apache Flink, Anda dapat mengatur konkurensi pekerjaan dalam mode Dasar atau Ahli pada halaman Konfigurasi Sumber Daya. Perbedaannya adalah sebagai berikut:
Konkurensi yang diatur dalam mode Dasar adalah konkurensi global untuk seluruh pekerjaan.

Mode Ahli mendukung pengaturan konkurensi untuk VERTEX tertentu sesuai kebutuhan.

Untuk informasi selengkapnya tentang konfigurasi sumber daya, lihat Konfigurasi informasi penerapan untuk pekerjaan.
PentingTerlepas dari apakah Anda menggunakan mode Dasar atau Ahli, saat Anda mengatur konkurensi, rentang server-id yang dideklarasikan dalam tabel harus lebih besar dari atau sama dengan konkurensi pekerjaan. Misalnya, jika rentang server-id adalah 5404-5412, terdapat 8 server-id unik. Dalam kasus ini, pekerjaan dapat memiliki maksimal 8 thread konkuren. Pekerjaan berbeda untuk instans MySQL yang sama tidak boleh memiliki rentang server-id yang tumpang tindih. Artinya, setiap pekerjaan harus dikonfigurasi secara eksplisit dengan server-id yang berbeda.
Skala-masuk otomatis Autopilot
Fase data penuh mengakumulasi banyak data historis. Untuk meningkatkan efisiensi pembacaan, data historis biasanya dibaca secara konkuren. Dalam fase log biner inkremental, pembacaan konkuren tunggal biasanya cukup karena volume data log biner kecil dan urutan global harus dipastikan. Persyaratan sumber daya yang berbeda dari fase penuh dan inkremental dapat diseimbangkan secara otomatis oleh fitur penyetelan otomatis untuk mencapai kinerja tinggi dan efisiensi sumber daya.
Penyetelan otomatis memantau trafik setiap tugas di Sumber CDC MySQL. Saat pekerjaan memasuki fase log biner, jika hanya satu tugas yang bertanggung jawab untuk membaca log biner dan tugas lainnya idle, penyetelan otomatis secara otomatis mengurangi jumlah CU dan konkurensi Sumber. Untuk mengaktifkan penyetelan otomatis, Anda dapat mengatur mode penyetelan otomatis ke Aktif di halaman O&M pekerjaan.
CatatanInterval pemicu minimum default untuk mengurangi tingkat paralelisme adalah 24 jam. Untuk informasi selengkapnya tentang parameter dan detail penyetelan otomatis, lihat Konfigurasi penyetelan otomatis.
Mode startup
Anda dapat menggunakan opsi konfigurasi scan.startup.mode untuk menentukan mode startup untuk tabel sumber CDC MySQL. Opsi yang tersedia adalah sebagai berikut:
initial (default): Saat startup pertama, konektor melakukan pembacaan penuh tabel database dan kemudian beralih ke mode inkremental untuk membaca log biner.
earliest-offset: Melewati fase snapshot dan mulai membaca dari offset log biner yang tersedia paling awal.
latest-offset: Melewati fase snapshot dan mulai membaca dari akhir log biner. Dalam mode ini, tabel sumber hanya dapat membaca perubahan data yang terjadi setelah pekerjaan dimulai.
specific-offset: Melewati fase snapshot dan mulai membaca dari offset log biner tertentu. Offset dapat ditentukan oleh nama file log biner dan posisi, atau oleh set GTID.
timestamp: Melewati fase snapshot dan mulai membaca event log biner dari timestamp tertentu.
Contoh penggunaan:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- Mulai dari offset paling awal. 'scan.startup.mode' = 'latest-offset', -- Mulai dari offset terbaru. 'scan.startup.mode' = 'specific-offset', -- Mulai dari offset tertentu. 'scan.startup.mode' = 'timestamp', -- Mulai dari timestamp tertentu. 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Tentukan nama file Binlog dalam mode specific-offset. 'scan.startup.specific-offset.pos' = '4', -- Tentukan posisi Binlog dalam mode specific-offset. 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Tentukan set GTID dalam mode specific-offset. 'scan.startup.timestamp-millis' = '1667232000000' -- Tentukan timestamp startup dalam mode timestamp. ... )PentingSumber MySQL mencetak offset saat ini ke log pada level INFO selama checkpoint. Awalan log adalah
Binlog offset on checkpoint {checkpoint-id}. Log ini dapat membantu Anda memulai pekerjaan dari offset checkpoint tertentu.Jika tabel yang dibaca telah mengalami perubahan skema, memulai dari offset paling awal (earliest-offset), offset tertentu (specific-offset), atau timestamp (timestamp) dapat menyebabkan error. Hal ini karena pembaca Debezium secara internal menyimpan skema tabel terbaru, dan data sebelumnya dengan skema yang tidak cocok tidak dapat diurai dengan benar.
Tentang tabel sumber CDC tanpa primary key
Jika Anda menggunakan tabel tanpa primary key, Anda harus mengatur scan.incremental.snapshot.chunk.key-column. Anda hanya dapat memilih bidang non-null.
Semantik pemrosesan untuk tabel sumber CDC tanpa primary key ditentukan oleh perilaku kolom yang ditentukan oleh scan.incremental.snapshot.chunk.key-column:
Jika kolom yang ditentukan tidak diperbarui, semantik tepat-sekali dapat dijamin.
Jika kolom yang ditentukan diperbarui, hanya semantik setidaknya-sekali yang dapat dijamin. Namun, Anda dapat memastikan kebenaran data dengan menggabungkan kolom tersebut dengan sink hilir yang memiliki primary key dan menggunakan operasi idempoten.
Membaca log cadangan Alibaba Cloud RDS for MySQL
Tabel sumber CDC MySQL mendukung pembacaan log cadangan dari Alibaba Cloud RDS for MySQL. Ini berguna dalam skenario di mana fase data penuh memakan waktu lama dan file log biner lokal telah dibersihkan secara otomatis, tetapi file cadangan yang diunggah secara otomatis atau manual masih ada.
Contoh penggunaan:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )Mengaktifkan penggunaan ulang Sumber CDC
Pekerjaan yang menggunakan beberapa tabel sumber CDC MySQL dari instans yang sama akan menjalankan beberapa klien Binlog, sehingga meningkatkan beban pada database. Untuk informasi selengkapnya, lihat FAQ CDC MySQL.
Solusi
Realtime Compute for Apache Flink VVR 8.0.7 dan versi yang lebih baru mendukung penggunaan ulang Sumber CDC MySQL. Penggunaan ulang menggabungkan tabel sumber CDC MySQL yang dapat digabungkan. Penggabungan terjadi ketika tabel sumber memiliki item konfigurasi yang sama, kecuali nama database, nama tabel, dan
server-id. Mesin secara otomatis menggabungkan sumber CDC MySQL dalam pekerjaan yang sama.Prosedur
Gunakan perintah
SETdalam pekerjaan SQL Anda:SET 'table.optimizer.source-merge.enabled' = 'true'; # (Untuk VVR 8.0.8 dan 8.0.9) Atur juga item ini: SET 'sql-gateway.exec-plan.enabled' = 'false';Penggunaan ulang diaktifkan secara default di VVR 11.1 dan versi yang lebih baru.
Mulai pekerjaan tanpa status. Mengubah konfigurasi penggunaan ulang Sumber mengubah topologi pekerjaan. Anda harus memulai pekerjaan tanpa status. Jika tidak, pekerjaan mungkin gagal dimulai atau data dapat hilang. Jika Sumber digabung, Anda dapat melihat node
MergetableSourceScan.
PentingSetelah Anda mengaktifkan penggunaan ulang, kami tidak menyarankan agar Anda menonaktifkan operator chaining. Jika Anda mengatur
pipeline.operator-chainingkefalse, overhead serialisasi dan deserialisasi data meningkat. Semakin banyak Sumber yang digabung, semakin besar overhead-nya.Di VVR 8.0.7, menonaktifkan operator chaining menyebabkan masalah serialisasi.
Mempercepat pembacaan Binlog
Saat konektor MySQL digunakan sebagai tabel sumber atau sumber data Ingesti Data, konektor tersebut mengurai file log biner untuk menghasilkan berbagai pesan perubahan selama fase inkremental. File log biner mencatat semua perubahan tabel dalam format biner. Anda dapat mempercepat penguraian file log biner dengan cara berikut.
Aktifkan konfigurasi filter penguraian
Gunakan item konfigurasi
scan.only.deserialize.captured.tables.changelog.enabled: Hanya mengurai event perubahan dari tabel yang ditentukan.
Optimalkan parameter Debezium
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size: Jumlah maksimum record yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, ia menempatkan event dalam antrian pemblokiran sebelum menuliskannya ke downstream. Nilai default adalah 8192.debezium.max.batch.size: Jumlah maksimum event yang diproses konektor dalam setiap iterasi. Nilai default adalah 2048.debezium.poll.interval.ms: Jumlah milidetik yang harus ditunggu konektor sebelum meminta event perubahan baru. Nilai default adalah 1000 milidetik, atau 1 detik.
Contoh penggunaan:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Konfigurasi Debezium
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- Aktifkan filter parsing
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Hanya mengurai event perubahan dari tabel yang ditentukan.
...
)source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Konfigurasi Debezium
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# Aktifkan filter penguraian
scan.only.deserialize.captured.tables.changelog.enabled: trueKapasitas konsumsi log biner CDC MySQL Edisi Perusahaan adalah 85 MB/detik, sekitar dua kali lipat dari versi komunitas open source. Saat kecepatan pembuatan file log biner melebihi 85 MB/detik (yang berarti satu file 512 MB dihasilkan setiap 6 detik), latensi pekerjaan Flink terus meningkat. Latensi pemrosesan secara bertahap menurun setelah kecepatan pembuatan file log biner melambat. Saat file log biner berisi transaksi besar, hal ini dapat menyebabkan peningkatan sementara dalam latensi pemrosesan. Latensi menurun setelah log transaksi dibaca.
MySQL CDC DataStream API
Saat Anda membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi selengkapnya tentang cara menyiapkan konektor DataStream, lihat Cara menggunakan konektor DataStream.
Anda dapat membuat program API DataStream dan menggunakan MySqlSource. Bagian berikut memberikan contoh kode dan dependensi pom:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>Saat Anda membuat MySqlSource, Anda harus menentukan parameter berikut dalam kode:
Parameter | Deskripsi |
hostname | Alamat IP atau hostname dari database MySQL. |
port | Nomor port layanan database MySQL. |
databaseList | Nama database MySQL. Catatan Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Anda dapat menggunakan |
username | Nama pengguna untuk layanan database MySQL. |
password | Kata sandi untuk layanan database MySQL. |
deserializer | Deserializer yang mendeserialisasi catatan tipe SourceRecord ke tipe yang ditentukan. Nilai yang valid:
|
Dependensi pom harus menentukan parameter berikut:
${vvr.version} | Versi mesin Alibaba Cloud Realtime Compute for Apache Flink, misalnya: Catatan Harap gunakan nomor versi yang ditampilkan di Maven, karena kami mungkin merilis versi Hotfix secara berkala, dan pembaruan ini mungkin tidak diumumkan melalui saluran lain. |
${flink.version} | Versi Apache Flink, misalnya: Penting Harap gunakan versi Apache Flink yang sesuai dengan versi mesin Alibaba Cloud Realtime Compute for Apache Flink untuk menghindari masalah ketidakcocokan selama runtime pekerjaan. Untuk korespondensi versi, lihat Mesin DPI. |
FAQ
Untuk informasi tentang masalah yang mungkin Anda temui saat menggunakan tabel sumber CDC, lihat Masalah CDC.