All Products
Search
Document Center

Realtime Compute for Apache Flink:Praktik terbaik untuk MySQL connector

Last Updated:Jan 27, 2026

Topik ini menjelaskan praktik terbaik untuk menggunakan MySQL Connector.

Tetapkan Server ID untuk menghindari konflik konsumsi Binlog

Setiap klien yang menyinkronkan data basis data harus memiliki ID server unik. Jika beberapa tabel sumber MySQL CDC berbagi ID server yang sama, kesalahan konflik ID server dapat terjadi. Oleh karena itu, kami menyarankan untuk menetapkan ID server yang berbeda untuk setiap klien.

  • Konfigurasi ID Server

    ID server dapat diatur dalam pernyataan DDL tabel Flink atau melalui petunjuk SQL.

    Kami menyarankan Anda menetapkan ID server melalui petunjuk SQL. Untuk informasi lebih lanjut tentang petunjuk SQL, lihat petunjuk SQL.

  • Konfigurasi ID Server dalam Skenario Berbeda

    • Skenario 1: Snapshot tambahan dinonaktifkan atau paralelisme adalah 1.

      Anda dapat menentukan Server ID jika kerangka snapshot inkremental dinonaktifkan atau tingkat paralelisme adalah 1.

      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • Skenario 2: Snapshot tambahan diaktifkan dan paralelisme lebih besar dari 1.

      Tentukan rentang ID server, dan pastikan jumlah ID server yang tersedia dalam rentang tidak kurang dari paralelisme. Misalnya, ketika paralelisme adalah 3, jalankan pernyataan berikut untuk mengatur rentang ID server:

      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • Sinkronisasi Data Menggunakan CTAS

      Jika Anda menggunakan CTAS untuk menyinkronkan data dan beberapa sumber MySQL CDC berbagi konfigurasi yang sama, tabel sumber secara otomatis digabungkan. Dalam hal ini, Anda dapat menentukan ID server yang sama untuk beberapa tabel sumber MySQL CDC. Untuk informasi lebih lanjut, lihat bagian "Contoh 4: Eksekusi Beberapa Pernyataan CREATE TABLE AS" dari topik Pernyataan CREATE TABLE AS.

    • Skenario 4: Pekerjaan berisi beberapa tabel sumber MySQL CDC, dan pernyataan CTAS tidak digunakan untuk sinkronisasi data.

      Jika suatu pekerjaan berisi beberapa tabel sumber CDC MySQL, tidak disinkronkan menggunakan pernyataan CTAS, dan Source reuse dinonaktifkan, Anda harus memberikan Server ID unik untuk setiap tabel sumber CDC (untuk informasi selengkapnya, lihat Aktifkan Source Reuse untuk Mengurangi Koneksi Data Binlog). Demikian pula, jika kerangka snapshot inkremental diaktifkan dan tingkat paralelisme lebih besar dari 1, Anda harus menentukan rentang Server ID.

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;

Konfigurasikan opsi chunk untuk optimasi memori

Saat konektor sumber MySQL CDC dimulai, konektor memindai seluruh tabel yang datanya perlu dibaca, membagi tabel menjadi beberapa chunk berdasarkan kunci utama, lalu mencatat posisi file log biner pada saat itu. Kemudian, konektor sumber MySQL CDC menerapkan mekanisme snapshot tambahan untuk membaca data dari setiap chunk. Pekerjaan Flink secara berkala menghasilkan titik pemeriksaan untuk mencatat chunk yang datanya telah dibaca. Jika terjadi kegagalan, konektor MySQL CDC hanya perlu melanjutkan membaca data dari chunk yang belum dibaca. Setelah data dari semua chunk dibaca, catatan perubahan tambahan dibaca dari posisi file log biner sebelumnya. Pekerjaan Flink terus secara berkala menghasilkan titik pemeriksaan untuk mencatat posisi file log biner. Jika terjadi kegagalan, konektor MySQL CDC memproses data dari posisi file log biner sebelumnya. Dengan cara ini, semantik exactly-once diimplementasikan.

Untuk informasi lebih lanjut tentang algoritma savepoint tambahan, lihat konektor MySQL CDC dalam dokumentasi Apache Flink.

Tabel dengan kunci utama satu kolom dibagi menjadi chunk berdasarkan kunci tersebut secara default. Tabel fisik dengan kunci utama komposit di-chunk berdasarkan kolom pertama kunci secara default. Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 6.0.7 atau lebih baru mendukung membaca data dari tabel tanpa kunci utama. Data dalam tabel seperti itu didistribusikan ke dalam chunk berdasarkan kolom non-null yang ditentukan oleh scan.incremental.snapshot.chunk.key-column.

Optimasi parameter sharding

Data chunk dan metadata disimpan di memori. Jika terjadi kehabisan memori (OOM), sesuaikan opsi konektor terkait chunk berdasarkan situasi tertentu:

  • JobManager

    Jumlah chunk yang berlebihan dapat menyebabkan OOM JobManager, yang menyimpan data tentang semua chunk. Untuk menghindari OOM JobManager, kurangi jumlah chunk dengan menyetel scan.incremental.snapshot.chunk.size ke nilai yang lebih besar. Anda juga dapat mengonfigurasi jobmanager.memory.heap.size untuk meningkatkan ukuran memori heap JVM untuk JobManager. Untuk detailnya, lihat Konfigurasi Memori dalam dokumentasi Apache Flink.

  • TaskManager

    • Jumlah baris yang berlebihan dalam sebuah chunk dapat menyebabkan OOM TaskManager, yang membaca data dari setiap chunk. Untuk menghindari kesalahan OOM TaskManager, kurangi jumlah baris dalam setiap chunk dengan menyetel scan.incremental.snapshot.chunk.size ke nilai yang lebih kecil. Anda juga dapat menetapkan nilai yang lebih besar ke taskmanager.memory.framework.heap.size untuk meningkatkan ukuran memori heap JVM untuk TaskManager.

    • Dalam Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau lebih awal, ukuran data di chunk terakhir biasanya besar dan kemungkinan besar menyebabkan kesalahan OOM TaskManager. Untuk menyelesaikan masalah ini, tingkatkan ke VVR 8.0.9 atau lebih baru.

    • Ketika kolom pertama dari kunci komposit memiliki banyak nilai duplikat, mekanisme chunking default yang bergantung pada kolom kunci utama pertama dapat meningkatkan ukuran chunk dan mungkin menyebabkan kesalahan OOM TaskManager. Untuk menghindarinya, konfigurasikan scan.incremental.snapshot.chunk.key-column untuk membagi tabel berdasarkan kolom kunci utama lainnya.

Sesuaikan konfigurasi pekerjaan untuk mempercepat pembacaan pada fase penuh

Selama fase pembacaan penuh, tabel sumber MySQL membaca data snapshot melalui koneksi Java Database Connectivity (JDBC). Anda dapat mempercepat pembacaan pada fase penuh dengan cara berikut:

  1. Tingkatkan konkurensi sumber untuk mempercepat pembacaan pada fase penuh.

  2. Tingkatkan nilai scan.incremental.snapshot.chunk.size untuk meningkatkan volume data yang diambil oleh satu chunk.

  3. Jika tabel sink downstream memiliki kunci primer dan mendukung penulisan idempoten, Anda dapat mengaktifkan scan.incremental.snapshot.backfill.skip untuk melewati pembacaan log biner pada bagian backfill. Hal ini mempercepat pemrosesan pada fase penuh.

Aktifkan Source Reuse untuk Mengurangi Koneksi Data Binlog

Penggabungan sumber berguna untuk pekerjaan dengan beberapa tabel sumber MySQL CDC. Ini memungkinkan Flink mengakses log biner melalui koneksi minimum yang diperlukan ke MySQL, mengurangi beban pada basis data MySQL. Fitur ini hanya didukung oleh konektor MySQL CDC dari Realtime Compute for Apache Flink. Konektor MySQL CDC dari Apache Flink tidak mendukung ini.

Anda dapat menggunakan perintah SET dalam pekerjaan SQL untuk mengaktifkan fitur Source reuse:

SET 'table.optimizer.source-merge.enabled' = 'true';

Kami merekomendasikan mengaktifkan fitur Source reuse untuk pekerjaan yang baru dibuat. Jika Anda mengaktifkan fitur ini untuk pekerjaan yang sudah ada, Anda harus melakukan startup tanpa status (stateless start). Hal ini karena Source reuse mengubah topologi pekerjaan. Memulai pekerjaan dari status sebelumnya dapat menyebabkan kegagalan startup atau kehilangan data.

Setelah penggabungan sumber diaktifkan, tabel sumber MySQL CDC dengan konfigurasi yang sama digabungkan. Jika semua tabel sumber MySQL CDC berbagi konfigurasi yang sama, jumlah koneksi MySQL dalam pekerjaan Flink yang sesuai adalah sebagai berikut:

  • Selama pembacaan snapshot, jumlah koneksi sama dengan paralelisme sumber.

  • Selama pembacaan tambahan, jumlah koneksi adalah 1.

Penting
  • Pada VVR 8.0.8 dan 8.0.9, ketika Anda mengaktifkan penggabungan sumber CDC, Anda juga harus menetapkan SET 'sql-gateway.exec-plan.enabled' = 'false';.

  • Setelah Anda mengaktifkan penggabungan sumber CDC, kami tidak merekomendasikan menetapkan item konfigurasi pekerjaan pipeline.operator-chaining ke false. Jika rantai operator diputus, overhead serialisasi dan deserialisasi data yang dikirim dari sumber ke operator downstream akan meningkat. Semakin banyak sumber yang digabungkan, semakin besar overhead-nya.

  • Pada mesin komputasi waktu nyata VVR 8.0.7, jika Anda menetapkan pipeline.operator-chaining ke false, terjadi masalah serialisasi.

Konfigurasikan opsi penguraian binlog untuk mempercepat pembacaan tambahan

Jika konektor MySQL CDC digunakan sebagai tabel sumber atau sumber injeksi data, konektor MySQL CDC mengurai file log biner untuk menghasilkan pesan perubahan. File log biner mencatat data perubahan dari semua tabel. Anda dapat menggunakan metode berikut untuk mempercepat penguraian data log biner:

  • Aktifkan penguraian paralel dan filter penguraian. Fitur ini hanya didukung oleh konektor MySQL CDC dari Realtime Compute for Apache Flink di VVR 8.0.7 atau lebih baru. Ini tidak didukung oleh konektor MySQL CDC dari Apache Flink.

    • Aktifkan item konfigurasi scan.only.deserialize.captured.tables.changelog.enabled. Item ini hanya mengurai event perubahan pada tabel yang ditentukan.

    • Aktifkan item konfigurasi scan.parallel-deserialize-changelog.enabled. Item ini menggunakan beberapa thread untuk mengurai file log biner dan mengirimkan data yang telah diurai secara berurutan ke antrian konsumsi. Saat Anda mengaktifkan konfigurasi ini, Anda mungkin perlu menambahkan CPU Task Manager.

  • Optimalkan Opsi Terkait Debezium

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: Jumlah maksimum catatan yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, event tersebut ditempatkan di antrian pemblokiran sebelum dikirim ke downstream. Nilai default-nya adalah 8192.

    • debezium.max.batch.size: Jumlah maksimum event yang dapat diproses oleh konektor dalam setiap iterasi. Nilai default-nya adalah 2048.

    • debezium.poll.interval.ms: Jumlah milidetik yang ditunggu konektor sebelum meminta event perubahan baru. Nilai default-nya adalah 1000 milidetik, yaitu 1 detik.

Contoh:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Konfigurasikan opsi terkait Debezium.
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Aktifkan penguraian paralel dan konfigurasi filter penguraian.
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Mengurai hanya peristiwa perubahan dalam tabel yang ditentukan. 
    'scan.parallel-deserialize-changelog.enabled' = 'true' -- Menggunakan beberapa thread untuk mengurai peristiwa dalam file binlog. 
    ...
)

Analisis Latensi Data dan Optimalkan Throughput Pekerjaan

Jika terjadi latensi data selama fase inkremental, Anda dapat mengikuti langkah-langkah berikut untuk menganalisis masalah tersebut:

  1. Lihat metrik currentFetchEventTimeLag dan currentEmitEventTimeLag di Ikhtisar. Metrik currentFetchEventTimeLag merepresentasikan latensi pembacaan data dari Binlog, sedangkan metrik currentEmitEventTimeLag merepresentasikan latensi proses ini untuk tabel yang terkait dengan suatu pekerjaan.

    Deskripsi Skenario

    Detail

    currentFetchEventTimeLag relatif kecil sementara currentEmitEventTimeLag relatif besar dan tidak diperbarui secara sering.

    Nilai currentFetchEventTimeLag yang rendah menunjukkan latensi rendah saat menarik log biner dari database. Namun, jika log biner berisi sedikit data untuk tabel yang dibaca oleh pekerjaan, currentEmitEventTimeLag jarang diperbarui. Hal ini normal.

    Baik currentFetchEventTimeLag maupun currentEmitEventTimeLag besar.

    Hal ini menunjukkan bahwa performa tarik (pull performance) tabel Sumber buruk. Anda dapat mengikuti langkah-langkah dalam bagian ini untuk mengoptimalkannya.

  2. Periksa apakah tekanan balik ada, yang memperlambat streaming data ke operator hilir. Jika tekanan balik ada, nilai metrik sourceIdleTime mungkin meningkat secara periodik dan nilai metrik currentFetchEventTimeLag dan currentEmitEventTimeLag mungkin terus meningkat. Untuk menyelesaikan masalah ini, identifikasi operator lambat dan tingkatkan paralelismenya.

  3. Verifikasi apakah sumber daya CPU atau memori habis dengan memeriksa metrik Penggunaan CPU TM dan metrik Waktu GC TM JVM. Jika kehabisan sumber daya memang ada, pertimbangkan untuk meningkatkan sumber daya untuk mengoptimalkan kinerja baca. Anda juga dapat mengonfigurasi opsi miniBatch untuk meningkatkan throughput baca. Untuk informasi lebih lanjut, lihat Optimalkan Flink SQL.

  4. Ketika operator SinkUpsertMaterializer ada dalam pekerjaan dengan state besar, kinerja baca menurun. Dalam hal ini, tingkatkan paralelisme pekerjaan atau jangan gunakan operator SinkUpsertMaterializer. Untuk informasi lebih lanjut, lihat Hindari Menggunakan SinkUpsertMaterializer. Setelah menghapus operator SinkUpsertMaterializer, lakukan startup tanpa status. Ini diperlukan karena grafik pekerjaan telah berubah, yang dapat menyebabkan startup dengan status gagal atau mengakibatkan kehilangan data.

Aktifkan pembacaan log biner RDS untuk mencegah kedaluwarsa

Anda dapat menggunakan ApsaraDB RDS for MySQL sebagai sumber data. Ini memungkinkan Anda membaca cadangan log yang disimpan di OSS. Ketika file log biner yang diminta (ditentukan oleh timestamp atau posisi) disimpan di OSS, Flink mengunduh log biner ke klusternya sebelum memprosesnya. Ketika file log biner yang diminta tersedia secara lokal, Flink secara otomatis beralih ke koneksi basis data untuk membaca log biner. Fitur ini hanya didukung oleh konektor MySQL CDC dari Realtime Compute for Apache Flink. Konektor MySQL CDC dari Apache Flink tidak mendukung ini.

Untuk membaca log biner dari OSS, konfigurasikan opsi koneksi ApsaraDB RDS for MySQL sebagai berikut:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'rds.region-id' = 'cn-beijing',
    'rds.access-key-id' = 'your_access_key_id',
    'rds.access-key-secret' = 'your_access_key_secret',
    'rds.db-instance-id' = 'rm-xxxxxxxx',  -- ID instance ApsaraDB RDS for MySQL. 
    'rds.main-db-id' = '12345678', -- ID basis data utama. 
    'rds.endpoint' = 'rds.aliyuncs.com'
    ...
)

Sinkronkan data basis data dan perubahan skema

Untuk tugas sinkronisasi data, kami merekomendasikan membuat penerapan ingesti data, yang dioptimalkan untuk skenario integrasi data. Untuk informasi selengkapnya, lihat Memulai Penerapan YAML untuk Ingesti Data dan Mengembangkan Pekerjaan Flink CDC (Beta).

Potongan kode berikut menunjukkan bagaimana penyebaran injeksi data menyinkronkan data dan perubahan skema dari basis data MySQL bernama app_db ke Hologres:

source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

pipeline:
  name: Sinkronkan Basis Data MySQL ke Hologres

Tambahkan fitur tabel untuk konektor ingesti data

Konektor ingesti data MySQL menyediakan item konfigurasi untuk mendukung tabel baru dalam dua skenario.

Item Konfigurasi

Deskripsi

Catatan

scan.newly-added-table.enabled

Menentukan apakah akan menyinkronkan tabel baru (yang tidak ditemukan pada startup sebelumnya) ketika penyebaran dimulai ulang dari titik pemeriksaan. Jika opsi diaktifkan, Flink membaca snapshot dan data tambahan dari tabel baru.

Item ini hanya didukung ketika item konfigurasi scan.startup.mode ditetapkan ke initial. Item konfigurasi ini tidak berlaku pada mode startup lainnya.

scan.binlog.newly-added-table.enabled

Menentukan apakah akan menyinkronkan tabel baru selama pembacaan tambahan.

  • Untuk menyinkronkan tabel baru selama pembacaan tambahan, kami menyarankan mengaktifkan opsi ini untuk startup penyebaran pertama kali. Dengan melakukannya, pekerjaan Flink akan secara otomatis mengurai pernyataan CREATE TABLE dan menyinkronkan data ke sistem hilir. Jika Anda memulai ulang penyebaran dengan opsi ini dikonfigurasi hanya setelah tabel dibuat, data dapat hilang.

  • Pada mode startup initial, operasi DDL tidak dapat disinkronkan ke sistem downstream sebelum fase penuh selesai. Jika sebuah tabel dibuat selama fase penuh, tabel tersebut tidak dapat disinkronkan secara otomatis meskipun scan.binlog.newly-added-table.enabled diaktifkan.

Penting

Kami tidak merekomendasikan mengaktifkan scan.newly-added-table.enabled dan scan.binlog.newly-added-table.enabled secara bersamaan. Jika Anda mengaktifkan kedua item konfigurasi tersebut, terjadi duplikasi data.