Topik ini menjelaskan praktik terbaik untuk menggunakan MySQL Connector.
Tetapkan ID server yang berbeda untuk setiap klien
Setiap klien yang menyinkronkan data basis data harus memiliki ID server unik. Jika beberapa tabel sumber MySQL CDC berbagi ID server yang sama, kesalahan konflik ID server dapat terjadi. Oleh karena itu, kami menyarankan untuk menetapkan ID server yang berbeda untuk setiap klien.
Konfigurasi ID Server
ID server dapat diatur dalam pernyataan DDL tabel Flink atau melalui petunjuk SQL.
Kami menyarankan Anda menetapkan ID server melalui petunjuk SQL. Untuk informasi lebih lanjut tentang petunjuk SQL, lihat petunjuk SQL.
Konfigurasi ID Server dalam Skenario Berbeda
Skenario 1: Snapshot tambahan dinonaktifkan atau paralelisme adalah 1.
Tentukan ID server yang unik:
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;Skenario 2: Snapshot tambahan diaktifkan dan paralelisme lebih besar dari 1.
Tentukan rentang ID server, dan pastikan jumlah ID server yang tersedia dalam rentang tidak kurang dari paralelisme. Misalnya, ketika paralelisme adalah 3, jalankan pernyataan berikut untuk mengatur rentang ID server:
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;Skenario 3: Pernyataan CREATE TABLE AS (CTAS) digunakan untuk menyinkronkan data.
Jika Anda menggunakan CTAS untuk menyinkronkan data dan beberapa sumber MySQL CDC berbagi konfigurasi yang sama, tabel sumber secara otomatis digabungkan. Dalam hal ini, Anda dapat menentukan ID server yang sama untuk beberapa tabel sumber MySQL CDC. Untuk informasi lebih lanjut, lihat bagian "Contoh 4: Eksekusi Beberapa Pernyataan CREATE TABLE AS" dari topik Pernyataan CREATE TABLE AS.
Skenario 4: Pekerjaan berisi beberapa tabel sumber MySQL CDC, dan pernyataan CTAS tidak digunakan untuk sinkronisasi data.
Dalam hal ini, tabel sumber MySQL CDC tidak dapat digabungkan. Oleh karena itu, Anda perlu menetapkan ID server yang berbeda untuk setiap tabel sumber MySQL CDC. Jika snapshot tambahan diaktifkan dan paralelisme lebih besar dari 1, Anda harus menentukan rentang ID server.
select * from source_table1 /*+ OPTIONS('server-id'='123456-123457') */ left join source_table2 /*+ OPTIONS('server-id'='123458-123459') */ on source_table1.id=source_table2.id;
Konfigurasikan opsi chunk untuk optimasi memori
Saat konektor sumber MySQL CDC dimulai, konektor memindai seluruh tabel yang datanya perlu dibaca, membagi tabel menjadi beberapa chunk berdasarkan kunci utama, lalu mencatat posisi file log biner pada saat itu. Kemudian, konektor sumber MySQL CDC menerapkan mekanisme snapshot tambahan untuk membaca data dari setiap chunk. Pekerjaan Flink secara berkala menghasilkan titik pemeriksaan untuk mencatat chunk yang datanya telah dibaca. Jika terjadi kegagalan, konektor MySQL CDC hanya perlu melanjutkan membaca data dari chunk yang belum dibaca. Setelah data dari semua chunk dibaca, catatan perubahan tambahan dibaca dari posisi file log biner sebelumnya. Pekerjaan Flink terus secara berkala menghasilkan titik pemeriksaan untuk mencatat posisi file log biner. Jika terjadi kegagalan, konektor MySQL CDC memproses data dari posisi file log biner sebelumnya. Dengan cara ini, semantik exactly-once diimplementasikan.
Untuk informasi lebih lanjut tentang algoritma savepoint tambahan, lihat konektor MySQL CDC dalam dokumentasi Apache Flink.
Tabel dengan kunci utama satu kolom dibagi menjadi chunk berdasarkan kunci tersebut secara default. Tabel fisik dengan kunci utama komposit di-chunk berdasarkan kolom pertama kunci secara default. Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 6.0.7 atau lebih baru mendukung membaca data dari tabel tanpa kunci utama. Data dalam tabel seperti itu didistribusikan ke dalam chunk berdasarkan kolom non-null yang ditentukan oleh scan.incremental.snapshot.chunk.key-column.
Strategi optimasi memori
Data chunk dan metadata disimpan di memori. Jika terjadi kehabisan memori (OOM), sesuaikan opsi konektor terkait chunk berdasarkan situasi tertentu:
JobManager
Jumlah chunk yang berlebihan dapat menyebabkan OOM JobManager, yang menyimpan data tentang semua chunk. Untuk menghindari OOM JobManager, kurangi jumlah chunk dengan menyetel scan.incremental.snapshot.chunk.size ke nilai yang lebih besar. Anda juga dapat mengonfigurasi jobmanager.memory.heap.size untuk meningkatkan ukuran memori heap JVM untuk JobManager. Untuk detailnya, lihat Konfigurasi Memori dalam dokumentasi Apache Flink.
TaskManager
Jumlah baris yang berlebihan dalam sebuah chunk dapat menyebabkan OOM TaskManager, yang membaca data dari setiap chunk. Untuk menghindari kesalahan OOM TaskManager, kurangi jumlah baris dalam setiap chunk dengan menyetel scan.incremental.snapshot.chunk.size ke nilai yang lebih kecil. Anda juga dapat menetapkan nilai yang lebih besar ke taskmanager.memory.framework.heap.size untuk meningkatkan ukuran memori heap JVM untuk TaskManager.
Dalam Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau lebih awal, ukuran data di chunk terakhir biasanya besar dan kemungkinan besar menyebabkan kesalahan OOM TaskManager. Untuk menyelesaikan masalah ini, tingkatkan ke VVR 8.0.9 atau lebih baru.
Ketika kolom pertama dari kunci komposit memiliki banyak nilai duplikat, mekanisme chunking default yang bergantung pada kolom kunci utama pertama dapat meningkatkan ukuran chunk dan mungkin menyebabkan kesalahan OOM TaskManager. Untuk menghindarinya, konfigurasikan scan.incremental.snapshot.chunk.key-column untuk membagi tabel berdasarkan kolom kunci utama lainnya.
Aktifkan penggabungan sumber untuk mengurangi koneksi basis data
Penggabungan sumber berguna untuk pekerjaan dengan beberapa tabel sumber MySQL CDC. Ini memungkinkan Flink mengakses log biner melalui koneksi minimum yang diperlukan ke MySQL, mengurangi beban pada basis data MySQL. Fitur ini hanya didukung oleh konektor MySQL CDC dari Realtime Compute for Apache Flink. Konektor MySQL CDC dari Apache Flink tidak mendukung ini.
Untuk mengaktifkan penggabungan sumber, sertakan perintah SET dalam draft SQL Anda:
SET 'table.optimizer.source-merge.enabled' = 'true';Kami menyarankan Anda mengaktifkan fitur ini saat membuat draft SQL yang menggunakan konektor MySQL CDC. Setelah mengaktifkan penggabungan sumber untuk penyebaran yang ada, Anda harus melakukan startup tanpa status. Karena penggabungan sumber mengubah grafik pekerjaan, startup dengan status mungkin gagal dan kehilangan data dapat terjadi.
Setelah penggabungan sumber diaktifkan, tabel sumber MySQL CDC dengan konfigurasi yang sama digabungkan. Jika semua tabel sumber MySQL CDC berbagi konfigurasi yang sama, jumlah koneksi MySQL dalam pekerjaan Flink yang sesuai adalah sebagai berikut:
Selama pembacaan snapshot, jumlah koneksi sama dengan paralelisme sumber.
Selama pembacaan tambahan, jumlah koneksi adalah 1.
Dalam Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 dan VVR 8.0.9, Anda juga harus menentukan
SET 'sql-gateway.exec-plan.enabled' = 'false';saat Anda mengaktifkan penggabungan sumber untuk tabel sumber MySQL CDC.Jika rantai operator terputus, overhead serialisasi dan deserialisasi data dari sumber ke operator hilir akan meningkat. Semakin banyak tabel sumber MySQL CDC yang digabungkan, semakin tinggi overhead yang dihasilkan. Oleh karena itu, kami tidak mendorong Anda untuk menyetel
pipeline.operator-chainingke false setelah Anda mengaktifkan penggabungan sumber.Dalam Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7, masalah serialisasi dapat terjadi jika Anda menyetel
pipeline.operator-chainingke false.
Konfigurasikan opsi penguraian binlog untuk mempercepat pembacaan tambahan
Jika konektor MySQL CDC digunakan sebagai tabel sumber atau sumber injeksi data, konektor MySQL CDC mengurai file log biner untuk menghasilkan pesan perubahan. File log biner mencatat data perubahan dari semua tabel. Anda dapat menggunakan metode berikut untuk mempercepat penguraian data log biner:
Aktifkan penguraian paralel dan filter penguraian. Fitur ini hanya didukung oleh konektor MySQL CDC dari Realtime Compute for Apache Flink di VVR 8.0.7 atau lebih baru. Ini tidak didukung oleh konektor MySQL CDC dari Apache Flink.
Aktifkan
scan.only.deserialize.captured.tables.changelog.enabled: Mengurai hanya peristiwa perubahan dalam tabel yang ditentukan.Aktifkan
scan.only.deserialize.captured.tables.changelog.enabled: Menggunakan beberapa thread untuk mengurai peristiwa dalam file log biner dan mengirimkan peristiwa yang diurai ke antrian konsumsi secara berurutan. Jika Anda mengonfigurasi opsi ini, kami menyarankan Anda meningkatkan nilai CPU Pengelola Tugas di konsol.
Optimalkan Opsi Terkait Debezium
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size: Jumlah maksimum catatan data yang dapat ditampung oleh antrian blokir. Saat Debezium membaca aliran peristiwa dari basis data, ia menempatkan peristiwa dalam antrian blokir sebelum menulisnya ke sistem hilir. Nilai default: 8192.debezium.max.batch.size: Jumlah maksimum peristiwa yang dapat diproses oleh konektor dalam satu batch. Nilai default: 2048.debezium.poll.interval.ms: Durasi yang harus ditunggu konektor sebelum meminta peristiwa perubahan baru. Nilai default: 1000. Unit: milidetik.
Contoh kode:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Konfigurasikan opsi terkait Debezium.
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- Aktifkan penguraian paralel dan konfigurasi filter penguraian.
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Mengurai hanya peristiwa perubahan dalam tabel yang ditentukan.
'scan.parallel-deserialize-changelog.enabled' = 'true' -- Menggunakan beberapa thread untuk mengurai peristiwa dalam file binlog.
...
)Analisis latensi data dan optimalkan throughput baca
Jika latensi data terjadi selama pembacaan tambahan, Anda dapat melakukan langkah-langkah berikut untuk mengoptimalkan throughput baca:
Periksa nilai metrik currentFetchEventTimeLag dan currentEmitEventTimeLag. currentFetchEventTimeLag menunjukkan latensi transmisi dari MySQL ke Flink, dan currentEmitEventTimeLag adalah latensi pemrosesan. Untuk informasi lebih lanjut, lihat bagian "Ikhtisar" dari topik Metrik.
Deskripsi Skenario
Analisis
currentFetchEventTimeLag relatif kecil sementara currentEmitEventTimeLag relatif besar dan tidak diperbarui secara sering.
Nilai currentFetchEventTimeLag yang kecil menunjukkan bahwa latensi pengambilan data dari basis data MySQL rendah. Nilai currentEmitEventTimeLag yang terus-menerus besar menunjukkan volume data relevan yang diproses oleh pekerjaan Flink kecil. Skenario ini tipikal dan sesuai dengan harapan.
Baik currentFetchEventTimeLag maupun currentEmitEventTimeLag besar.
Kemampuan sumber untuk menarik data dari MySQL lemah. Lakukan langkah-langkah berikut dalam bagian ini untuk optimasi.
Periksa apakah tekanan balik ada, yang memperlambat streaming data ke operator hilir. Jika tekanan balik ada, nilai metrik sourceIdleTime mungkin meningkat secara periodik dan nilai metrik currentFetchEventTimeLag dan currentEmitEventTimeLag mungkin terus meningkat. Untuk menyelesaikan masalah ini, identifikasi operator lambat dan tingkatkan paralelismenya.
Verifikasi apakah sumber daya CPU atau memori habis dengan memeriksa metrik Penggunaan CPU TM dan metrik Waktu GC TM JVM. Jika kehabisan sumber daya memang ada, pertimbangkan untuk meningkatkan sumber daya untuk mengoptimalkan kinerja baca. Anda juga dapat mengonfigurasi opsi miniBatch untuk meningkatkan throughput baca. Untuk informasi lebih lanjut, lihat Optimalkan Flink SQL.
Ketika operator SinkUpsertMaterializer ada dalam pekerjaan dengan state besar, kinerja baca menurun. Dalam hal ini, tingkatkan paralelisme pekerjaan atau jangan gunakan operator SinkUpsertMaterializer. Untuk informasi lebih lanjut, lihat Hindari Menggunakan SinkUpsertMaterializer. Setelah menghapus operator SinkUpsertMaterializer, lakukan startup tanpa status. Ini diperlukan karena grafik pekerjaan telah berubah, yang dapat menyebabkan startup dengan status gagal atau mengakibatkan kehilangan data.
Aktifkan ApsaraDB RDS for MySQL untuk penyimpanan log biner yang tahan lama
Anda dapat menggunakan ApsaraDB RDS for MySQL sebagai sumber data. Ini memungkinkan Anda membaca cadangan log yang disimpan di OSS. Ketika file log biner yang diminta (ditentukan oleh timestamp atau posisi) disimpan di OSS, Flink mengunduh log biner ke klusternya sebelum memprosesnya. Ketika file log biner yang diminta tersedia secara lokal, Flink secara otomatis beralih ke koneksi basis data untuk membaca log biner. Fitur ini hanya didukung oleh konektor MySQL CDC dari Realtime Compute for Apache Flink. Konektor MySQL CDC dari Apache Flink tidak mendukung ini.
Untuk membaca log biner dari OSS, konfigurasikan opsi koneksi ApsaraDB RDS for MySQL sebagai berikut:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'rds.region-id' = 'cn-beijing',
'rds.access-key-id' = 'your_access_key_id',
'rds.access-key-secret' = 'your_access_key_secret',
'rds.db-instance-id' = 'rm-xxxxxxxx', -- ID instance ApsaraDB RDS for MySQL.
'rds.main-db-id' = '12345678', -- ID basis data utama.
'rds.endpoint' = 'rds.aliyuncs.com'
...
)Sinkronkan data basis data dan perubahan skema
Untuk tugas sinkronisasi data, kami menyarankan membuat penyebaran injeksi data, yang dioptimalkan untuk skenario integrasi data. Untuk informasi lebih lanjut, lihat Memulai dengan Penyebaran YAML untuk Injeksi Data dan Kembangkan Draft YAML untuk Injeksi Data (Pratinjau Publik).
Potongan kode berikut menunjukkan bagaimana penyebaran injeksi data menyinkronkan data dan perubahan skema dari basis data MySQL bernama app_db ke Hologres:
source:
type: mysql
hostname: <hostname>
port: 3306
username: ${secret_values.mysqlusername}
password: ${secret_values.mysqlpassword}
tables: app_db.\.*
server-id: 5400-5404
sink:
type: hologres
name: Hologres Sink
endpoint: <endpoint>
dbname: <database-name>
username: ${secret_values.holousername}
password: ${secret_values.holopassword}
pipeline:
name: Sinkronkan Basis Data MySQL ke HologresSinkronkan tabel baru
Untuk menyinkronkan tabel baru dari MySQL melalui penyebaran injeksi data, konfigurasikan opsi berikut sesuai kebutuhan:
Opsi | Deskripsi | Catatan |
| Menentukan apakah akan menyinkronkan tabel baru (yang tidak ditemukan pada startup sebelumnya) ketika penyebaran dimulai ulang dari titik pemeriksaan. Jika opsi diaktifkan, Flink membaca snapshot dan data tambahan dari tabel baru. | Opsi ini hanya efektif ketika |
| Menentukan apakah akan menyinkronkan tabel baru selama pembacaan tambahan. |
|
Untuk mencegah duplikasi data, jangan aktifkan scan.newly-added-table.enabled dan scan.binlog.newly-added-table.enabled secara bersamaan.