全部产品
Search
文档中心

Realtime Compute for Apache Flink:MySQL

更新时间:Mar 12, 2026

Topik ini menjelaskan cara menggunakan konektor MySQL.

Informasi latar belakang

Konektor MySQL mendukung semua database yang kompatibel dengan protokol MySQL, termasuk ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (mode MySQL), dan database MySQL yang dikelola sendiri.

Penting

Saat menggunakan konektor MySQL untuk membaca data dari OceanBase, pastikan binary logging diaktifkan dan dikonfigurasi dengan benar di OceanBase. Untuk informasi selengkapnya, lihat Operasi binary logging. Fitur ini sedang dalam pratinjau publik. Evaluasi kesesuaian dan gunakan dengan hati-hati sebelum penerapan.

Konektor MySQL mendukung fitur-fitur berikut.

Kategori

Detail

Jenis yang didukung

Tabel sumber, tabel dimensi, dan tabel sink. Juga mendukung ingesti data dari sumber data.

Mode eksekusi

Hanya mode streaming

Format data

Tidak berlaku

Metrik pemantauan kustom

Metrik pemantauan

  • Tabel sumber

    • currentFetchEventTimeLag: Selisih waktu antara saat catatan dihasilkan dan saat diambil oleh Source Operator.

      Metrik ini hanya berlaku selama fase log biner. Nilainya selalu 0 selama fase snapshot.

    • currentEmitEventTimeLag: Selisih waktu antara saat catatan dihasilkan dan saat meninggalkan Source Operator.

      Metrik ini hanya berlaku selama fase log biner. Nilainya selalu 0 selama fase snapshot.

    • sourceIdleTime: Berapa lama tabel sumber tidak menghasilkan data baru.

  • Tabel dimensi dan tabel sink: Tidak ada

Catatan

Untuk informasi selengkapnya tentang metrik ini, lihat Metrik pemantauan.

Jenis API

DataStream, SQL, dan YAML ingesti data

Dukungan untuk memperbarui atau menghapus data tabel sink

Ya

Fitur utama

Tabel sumber CDC MySQL pertama-tama membaca seluruh data historis dari database, lalu beralih secara mulus ke pembacaan log biner. Pendekatan ini memastikan tidak ada data yang terlewat atau duplikat. Bahkan jika terjadi kegagalan, pemrosesan data menjamin semantik tepat-sekali. Tabel sumber CDC MySQL mendukung pembacaan konkuren data lengkap dan menggunakan algoritma snapshot inkremental untuk mencapai operasi tanpa penguncian serta transfer yang dapat dilanjutkan. Untuk informasi selengkapnya, lihat Tentang tabel sumber CDC MySQL.

  • Pemrosesan streaming dan batch terpadu: membaca data lengkap dan inkremental tanpa memelihara dua alur kerja terpisah.

  • Pembacaan konkuren data lengkap untuk skalabilitas horizontal.

  • Transisi mulus dari pembacaan data lengkap ke pembacaan inkremental, dengan skala-masuk otomatis untuk menghemat sumber daya komputasi.

  • Transfer yang dapat dilanjutkan selama pembacaan data lengkap guna meningkatkan stabilitas.

  • Pembacaan data lengkap tanpa penguncian untuk menghindari dampak pada bisnis online.

  • Dukungan pembacaan log cadangan dari ApsaraDB RDS for MySQL.

  • Penguraian paralel file log biner untuk latensi yang lebih rendah.

Prasyarat

Sebelum menggunakan tabel sumber CDC MySQL, selesaikan langkah-langkah dalam Konfigurasi MySQL. Langkah-langkah tersebut memastikan lingkungan Anda memenuhi prasyarat untuk menggunakan tabel sumber CDC MySQL.

ApsaraDB RDS for MySQL

  • Lakukan pengujian konektivitas jaringan antara ApsaraDB RDS for MySQL dan Realtime Compute for Apache Flink untuk memverifikasi konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, atau 8.0.x.

  • Aktifkan binary logging. (Diaktifkan secara default.)

  • Atur format log biner ke ROW. (ROW adalah format default.)

  • Atur binlog_row_image ke FULL. (FULL adalah pengaturan default.)

  • Nonaktifkan Binary Log Transaction Compression. (Diperkenalkan di MySQL 8.0.20 dan seterusnya. Dinonaktifkan secara default.)

  • Seorang pengguna MySQL telah dibuat dengan hak istimewa SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk ApsaraDB RDS for MySQL. Gunakan akun istimewa untuk membuat database guna mencegah kegagalan akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP untuk ApsaraDB RDS for MySQL.

PolarDB for MySQL

  • Lakukan probe jaringan ke Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, atau 8.0.x.

  • Aktifkan binary logging. (Dinonaktifkan secara default.)

  • Atur format log biner ke ROW. (ROW adalah format default.)

  • Atur binlog_row_image ke FULL. (FULL adalah pengaturan default.)

  • Nonaktifkan Binary Log Transaction Compression. (Diperkenalkan di MySQL 8.0.20 dan seterusnya. Dinonaktifkan secara default.)

  • Buat pengguna MySQL dan berikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk PolarDB for MySQL. Gunakan akun istimewa untuk membuat database guna mencegah kegagalan akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP untuk PolarDB for MySQL.

Database MySQL yang dikelola sendiri

  • Lakukan probe jaringan pada Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, atau 8.0.x.

  • Aktifkan binary logging. (Dinonaktifkan secara default.)

  • Atur format log biner ke ROW. (STATEMENT adalah format default.)

  • Atur binlog_row_image ke FULL. (FULL adalah pengaturan default.)

  • Nonaktifkan Binary Log Transaction Compression. (Diperkenalkan di MySQL 8.0.20 dan seterusnya. Dinonaktifkan secara default.)

  • Buat pengguna MySQL dan berikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk database MySQL yang dikelola sendiri. Gunakan akun istimewa untuk membuat database guna mencegah kegagalan akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP untuk database MySQL yang dikelola sendiri.

Batasan

Batasan umum

  • Tabel sumber CDC MySQL saat ini tidak mendukung pendefinisian watermark.

  • Pada pekerjaan CTAS dan CDAS, tabel sumber CDC MySQL dapat menyinkronkan beberapa perubahan skema. Untuk detail jenis perubahan yang didukung, lihat Kebijakan sinkronisasi evolusi skema.

  • Konektor CDC MySQL tidak mendukung Binary Log Transaction Compression. Saat mengonsumsi data inkremental, pastikan fitur ini dinonaktifkan. Jika tidak, data inkremental mungkin gagal dimuat.

ApsaraDB RDS for MySQL batasan

  • Jangan membaca data dari instans standby atau instans hanya baca ApsaraDB RDS for MySQL. Log biner pada instans ini disimpan dalam periode singkat secara default. Jika log biner kedaluwarsa dan dibersihkan, pekerjaan mungkin gagal mengonsumsinya.

  • ApsaraDB RDS for MySQL mengaktifkan replikasi paralel secara default tetapi tidak menjamin konsistensi urutan transaksi antara instans primer dan sekunder. Setelah alih bencana primer/sekunder, pemulihan checkpoint mungkin melewatkan beberapa data. Untuk menghindari hal ini, aktifkan opsi slave_preserve_commit_order di ApsaraDB RDS for MySQL.

PolarDB for MySQL batasan

Tabel sumber CDC MySQL tidak mendukung pembacaan log biner dari kluster multi-master di PolarDB for MySQL versi 1.0.19 dan sebelumnya. (Apa itu kluster multi-master?) Log biner dari kluster ini mungkin berisi ID tabel duplikat, yang dapat menyebabkan kesalahan pemetaan skema dan kegagalan penguraian.

MySQL open-source batasan

Secara default, MySQL mempertahankan urutan transaksi selama replikasi log biner primer-replika. Jika replikasi paralel diaktifkan pada replika MySQL (slave_parallel_workers > 1) tetapi slave_preserve_commit_order = ON tidak diaktifkan, urutan commit transaksi mungkin tidak sesuai dengan database primer. Perilaku commit yang tidak berurutan ini dapat menyebabkan kehilangan data ketika Flink CDC melanjutkan dari checkpoint. Untuk mencegah masalah ini, kami merekomendasikan menyetel slave_preserve_commit_order = ON pada replika MySQL. Atau, Anda dapat menyetel slave_parallel_workers = 1, yang mengorbankan kinerja replikasi.

Catatan penting

  • Setiap sumber data CDC MySQL harus memiliki ID server yang unik.

    Tujuan ID server

    Setiap sumber data CDC MySQL harus memiliki ID server yang unik. Jika beberapa sumber data berbagi ID server yang sama dan tidak dapat menggunakan koneksi ulang, offset log biner mungkin menjadi tidak konsisten, menyebabkan data hilang atau duplikat.

    Konfigurasi ID server untuk skenario berbeda

    Anda dapat menentukan ID server dalam pernyataan DDL, tetapi kami merekomendasikan menggunakan petunjuk dinamis daripada parameter DDL.

    • Tingkat paralelisme = 1 atau snapshot inkremental dinonaktifkan

      ## Tentukan ID server spesifik saat snapshot inkremental dinonaktifkan atau tingkat paralelisme adalah 1.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • Tingkat paralelisme > 1 dan snapshot inkremental diaktifkan

      ## Tentukan rentang ID server. Pastikan jumlah ID yang tersedia dalam rentang tersebut minimal sama dengan tingkat paralelisme. Misalnya, jika tingkat paralelisme adalah 3:
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • Sinkronisasi data menggunakan CTAS

      Saat menyinkronkan data menggunakan CTAS, jika sumber data CDC memiliki konfigurasi identik, sumber data tersebut akan digunakan ulang secara otomatis. Dalam kasus ini, Anda dapat memberikan ID server yang sama untuk beberapa sumber data CDC. Untuk informasi selengkapnya, lihat Contoh 4: Beberapa pernyataan CTAS.

    • Beberapa tabel sumber non-CTAS yang tidak dapat digunakan ulang

      Jika pekerjaan Anda berisi beberapa tabel sumber CDC MySQL dan tidak menggunakan CTAS, sumber data tidak dapat digunakan ulang. Tetapkan ID server unik untuk setiap tabel sumber CDC. Demikian pula, jika snapshot inkremental diaktifkan dan tingkat paralelisme lebih besar dari 1, tentukan rentang ID server.

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;
  • Tabel sink

    • Primary key auto-increment tidak dideklarasikan dalam DDL. MySQL mengisinya secara otomatis selama penulisan data.

    • Minimal satu bidang non-primary-key harus dideklarasikan. Jika tidak, terjadi kesalahan.

    • NOT ENFORCED dalam DDL berarti Flink tidak menegakkan validasi primary key. Anda harus memastikan kebenaran dan integritas primary key. Untuk informasi selengkapnya, lihat Pemeriksaan validitas.

  • Tabel dimensi

    Untuk mempercepat kueri menggunakan indeks, bidang kondisi JOIN harus sesuai dengan urutan definisi indeks (aturan prefiks paling kiri). Misalnya, jika indeks adalah (a, b, c), kondisi JOIN harus ON t.a = x AND t.b = y.

    SQL yang dihasilkan Flink mungkin ditulis ulang oleh pengoptimal, sehingga mencegah penggunaan indeks. Untuk memastikan apakah indeks digunakan, periksa rencana eksekusi (EXPLAIN) atau log kueri lambat di MySQL untuk pernyataan SELECT yang sebenarnya dieksekusi.

SQL

Anda dapat menggunakan konektor MySQL dalam pekerjaan SQL sebagai tabel sumber, tabel dimensi, atau tabel sink.

Sintaks

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

Catatan
  • Cara konektor menulis ke tabel sink: Setiap catatan masuk diubah menjadi pernyataan INSERT dan dieksekusi. SQL yang tepat tergantung pada skenario:

    • Untuk tabel sink tanpa primary key, konektor mengeksekusi INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);.

    • Untuk tabel sink dengan primary key, konektor mengeksekusi INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;. Catatan: Jika tabel fisik memiliki kendala indeks unik selain primary key, memasukkan dua catatan dengan primary key berbeda tetapi nilai indeks unik identik menyebabkan kehilangan data akibat konflik.

  • Jika database MySQL Anda mendefinisikan primary key auto-increment, jangan deklarasikan dalam DDL Flink. MySQL mengisi bidang ini secara otomatis selama penulisan data. Konektor mendukung penulisan dan penghapusan data dengan kolom auto-increment tetapi tidak mendukung pembaruan.

dengan parameter

  • Parameter umum

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    connector

    Jenis tabel.

    Ya

    STRING

    Tidak ada

    Saat digunakan sebagai tabel sumber, Anda dapat menentukan mysql-cdc atau mysql. Keduanya setara. Saat digunakan sebagai tabel dimensi atau sink, nilainya tetap mysql.

    hostname

    Alamat IP atau hostname database MySQL.

    Ya

    STRING

    Tidak ada

    Kami merekomendasikan menentukan titik akhir virtual private cloud (VPC).

    Catatan

    Jika MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, buat koneksi lintas-VPC atau gunakan titik akhir publik. Untuk informasi selengkapnya, lihat Manajemen dan operasi penyimpanan dan Bagaimana cara mengakses Internet dari kluster Flink yang sepenuhnya dikelola?.

    username

    Nama pengguna untuk layanan database MySQL.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    password

    Kata sandi untuk layanan database MySQL.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    database-name

    Nama database MySQL.

    Ya

    STRING

    Tidak ada

    • Saat digunakan sebagai tabel sumber, nama database mendukung ekspresi reguler untuk membaca data dari beberapa database.

    • Saat menggunakan ekspresi reguler, hindari penggunaan karakter ^ dan $ untuk mencocokkan awal dan akhir string. Lihat catatan untuk table-name untuk detailnya.

    table-name

    Nama tabel MySQL.

    Ya

    STRING

    Tidak ada

    • Saat digunakan sebagai tabel sumber, nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

      Saat membaca beberapa tabel MySQL, kirim beberapa pernyataan CTAS sebagai satu Pekerjaan untuk menghindari peluncuran beberapa Pendengar log biner. Hal ini meningkatkan performa dan efisiensi. Untuk informasi selengkapnya, lihat Beberapa pernyataan CTAS: Kirim sebagai satu Pekerjaan.

    • Saat menggunakan ekspresi reguler, hindari penggunaan karakter ^ dan $ untuk mencocokkan awal dan akhir string. Lihat catatan di bawah untuk detailnya.

    Catatan

    Saat mencocokkan nama tabel menggunakan ekspresi reguler, tabel sumber CDC MySQL menggabungkan nilai database-name dan table-name dengan string \\. (atau . untuk VVR 8.0.1 dan sebelumnya) untuk membentuk ekspresi reguler jalur lengkap. Ekspresi ini kemudian dicocokkan dengan nama tabel lengkap di MySQL.

    Misalnya, jika Anda mengonfigurasi 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ untuk VVR 8.0.1 dan sebelumnya) untuk menentukan tabel mana yang akan dibaca.

    port

    Nomor port layanan database MySQL.

    Tidak

    INTEGER

    3306

    Tidak ada.

  • Hanya untuk tabel sumber

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    server-id

    ID numerik untuk klien database.

    Tidak

    STRING

    Nilai acak antara 5400 dan 6400 dihasilkan secara default.

    ID ini harus unik secara global di seluruh kluster MySQL. Kami merekomendasikan memberikan ID berbeda untuk setiap pekerjaan yang mengakses database yang sama.

    Parameter ini juga mendukung format rentang, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, beberapa pembaca konkuren didukung. Dalam kasus ini, tentukan rentang agar setiap pembaca menggunakan ID unik. Untuk informasi selengkapnya, lihat Menggunakan ID server.

    scan.incremental.snapshot.enabled

    Apakah akan mengaktifkan snapshot inkremental.

    Tidak

    BOOLEAN

    true

    Snapshot inkremental diaktifkan secara default. Snapshot inkremental adalah mekanisme baru untuk membaca snapshot data lengkap. Dibandingkan dengan snapshot tradisional, snapshot inkremental menawarkan beberapa keunggulan:

    • Pembacaan konkuren data lengkap.

    • Checkpointing pada level chunk selama pembacaan data lengkap.

    • Tidak memerlukan penguncian baca global (FLUSH TABLES WITH READ LOCK) selama pembacaan data lengkap.

    Jika Anda ingin sumber mendukung pembacaan konkuren, setiap pembaca memerlukan ID server unik. Oleh karena itu, server-id harus berupa rentang seperti 5400-6400, dan ukuran rentang harus minimal sama dengan tingkat paralelisme.

    Catatan

    Konfigurasi ini dihapus di VVR 11.1 dan seterusnya.

    scan.incremental.snapshot.chunk.size

    Jumlah baris per chunk.

    Tidak

    INTEGER

    8096

    Saat snapshot inkremental diaktifkan, tabel dibagi menjadi chunk untuk dibaca. Data dari setiap chunk di-cache dalam memori hingga chunk tersebut sepenuhnya dibaca.

    Ukuran chunk yang lebih kecil meningkatkan jumlah total chunk. Meskipun ini mengurangi granularitas pemulihan, hal ini dapat menyebabkan kesalahan kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Pertimbangkan pertukaran ini saat memilih ukuran chunk.

    scan.snapshot.fetch.size

    Jumlah maksimum catatan yang diambil per bacaan saat memindai data tabel lengkap.

    Tidak

    INTEGER

    1024

    Tidak ada.

    scan.startup.mode

    Mode startup untuk mengonsumsi data.

    Tidak

    STRING

    initial

    Nilai yang valid:

    • initial (default): Saat startup pertama, memindai data historis lengkap lalu membaca data log biner terbaru.

    • latest-offset: Saat startup pertama, melewati pemindaian data lengkap dan mulai membaca dari akhir log biner (posisi terbaru). Hanya membaca perubahan yang terjadi setelah konektor dimulai.

    • earliest-offset: Melewati pemindaian data lengkap dan mulai membaca dari posisi log biner paling awal yang tersedia.

    • specific-offset: Melewati pemindaian data lengkap dan mulai membaca dari offset log biner tertentu. Tentukan offset menggunakan scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau gunakan scan.startup.specific-offset.gtid-set untuk memulai dari set GTID.

    • timestamp: Melewati pemindaian data lengkap dan mulai membaca dari timestamp tertentu. Tentukan timestamp menggunakan scan.startup.timestamp-millis, dalam milidetik.

    Penting

    Saat menggunakan earliest-offset, specific-offset, atau timestamp, pastikan skema tabel tetap tidak berubah antara posisi log biner yang ditentukan dan startup pekerjaan. Perubahan skema dapat menyebabkan kesalahan.

    scan.startup.specific-offset.file

    Nama file log biner untuk offset startup tertentu.

    Tidak

    STRING

    Tidak ada

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh nama file: mysql-bin.000003.

    scan.startup.specific-offset.pos

    Offset dalam file log biner yang ditentukan untuk posisi startup.

    Tidak

    INTEGER

    Tidak ada

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.

    scan.startup.specific-offset.gtid-set

    Set GTID untuk posisi startup.

    Tidak

    STRING

    Tidak ada

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh set GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    Timestamp startup dalam milidetik.

    Tidak

    LONG

    Tidak ada

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke timestamp. Satuan timestamp adalah milidetik.

    Penting

    Saat menggunakan timestamp, CDC MySQL mencoba membaca event awal dari setiap file log biner untuk menentukan timestamp-nya dan menemukan file yang sesuai. Pastikan timestamp yang ditentukan sesuai dengan file log biner yang ada dan dapat dibaca di database.

    server-time-zone

    Zona waktu sesi yang digunakan oleh database.

    Tidak

    STRING

    Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink (zona waktu zona yang dipilih) sebagai zona waktu server database.

    Misalnya, Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Nilai temporal Debezium.

    debezium.min.row.count.to.stream.results

    Saat jumlah baris tabel melebihi nilai ini, gunakan mode pembacaan batch.

    Tidak

    INTEGER

    1000

    Flink membaca data dari tabel sumber MySQL sebagai berikut:

    • Pembacaan lengkap: Memuat seluruh tabel ke memori. Cepat tetapi intensif memori. Tabel besar berisiko mengalami kesalahan OOM.

    • Pembacaan batch: Membaca data dalam batch. Hemat memori tetapi lebih lambat untuk tabel besar.

    connect.timeout

    Waktu maksimum menunggu sebelum mencoba koneksi ulang setelah timeout saat menghubungkan ke server database MySQL.

    Tidak

    DURATION

    30s

    Tidak ada.

    connect.max-retries

    Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.

    Tidak

    INTEGER

    3

    Tidak ada.

    connection.pool.size

    Ukuran kolam koneksi database.

    Tidak

    INTEGER

    20

    Kolam koneksi database menggunakan kembali koneksi untuk mengurangi jumlah koneksi database.

    jdbc.properties.*

    Parameter koneksi kustom untuk URL JDBC.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk menonaktifkan SSL, atur 'jdbc.properties.useSSL' = 'false'.

    Untuk parameter koneksi yang didukung, lihat Properti Konfigurasi MySQL.

    debezium.*

    Parameter Debezium kustom untuk membaca log biner.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan cara menangani kesalahan penguraian.

    heartbeat.interval

    Interval di mana event heartbeat memajukan offset log biner di sumber.

    Tidak

    DURATION

    30s

    Event heartbeat memajukan offset log biner di sumber. Ini berguna untuk tabel dengan pembaruan jarang. Tanpa heartbeat, offset log biner mungkin macet, menyebabkan kedaluwarsa dan kegagalan pekerjaan. Heartbeat mencegah masalah ini.

    scan.incremental.snapshot.chunk.key-column

    Tentukan kolom untuk digunakan dalam membagi chunk selama fase snapshot.

    Lihat Catatan.

    STRING

    Tidak ada

    • Wajib untuk tabel tanpa primary key. Kolom yang dipilih harus non-null (NOT NULL).

    • Opsional untuk tabel dengan primary key. Hanya kolom dari primary key yang didukung.

    rds.region-id

    ID wilayah instans Alibaba Cloud ApsaraDB RDS for MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk ID wilayah, lihat Wilayah dan zona.

    rds.access-key-id

    ID AccessKey untuk akun Alibaba Cloud ApsaraDB RDS for MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey saya?.

    Penting

    Untuk mencegah eksposur informasi AccessKey Anda, kami merekomendasikan menggunakan manajemen kunci untuk mengonfigurasi ID AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.

    rds.access-key-secret

    Rahasia AccessKey untuk akun Alibaba Cloud ApsaraDB RDS for MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey saya?.

    Penting

    Untuk mencegah eksposur informasi AccessKey Anda, kami merekomendasikan menggunakan manajemen kunci untuk mengonfigurasi rahasia AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.

    rds.db-instance-id

    ID instans Alibaba Cloud ApsaraDB RDS for MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    Tidak ada

    Tidak ada.

    rds.main-db-id

    ID database primer instans Alibaba Cloud ApsaraDB RDS for MySQL.

    Tidak

    STRING

    Tidak ada

    rds.download.timeout

    Timeout untuk mengunduh satu log arsip dari OSS.

    Tidak

    DURATION

    60s

    Tidak ada.

    rds.endpoint

    Titik akhir untuk mengakses informasi log biner OSS.

    Tidak

    STRING

    Tidak ada

    scan.incremental.close-idle-reader.enabled

    Apakah akan menutup pembaca idle setelah penyelesaian snapshot.

    Tidak

    BOOLEAN

    false

    • Didukung hanya di VVR 8.0.1 dan seterusnya.

    • Pengaturan ini memerlukan execution.checkpointing.checkpoints-after-tasks-finish.enabled diatur ke true.

    scan.read-changelog-as-append-only.enabled

    Apakah akan mengonversi aliran data changelog menjadi aliran data append-only.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Semua jenis pesan (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) dikonversi menjadi pesan INSERT. Gunakan hanya dalam kasus khusus, seperti mempertahankan pesan penghapusan tabel upstream.

    • false (default): Semua jenis pesan diteruskan tanpa perubahan.

    Catatan

    Didukung hanya di VVR 8.0.8 dan seterusnya.

    scan.only.deserialize.captured.tables.changelog.enabled

    Selama fase inkremental, apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.

    Tidak

    BOOLEAN

    • Default adalah false di VVR 8.x.

    • Default adalah true di VVR 11.1 dan seterusnya.

    Nilai yang valid:

    • true: Hanya mendeserialisasi data perubahan untuk tabel target untuk mempercepat pembacaan log biner.

    • false (default): Mendeserialisasi data perubahan untuk semua tabel.

    Catatan
    • Didukung hanya di VVR 8.0.7 dan seterusnya.

    • Di VVR 8.0.8 dan sebelumnya, gunakan debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    Selama fase inkremental, apakah mencoba mengurai event perubahan DDL lockless RDS.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengurai event perubahan DDL lockless RDS.

    • false (default): Tidak mengurai event perubahan DDL lockless RDS.

    Fitur eksperimental. Sebelum melakukan perubahan lockless online, ambil snapshot pekerjaan Flink untuk pemulihan.

    Catatan

    Didukung hanya di VVR 11.1 dan seterusnya.

    scan.incremental.snapshot.backfill.skip

    Apakah akan melewati backfill selama pembacaan snapshot.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Melewati backfill selama pembacaan snapshot.

    • false (default): Tidak melewati backfill selama pembacaan snapshot.

    Jika backfill dilewati, perubahan yang terjadi selama fase snapshot dibaca dalam fase inkremental berikutnya daripada digabungkan ke dalam snapshot.

    Penting

    Melewati backfill dapat menyebabkan inkonsistensi data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Ini hanya memberikan semantik at-least-once.

    Catatan

    Didukung hanya di VVR 11.1 dan seterusnya.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Apakah akan mendistribusikan chunk tak terbatas terlebih dahulu selama pembacaan snapshot.

    Tidak

    BOOELEAN

    false

    Nilai yang valid:

    • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama pembacaan snapshot.

    • false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama pembacaan snapshot.

    Fitur eksperimental. Mengaktifkan ini mengurangi risiko kesalahan kehabisan memori (OOM) untuk TaskManager selama sinkronisasi chunk terakhir dalam fase snapshot. Kami merekomendasikan mengaktifkannya sebelum startup pekerjaan pertama.

    Catatan

    Didukung hanya di VVR 11.1 dan seterusnya.

    binlog.session.network.timeout

    Timeout jaringan untuk koneksi log biner.

    Tidak

    DURATION

    10m

    Menyetel ini ke 0s menggunakan timeout default server MySQL.

    Catatan

    Didukung hanya di VVR 11.5 dan seterusnya.

    scan.rate-limit.records-per-second

    Batasi jumlah maksimum catatan yang dipancarkan per detik oleh sumber.

    Tidak

    LONG

    Tidak ada

    Berguna untuk membatasi konsumsi data. Berlaku untuk fase lengkap dan inkremental.

    Metrik numRecordsOutPerSecond mencerminkan total catatan yang dipancarkan per detik. Sesuaikan parameter ini berdasarkan metrik tersebut.

    Selama pembacaan lengkap, kurangi ukuran batch untuk melengkapi batasan ini. Turunkan nilai parameter scan.incremental.snapshot.chunk.size.

    Catatan

    Didukung hanya di VVR 11.5 dan seterusnya.

    scan.binlog.tolerate.gtid-holes

    Mengaktifkan parameter ini mengabaikan celah dalam urutan GTID, memungkinkan pekerjaan melewati event diskontinu dan terus berjalan.

    Tidak

    BOOLEAN

    false

    Sebelum mengaktifkan parameter ini, pastikan offset startup pekerjaan belum kedaluwarsa. Jika pekerjaan dimulai dari offset GTID yang dibersihkan atau kedaluwarsa, mesin diam-diam melewati log yang hilang, mengakibatkan kehilangan data.

    Catatan

    Didukung hanya di VVR 11.6 dan seterusnya.

  • Parameter khusus tabel dimensi

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    url

    URL JDBC MySQL.

    Tidak

    STRING

    Tidak ada

    Format URL adalah: jdbc:mysql://<endpoint>:<port>/<database-name>.

    lookup.max-retries

    Jumlah maksimum percobaan ulang setelah operasi baca gagal.

    Tidak

    INTEGER

    3

    Didukung hanya di VVR 6.0.7 dan seterusnya.

    lookup.cache.strategy

    Kebijakan cache.

    Tidak

    STRING

    Tidak ada

    Kebijakan yang didukung: None, LRU, dan ALL. Untuk deskripsi, lihat Informasi latar belakang.

    Catatan

    Saat menggunakan kebijakan cache LRU, Anda juga harus mengonfigurasi parameter lookup.cache.max-rows.

    lookup.cache.max-rows

    Jumlah maksimum baris yang di-cache.

    Tidak

    INTEGER

    100000

    • Saat kebijakan cache least recently used dipilih, ukuran cache harus diatur.

    • Opsional saat menggunakan kebijakan cache ALL.

    lookup.cache.ttl

    Waktu hidup (TTL) cache.

    Tidak

    DURATION

    10 s

    Pengaturan lookup.cache.ttl tergantung pada lookup.cache.strategy:

    • Jika lookup.cache.strategy adalah None, lookup.cache.ttl opsional dan menunjukkan tidak ada TTL.

    • Jika lookup.cache.strategy adalah LRU, lookup.cache.ttl adalah TTL cache. Default adalah tidak kedaluwarsa.

    • Jika lookup.cache.strategy adalah ALL, lookup.cache.ttl adalah interval reload cache. Default adalah tidak reload.

    Tentukan waktu dalam format seperti 1min atau 10s.

    lookup.max-join-rows

    Jumlah maksimum hasil yang dikembalikan saat menggabungkan setiap baris dari tabel utama dengan tabel dimensi.

    Tidak

    INTEGER

    1024

    Tidak ada.

    lookup.filter-push-down.enabled

    Apakah akan mengaktifkan filter pushdown untuk tabel dimensi.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Aktifkan filter pushdown. Tabel dimensi memfilter data lebih awal berdasarkan kondisi yang ditentukan dalam pekerjaan SQL.

    • false (default): Nonaktifkan filter pushdown. Tabel dimensi memuat semua data.

    Catatan

    Didukung hanya di VVR 8.0.7 dan seterusnya.

    Penting

    Aktifkan filter pushdown hanya saat tabel Flink digunakan sebagai tabel dimensi. Tabel sumber MySQL tidak mendukung filter pushdown. Jika tabel Flink digunakan sebagai tabel sumber dan dimensi, dan filter pushdown diaktifkan untuk tabel dimensi, atur eksplisit konfigurasi ini ke false menggunakan petunjuk SQL saat menggunakannya sebagai tabel sumber. Jika tidak, eksekusi pekerjaan mungkin gagal.

  • Hanya untuk tabel sink

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    url

    URL JDBC MySQL.

    Tidak

    STRING

    Tidak ada

    Format URL adalah: jdbc:mysql://<endpoint>:<port>/<database-name>.

    sink.max-retries

    Jumlah maksimum percobaan ulang setelah operasi tulis gagal.

    Tidak

    INTEGER

    3

    Tidak ada.

    sink.buffer-flush.batch-size

    Jumlah catatan yang ditulis dalam satu batch.

    Tidak

    INTEGER

    4096

    Tidak ada.

    sink.buffer-flush.max-rows

    Jumlah baris yang di-cache dalam memori.

    Tidak

    INTEGER

    10000

    Parameter ini hanya berlaku saat primary key ditentukan.

    sink.buffer-flush.interval

    Interval untuk flush buffer. Jika data yang di-buffer menunggu lebih lama dari interval ini tanpa memenuhi kondisi flush, sistem secara otomatis flush semua data yang di-buffer.

    Tidak

    DURATION

    1s

    Tidak ada.

    sink.ignore-delete

    Apakah akan mengabaikan operasi DELETE.

    Tidak

    BOOLEAN

    false

    Saat aliran SQL Flink berisi catatan DELETE atau UPDATE_BEFORE, pembaruan simultan ke bidang berbeda dari tabel yang sama oleh beberapa tugas dapat menyebabkan inkonsistensi data.

    Misalnya, jika catatan dihapus dan tugas lain hanya memperbarui beberapa bidang, bidang yang tidak diperbarui menjadi null atau nilai default, menyebabkan kesalahan data.

    Menyetel sink.ignore-delete ke true mengabaikan operasi DELETE dan UPDATE_BEFORE upstream, menghindari masalah ini.

    Catatan
    • UPDATE_BEFORE adalah bagian dari mekanisme retract Flink, digunakan untuk "menarik kembali" nilai lama selama pembaruan.

    • Saat ignoreDelete = true, semua catatan DELETE dan UPDATE_BEFORE dilewati. Hanya catatan INSERT dan UPDATE_AFTER yang diproses.

    sink.ignore-null-when-update

    Saat memperbarui data, apakah akan memperbarui bidang ke null atau melewati pembaruan jika nilai input adalah null.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Lewati pembaruan bidang. Didukung hanya saat tabel Flink memiliki primary key. Saat diatur ke true:

      • Di VVR 8.0.6 dan sebelumnya, penulisan batch tidak didukung untuk tabel sink.

      • Di VVR 8.0.7 dan seterusnya, penulisan batch didukung untuk tabel sink.

        Penulisan batch meningkatkan efisiensi dan throughput penulisan tetapi memperkenalkan latensi dan risiko OOM. Pertimbangkan pertukaran ini berdasarkan kebutuhan bisnis Anda.

    • false: Perbarui bidang ke null.

    Catatan

    Didukung hanya di VVR 8.0.5 dan seterusnya.

Pemetaan tipe data

  • Tabel sumber CDC

    Tipe bidang CDC MySQL

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    Penting

    Jangan gunakan tipe TINYINT(1) di MySQL untuk menyimpan nilai selain 0 dan 1. Saat property-version diatur ke 0, tabel sumber CDC MySQL memetakan TINYINT(1) ke tipe BOOLEAN Flink secara default. Hal ini dapat menyebabkan data tidak akurat. Untuk menggunakan tipe TINYINT(1) untuk menyimpan nilai selain 0 dan 1, lihat parameter konfigurasi catalog.table.treat-tinyint1-as-boolean.

  • Tabel dimensi dan sink

    Tipe bidang MySQL

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    Catatan

    dengan p <= 38.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Penting

    Flink mendukung catatan BLOB MySQL hingga 2.147.483.647 byte (231 − 1).

    BLOB

    MEDIUMBLOB

    LONGBLOB

Ingesti Data

Konektor MySQL dapat digunakan sebagai sumber data dalam pekerjaan ingesti data YAML.

Sintaks

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

Konfigurasi

Parameter

Deskripsi

Wajib

Tipe data

Nilai default

Keterangan

type

Jenis sumber data.

Ya

STRING

Tidak ada

Nilainya tetap mysql.

name

Nama sumber data.

Tidak

STRING

Tidak ada

Tidak ada.

hostname

Alamat IP atau hostname database MySQL.

Ya

STRING

Tidak ada

Tentukan alamat virtual private cloud (VPC).

Catatan

Jika database MySQL dan layanan Flink real-time Anda tidak berada dalam VPC yang sama, Anda perlu terlebih dahulu membuat koneksi jaringan lintas-VPC atau mengakses database melalui jaringan publik. Untuk informasi selengkapnya, lihat Manajemen dan Operasi Penyimpanan dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses jaringan publik?.

username

Nama pengguna untuk layanan database MySQL.

Ya

STRING

Tidak ada

Tidak ada.

password

Kata sandi untuk layanan database MySQL.

Ya

STRING

Tidak ada

Tidak ada.

tables

Tabel MySQL yang akan disinkronkan.

Ya

STRING

Tidak ada

  • Nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

  • Anda dapat memisahkan beberapa ekspresi reguler dengan koma.

Catatan
  • Jangan gunakan anchor awal-string (^) dan akhir-string ($) dalam ekspresi reguler. Di versi 11.2, titik (.) memisahkan nama database dari nama tabel dalam ekspresi. Menggunakan anchor awal dan akhir membuat ekspresi untuk database tidak valid. Misalnya, ubah ^db.user_[0-9]+$ menjadi db.user_[0-9]+.

  • Titik (.) memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, escape dengan backslash (\). Misalnya: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

tables.exclude

Tabel yang dikecualikan dari sinkronisasi.

Tidak

STRING

Tidak ada

  • Nama tabel mendukung ekspresi reguler untuk mengecualikan data dari beberapa tabel.

  • Anda dapat memisahkan beberapa ekspresi reguler dengan koma.

Catatan

Titik (.) memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, escape dengan backslash (\). Misalnya: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

port

Nomor port layanan database MySQL.

Tidak

INTEGER

3306

Tidak ada.

schema-change.enabled

Menentukan apakah akan mengirimkan event evolusi skema.

Tidak

BOOLEAN

true

Tidak ada.

server-id

ID numerik atau rentang ID untuk klien database yang digunakan untuk sinkronisasi.

Tidak

STRING

Nilai acak antara 5400 dan 6400 dihasilkan.

ID ini harus unik secara global dalam kluster MySQL. Tetapkan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

Parameter ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, beberapa pembacaan konkuren didukung. Dalam kasus ini, tetapkan rentang ID agar setiap pembacaan konkuren menggunakan ID berbeda.

jdbc.properties.*

Parameter koneksi kustom dalam URL Java Database Connectivity (JDBC).

Tidak

STRING

Tidak ada

Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk menonaktifkan protokol SSL, atur 'jdbc.properties.useSSL' = 'false'.

Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat Properti Konfigurasi MySQL.

debezium.*

Parameter kustom untuk Debezium membaca data logging biner (binlog).

Tidak

STRING

Tidak ada

Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan cara menangani kesalahan deserialisasi.

scan.incremental.snapshot.chunk.size

Ukuran setiap chunk dalam jumlah baris.

Tidak

INTEGER

8096

Tabel MySQL dibagi menjadi beberapa chunk untuk dibaca. Data chunk di-cache dalam memori sebelum sepenuhnya dibaca.

Mengurangi jumlah baris per chunk meningkatkan jumlah total chunk dalam tabel. Meskipun ini mengurangi granularitas pemulihan kesalahan, hal ini dapat memicu kesalahan kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Oleh karena itu, Anda harus menyeimbangkan faktor-faktor ini dan menetapkan ukuran chunk yang sesuai.

scan.snapshot.fetch.size

Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel.

Tidak

INTEGER

1024

Tidak ada.

scan.startup.mode

Mode startup untuk mengonsumsi data.

Tidak

STRING

initial

Nilai yang valid:

  • initial (default): Saat startup pertama, konektor memindai semua data historis lalu membaca data binlog terbaru.

  • latest-offset: Saat startup pertama, konektor tidak memindai data historis. Mulai membaca dari akhir binlog, artinya hanya membaca perubahan yang terjadi setelah konektor dimulai.

  • earliest-offset: Tidak memindai data historis. Mulai membaca dari binlog paling awal yang tersedia.

  • specific-offset: Tidak memindai data historis. Mulai dari offset binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengatur parameter scan.startup.specific-offset.file dan scan.startup.specific-offset.pos untuk memulai dari nama file binlog dan offset tertentu. Anda juga dapat menentukan offset dengan hanya mengatur parameter scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.

  • timestamp: Tidak memindai data historis. Mulai membaca binlog dari timestamp tertentu. Timestamp ditentukan dalam milidetik oleh parameter scan.startup.timestamp-millis.

Penting

Untuk mode startup earliest-offset, specific-offset, dan timestamp, jika skema tabel pada waktu startup pekerjaan berbeda dari skema pada offset awal yang ditentukan, pekerjaan gagal. Dengan kata lain, jika Anda menggunakan ketiga mode startup ini, pastikan skema tabel yang sesuai tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan.

scan.startup.specific-offset.file

Nama file binlog untuk offset awal saat mode startup diatur ke specific-offset.

Tidak

STRING

Tidak ada

Parameter ini hanya digunakan saat scan.startup.mode diatur ke specific-offset. Format contoh: mysql-bin.000003.

scan.startup.specific-offset.pos

Offset dalam file binlog yang ditentukan untuk memulai saat mode startup diatur ke specific-offset.

Tidak

INTEGER

Tidak ada

Parameter ini hanya digunakan saat scan.startup.mode diatur ke specific-offset.

scan.startup.specific-offset.gtid-set

Set GTID untuk memulai saat mode startup diatur ke specific-offset.

Tidak

STRING

Tidak ada

Parameter ini hanya digunakan saat scan.startup.mode diatur ke specific-offset. Format contoh set GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

Timestamp dalam milidetik untuk memulai saat mode startup diatur ke timestamp.

Tidak

LONG

Tidak ada

Parameter ini hanya digunakan saat scan.startup.mode diatur ke timestamp. Timestamp dalam milidetik.

Penting

Saat Anda menentukan waktu, CDC MySQL mencoba membaca event awal dari setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai dengan waktu yang ditentukan. Pastikan file binlog yang sesuai dengan timestamp yang ditentukan belum dibersihkan dari database dan dapat dibaca.

server-time-zone

Zona waktu sesi yang digunakan oleh database.

Tidak

STRING

Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu zona yang Anda pilih.

Misalnya, Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Tipe temporal Debezium.

scan.startup.specific-offset.skip-events

Jumlah event binlog yang dilewati saat membaca dari offset tertentu.

Tidak

INTEGER

Tidak ada

Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset.

scan.startup.specific-offset.skip-rows

Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event binlog mungkin sesuai dengan beberapa perubahan baris.

Tidak

INTEGER

Tidak ada

Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset.

connect.timeout

Waktu maksimum menunggu sebelum mencoba koneksi ulang ke server database MySQL setelah timeout.

Tidak

DURATION

30s

Tidak ada.

connect.max-retries

Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.

Tidak

INTEGER

3

Tidak ada.

connection.pool.size

Ukuran kolam koneksi database.

Tidak

INTEGER

20

Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi database.

heartbeat.interval

Interval di mana sumber menggunakan event heartbeat untuk memajukan offset binlog.

Tidak

DURATION

30s

Event heartbeat digunakan untuk memajukan offset binlog di sumber. Ini berguna untuk tabel di MySQL yang jarang diperbarui. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset binlog maju, yang mencegah offset kedaluwarsa. Offset binlog yang kedaluwarsa menyebabkan pekerjaan gagal dan tidak dapat dipulihkan. Pekerjaan hanya dapat dimulai ulang tanpa status.

scan.incremental.snapshot.chunk.key-column

Menentukan kolom untuk digunakan dalam sharding selama fase snapshot.

Tidak.

STRING

Tidak ada

Hanya kolom dari primary key yang dapat dipilih.

rds.region-id

ID wilayah instans Alibaba Cloud RDS for MySQL.

Wajib saat Anda membaca log arsip dari Object Storage Service (OSS).

STRING

Tidak ada

Untuk informasi selengkapnya tentang ID wilayah, lihat Wilayah dan zona.

rds.access-key-id

ID AccessKey akun Alibaba Cloud untuk instans RDS for MySQL.

Wajib saat Anda membaca log arsip dari OSS.

STRING

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?

Penting

Untuk mencegah informasi AccessKey Anda bocor, gunakan manajemen rahasia untuk menentukan ID AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.

rds.access-key-secret

Rahasia AccessKey akun Alibaba Cloud untuk instans RDS for MySQL.

Wajib saat Anda membaca log arsip dari OSS.

STRING

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?

Penting

Untuk mencegah informasi AccessKey Anda bocor, gunakan manajemen rahasia untuk menentukan rahasia AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.

rds.db-instance-id

ID instans Alibaba Cloud RDS for MySQL.

Wajib saat Anda membaca log arsip dari OSS.

STRING

Tidak ada

Tidak ada.

rds.main-db-id

ID database primer instans Alibaba Cloud RDS for MySQL.

Tidak

STRING

Tidak ada

Untuk informasi selengkapnya tentang cara mendapatkan ID database primer, lihat Cadangan log RDS for MySQL.

rds.download.timeout

Periode timeout untuk mengunduh satu log arsip dari OSS.

Tidak

DURATION

60s

Tidak ada.

rds.endpoint

Titik akhir layanan untuk mendapatkan informasi binlog OSS.

Tidak

STRING

Tidak ada

Untuk informasi selengkapnya tentang nilai yang tersedia, lihat Titik Akhir.

rds.binlog-directory-prefix

Awalan direktori untuk menyimpan file binlog.

Tidak

STRING

rds-binlog-

Tidak ada.

rds.use-intranet-link

Menentukan apakah akan menggunakan tautan jaringan internal untuk mengunduh file binlog.

Tidak

BOOLEAN

true

Tidak ada.

rds.binlog-directories-parent-path

Jalur mutlak direktori induk tempat file binlog disimpan.

Tidak

STRING

Tidak ada

Tidak ada.

chunk-meta.group.size

Ukuran metadata chunk.

Tidak

INTEGER

1000

Jika metadata lebih besar dari nilai ini, metadata diteruskan dalam beberapa bagian.

chunk-key.even-distribution.factor.lower-bound

Batas bawah faktor distribusi chunk untuk sharding merata.

Tidak

DOUBLE

0.05

Jika faktor distribusi kurang dari nilai ini, sharding tidak merata digunakan.

Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total jumlah baris data.

chunk-key.even-distribution.factor.upper-bound

Batas atas faktor distribusi chunk untuk sharding merata.

Tidak

DOUBLE

1000.0

Jika faktor distribusi lebih besar dari nilai ini, sharding tidak merata digunakan.

Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total jumlah baris data.

scan.incremental.close-idle-reader.enabled

Menentukan apakah akan menutup pembaca idle setelah snapshot selesai.

Tidak

BOOLEAN

false

Agar konfigurasi ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

scan.only.deserialize.captured.tables.changelog.enabled

Menentukan apakah hanya mendeserialisasi event perubahan tabel yang ditentukan selama fase inkremental.

Tidak

BOOLEAN

  • Nilai default adalah false untuk Ververica Runtime (VVR) 8.x.

  • Nilai default adalah true untuk VVR 11.1 dan seterusnya.

Nilai yang valid:

  • true: Hanya mendeserialisasi data perubahan tabel target untuk mempercepat pembacaan binlog.

  • false (default): Mendeserialisasi data perubahan semua tabel.

scan.parallel-deserialize-changelog.enabled

Menentukan apakah akan menggunakan beberapa thread untuk mengurai event perubahan selama fase inkremental.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Menggunakan beberapa thread untuk deserialisasi event perubahan sambil mempertahankan urutan event binlog. Ini mempercepat proses pembacaan.

  • false (default): Menggunakan satu thread untuk deserialisasi.

Catatan

Fitur ini hanya didukung oleh mesin komputasi Flink VVR 8.0.11 dan seterusnya.

scan.parallel-deserialize-changelog.handler.size

Jumlah handler event saat menggunakan beberapa thread untuk mengurai event perubahan.

Tidak

INTEGER

2

Catatan

Fitur ini hanya didukung oleh mesin komputasi Flink VVR 8.0.11 dan seterusnya.

metadata-column.include-list

Kolom metadata untuk diteruskan ke sink downstream.

Tidak

STRING

Tidak ada

Kolom metadata yang tersedia meliputi op_ts, es_ts, query_log, file, dan pos. Anda dapat memisahkan beberapa kolom metadata dengan koma.

Catatan

Konektor YAML CDC MySQL tidak memerlukan atau mendukung penambahan kolom metadata untuk nama database, nama tabel, atau op_type. Anda dapat langsung menggunakan __data_event_type__ dalam ekspresi Transform untuk mendapatkan tipe data perubahan, atau menggunakan __schema_name__ dan __table_name__ untuk mendapatkan nama database dan tabel.

Penting
  • Kolom metadata file merepresentasikan file binlog yang berisi data. Selama fase lengkap, nilainya adalah "". Selama fase inkremental, nilainya adalah nama file binlog. Kolom metadata pos merepresentasikan offset data dalam file binlog. Selama fase lengkap, nilainya adalah "0". Selama fase inkremental, nilainya adalah offset data dalam file binlog. Dua kolom metadata ini didukung mulai dari mesin komputasi Flink VVR 11.5.

  • Kolom metadata es_ts merepresentasikan waktu mulai transaksi yang sesuai dengan changelog di MySQL. Ini hanya didukung untuk MySQL 8.0.x. Jangan tambahkan kolom metadata ini jika Anda menggunakan versi MySQL sebelumnya.

  • Timestamp op_ts memiliki presisi detik. Timestamp es_ts memiliki presisi milidetik.

scan.newly-added-table.enabled

Menentukan apakah akan menyinkronkan tabel yang baru ditambahkan yang tidak cocok selama startup sebelumnya atau menghapus tabel dari status yang saat ini tidak cocok saat memulai ulang dari checkpoint.

Tidak

BOOLEAN

false

Ini berlaku saat memulai ulang dari checkpoint atau titik simpan.

scan.binlog.newly-added-table.enabled

Menentukan apakah akan mengirimkan data untuk tabel yang baru cocok selama fase inkremental.

Tidak

BOOLEAN

false

Tidak dapat diaktifkan bersamaan dengan scan.newly-added-table.enabled.

scan.incremental.snapshot.chunk.key-column

Menentukan kolom untuk digunakan dalam sharding selama fase snapshot untuk tabel tertentu.

Tidak

STRING

Tidak ada

  • Hubungkan nama tabel dan nama kolom dengan titik dua (:) untuk mendefinisikan aturan. Nama tabel dapat berupa ekspresi reguler. Anda dapat mendefinisikan beberapa aturan yang dipisahkan dengan titik koma (;). Misalnya: db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2.

  • Parameter ini wajib untuk tabel tanpa primary key. Kolom yang dipilih harus bertipe non-null (NOT NULL). Untuk tabel dengan primary key, parameter ini opsional dan hanya satu kolom dari primary key yang dapat dipilih.

scan.parse.online.schema.changes.enabled

Menentukan apakah akan mencoba mengurai event DDL perubahan lockless RDS selama fase inkremental.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Mengurai event DDL perubahan lockless RDS.

  • false (default): Tidak mengurai event DDL perubahan lockless RDS.

Ini adalah fitur eksperimental. Sebelum melakukan perubahan lockless online, buat snapshot pekerjaan Flink untuk pemulihan.

Catatan

Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.0 dan seterusnya.

scan.incremental.snapshot.backfill.skip

Menentukan apakah akan melewati backfill selama fase pembacaan snapshot.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Melewati backfill selama fase pembacaan snapshot.

  • false (default): Tidak melewati backfill selama fase pembacaan snapshot.

Jika backfill dilewati, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya daripada digabungkan ke dalam snapshot.

Penting

Melewati backfill dapat menyebabkan inkonsistensi data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.

Catatan

Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.1 dan seterusnya.

treat-tinyint1-as-boolean.enabled

Menentukan apakah akan memperlakukan tipe TINYINT(1) sebagai tipe BOOLEAN.

Tidak

BOOLEAN

true

Nilai yang valid:

  • true (default): Memperlakukan tipe TINYINT(1) sebagai tipe BOOLEAN.

  • false: Tidak memperlakukan tipe TINYINT(1) sebagai tipe BOOLEAN.

treat-timestamp-as-datetime-enabled

Menentukan apakah akan memperlakukan tipe TIMESTAMP sebagai tipe DATETIME.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Memperlakukan tipe TIMESTAMP MySQL sebagai tipe DATETIME dan memetakannya ke tipe CDC TIMESTAMP.

  • false (default): Memetakan tipe TIMESTAMP MySQL ke tipe CDC TIMESTAMP_LTZ.

Tipe TIMESTAMP MySQL menyimpan waktu UTC dan dipengaruhi oleh zona waktu. Tipe DATETIME MySQL menyimpan waktu literal dan tidak dipengaruhi oleh zona waktu.

Jika ini diaktifkan, data tipe TIMESTAMP MySQL dikonversi ke tipe DATETIME berdasarkan server-time-zone.

include-comments.enabled

Menentukan apakah akan menyinkronkan komentar tabel dan kolom.

Tidak

BOOELEAN

false

Nilai yang valid:

  • true: Menyinkronkan komentar tabel dan kolom.

  • false (default): Tidak menyinkronkan komentar tabel dan kolom.

Mengaktifkan ini meningkatkan penggunaan memori pekerjaan.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah akan mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Tidak

BOOELEAN

false

Nilai yang valid:

  • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

  • false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Ini adalah fitur eksperimental. Mengaktifkan ini mengurangi risiko kesalahan OOM saat TaskManager menyinkronkan chunk terakhir selama fase snapshot. Tambahkan parameter ini sebelum startup pertama pekerjaan.

Catatan

Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.1 dan seterusnya.

binlog.session.network.timeout

Timeout jaringan untuk koneksi binlog.

Tidak

DURATION

10m

Jika ini diatur ke 0s, timeout default server MySQL digunakan.

Catatan

Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.5 dan seterusnya.

scan.rate-limit.records-per-second

Membatasi jumlah maksimum catatan yang dapat dikirim sumber per detik.

Tidak

LONG

Tidak ada

Ini berlaku untuk skenario di mana pembacaan data perlu dibatasi. Batasan ini efektif dalam fase lengkap dan inkremental.

Metrik numRecordsOutPerSecond dari sumber mencerminkan jumlah catatan yang seluruh aliran data keluarkan per detik. Anda dapat menyesuaikan parameter ini berdasarkan metrik tersebut.

Selama fase baca lengkap, Anda biasanya perlu mengurangi jumlah entri data yang dibaca dalam setiap batch. Anda dapat mengurangi nilai parameter scan.incremental.snapshot.chunk.size.

Catatan

Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.5 dan seterusnya.

include-binlog-meta.enable

Menentukan apakah akan menyertakan informasi binlog MySQL asli, seperti GTID dan offset binlog, dalam pesan.

Tidak

Boolean

false

Ini cocok untuk skenario sinkronisasi binlog asli, seperti mengganti tautan sinkronisasi canal yang ada.

Catatan

Fitur ini hanya didukung oleh mesin komputasi Flink VVR 11.6 dan seterusnya.

scan.binlog.tolerate.gtid-holes

Mengaktifkan parameter ini mengabaikan celah dalam urutan GTID, yang memungkinkan pekerjaan melewati event diskontinu dan terus berjalan.

Tidak

Boolean

false

Sebelum Anda mengaktifkan parameter ini, Anda harus memastikan bahwa offset awal pekerjaan belum kedaluwarsa. Jika pekerjaan dimulai dari offset GTID yang dibersihkan atau kedaluwarsa, mesin diam-diam melewatkan log yang hilang, yang mengakibatkan kehilangan data.

Catatan

Parameter ini hanya didukung oleh mesin komputasi Flink VVR 11.6 dan seterusnya.

Pemetaan tipe

Tabel berikut menunjukkan pemetaan tipe data untuk ingesti data.

Tipe bidang CDC MySQL

Tipe bidang CDC

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] dengan p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] dengan p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] dengan p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

Pemetaan tergantung pada nilai parameter treat-timestamp-as-datetime-enabled:

true:TIMESTAMP[(p)]

false:TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] dengan 38 < p <= 65

STRING

Catatan

Dalam MySQL, presisi tipe data desimal dapat mencapai 65. Dalam Flink, presisi dibatasi hingga 38. Oleh karena itu, jika Anda mendefinisikan kolom desimal dengan presisi lebih dari 38, Anda harus memetakannya ke string untuk menghindari kehilangan presisi.

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] dengan 38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] dengan 38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

Catatan

Tipe data JSON dikonversi ke string berformat JSON di Flink.

GEOMETRY

STRING

Catatan

Tipe data spasial di MySQL dikonversi ke string dalam format JSON tetap. Untuk informasi selengkapnya, lihat Pemetaan tipe data spasial MySQL.

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

Catatan

Untuk tipe data BLOB di MySQL, hanya blob dengan panjang tidak lebih dari 2.147.483.647 (2^31 - 1) yang didukung.

BLOB

MEDIUMBLOB

LONGBLOB

Contoh penggunaan

  • Tabel sumber CDC

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Tabel dimensi

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • Tabel sink

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • Sumber ingesti data

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

Tentang tabel sumber CDC MySQL

  • Prinsip implementasi

    Saat tabel sumber CDC MySQL dimulai, tabel tersebut memindai seluruh tabel, membaginya menjadi beberapa chunk berdasarkan primary key, dan mencatat offset log biner. Kemudian membaca data dari setiap chunk secara berurutan menggunakan algoritma snapshot inkremental dengan pernyataan SELECT. Pekerjaan secara berkala melakukan checkpoint, mencatat chunk yang telah selesai. Jika terjadi failover, pekerjaan melanjutkan pembacaan dari chunk yang belum selesai. Setelah semua chunk dibaca, pekerjaan membaca catatan perubahan inkremental dari offset log biner yang sebelumnya dicatat. Pekerjaan Flink terus melakukan checkpoint secara berkala, mencatat offset log biner. Jika pekerjaan gagal, pekerjaan melanjutkan pemrosesan dari offset log biner terakhir yang dicatat, memastikan semantik tepat-sekali.

    Untuk penjelasan lebih rinci tentang algoritma snapshot inkremental, lihat Konektor CDC MySQL.

  • Metadata

    Metadata berguna dalam skenario sharding tempat Anda menggabungkan dan menyinkronkan data dari beberapa database dan tabel. Dalam kasus seperti itu, Anda sering perlu membedakan database dan tabel sumber untuk setiap catatan. Kolom metadata memberikan akses ke informasi nama database dan tabel sumber, sehingga memudahkan penggabungan beberapa tabel sharded menjadi satu tabel tujuan.

    Tabel Sumber CDC MySQL mendukung sintaks kolom metadata, memungkinkan Anda mengakses metadata berikut:

    Kunci metadata

    Tipe metadata

    Deskripsi

    database_name

    STRING NOT NULL

    Nama database yang berisi catatan.

    table_name

    STRING NOT NULL

    Nama tabel yang berisi catatan.

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    Waktu perubahan catatan di database. Jika catatan berasal dari data historis daripada log biner, nilai ini selalu 0.

    op_type

    STRING NOT NULL

    Jenis perubahan catatan.

    • +I: Pesan INSERT

    • -D: Pesan DELETE

    • -U: Pesan UPDATE_BEFORE

    • +U: Pesan UPDATE_AFTER

    Catatan

    Didukung hanya di VVR 8.0.7 dan seterusnya.

    query_log

    STRING NOT NULL

    Catatan log kueri MySQL yang sesuai dengan baris tersebut.

    Catatan

    MySQL harus memiliki parameter binlog_rows_query_log_events diaktifkan untuk mencatat log kueri.

    Contoh kode berikut menggabungkan beberapa tabel orders sharded dari database berbeda dalam satu instans MySQL ke satu tabel holo_orders di Hologres.

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Baca nama database.
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- Baca nama tabel.
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Baca waktu perubahan.
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Baca jenis perubahan.
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Regex cocokkan beberapa database sharded.
      'table-name' = 'orders_.*'   -- Regex cocokkan beberapa tabel sharded.
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    Berdasarkan kode di atas, jika Anda mengatur parameter scan.read-changelog-as-append-only.enabled ke true dalam klausa WITH, output bervariasi berdasarkan konfigurasi primary key tabel sink:

    • Jika primary key tabel sink adalah order_id, output hanya berisi perubahan terakhir untuk setiap primary key dari tabel sumber. Untuk primary key yang perubahan terakhirnya adalah operasi penghapusan, tabel sink menampilkan catatan dengan primary key yang sama dan op_type -D.

    • Jika primary key tabel sink adalah order_id, operation_ts, dan op_type, output berisi riwayat perubahan lengkap untuk setiap primary key dari tabel sumber.

  • Dukungan ekspresi reguler

    Tabel sumber CDC MySQL mendukung penggunaan ekspresi reguler dalam nama tabel atau nama database untuk mencocokkan beberapa tabel atau database. Contoh kode berikut menggunakan ekspresi reguler untuk menentukan beberapa tabel.

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Regex cocokkan beberapa database.
      'table-name' = '(t[5-8]|tt)' -- Regex cocokkan beberapa tabel.
    );

    Penjelasan ekspresi reguler dalam contoh:

    • ^(test).* adalah contoh pencocokan prefiks. Ekspresi ini mencocokkan nama database yang dimulai dengan "test", seperti test1 atau test2.

    • .*[p$] adalah contoh pencocokan sufiks. Ekspresi ini mencocokkan nama database yang diakhiri dengan "p", seperti cdcp atau edcp.

    • txc adalah pencocokan eksak. Ini mencocokkan nama database "txc".

    Saat mencocokkan nama tabel jalur lengkap, CDC MySQL mengidentifikasi tabel secara unik menggunakan nama database dan nama tabel, yaitu database-name.table-name sebagai pola pencocokan. Misalnya, pola (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) dapat mencocokkan tabel txc.tt dan test2.test5 dalam database.

    Penting

    Dalam konfigurasi pekerjaan SQL, table-name dan database-name tidak mendukung daftar yang dipisahkan koma untuk menentukan beberapa tabel atau database.

    • Untuk mencocokkan beberapa tabel atau menggunakan beberapa ekspresi reguler, hubungkan dengan bilah vertikal (|) dan tutup dalam tanda kurung. Misalnya, untuk membaca tabel 'user' dan 'product', atur table-name ke (user|product).

    • Jika ekspresi reguler berisi koma, tulis ulang menggunakan operator bilah vertikal (|). Misalnya, tulis ulang mytable_\d{1, 2} sebagai (mytable_\d{1}|mytable_\d{2}) yang setara untuk menghindari penggunaan koma.

  • Kontrol konkurensi

    Konektor MySQL mendukung pembacaan konkuren data lengkap untuk meningkatkan efisiensi pemuatan data. Dikombinasikan dengan fitur auto-tuning Autopilot di konsol Realtime Compute Flink, secara otomatis menskala turun selama fase inkremental setelah pembacaan data lengkap konkuren, menghemat sumber daya komputasi.

    Di Konsol pengembangan Realtime Compute, Anda dapat mengatur tingkat paralelisme pekerjaan dalam mode dasar atau ahli. Perbedaan dalam pengaturan konkurensi adalah sebagai berikut:

    • Konkurensi yang diatur dalam mode dasar berlaku secara global untuk pekerjaan.基础模式

    • Dalam mode ahli, Anda dapat mengatur tingkat paralelisme untuk vertex tertentu.vertex并发

    Untuk informasi selengkapnya tentang konfigurasi sumber daya, lihat Konfigurasi informasi penerapan pekerjaan.

    Penting

    Terlepas dari modusnya, saat mengatur konkurensi, rentang server-id yang dideklarasikan dalam tabel harus lebih besar dari atau sama dengan tingkat paralelisme pekerjaan. Misalnya, jika rentang server-id adalah 5404-5412, terdapat 8 ID server unik, sehingga pekerjaan dapat memiliki maksimal 8 tugas konkuren. Pekerjaan berbeda yang mengakses instans MySQL yang sama harus memiliki rentang server-id yang tidak tumpang tindih; setiap pekerjaan harus secara eksplisit mengonfigurasi server-id unik.

  • Autopilot Autoscaling

    Fase data lengkap mengakumulasi banyak data historis. Untuk meningkatkan efisiensi pembacaan, biasanya digunakan pembacaan konkuren. Dalam fase inkremental log biner, karena volume data log biner kecil dan untuk memastikan urutan global, pembacaan single-threaded biasanya cukup. Persyaratan sumber daya yang berbeda untuk fase lengkap dan inkremental dapat diseimbangkan secara otomatis oleh fitur auto-tuning.

    Auto-tuning memantau trafik setiap tugas di Sumber CDC MySQL. Saat pekerjaan memasuki fase log biner, jika hanya satu tugas yang bertanggung jawab atas pembacaan log biner dan tugas lainnya idle, auto-tuning secara otomatis mengurangi jumlah CU dan konkurensi Sumber. Untuk mengaktifkan auto-tuning, atur mode auto-tuning ke Active di halaman operasi pekerjaan.

    Catatan

    Interval pemicu minimum untuk mengurangi konkurensi adalah 24 jam secara default. Untuk informasi selengkapnya tentang parameter dan detail auto-tuning, lihat Konfigurasi auto-tuning.

  • Mode startup

    Gunakan item konfigurasi scan.startup.mode untuk menentukan mode startup untuk tabel sumber CDC MySQL. Nilai yang valid:

    • initial (default): Saat startup pertama, melakukan pembacaan lengkap tabel database, lalu beralih ke pembacaan log biner inkremental.

    • earliest-offset: Melewati fase snapshot dan mulai membaca dari offset log biner paling awal yang tersedia.

    • latest-offset: Melewati fase snapshot dan mulai membaca dari akhir log biner. Dalam mode ini, tabel sumber hanya membaca perubahan data yang terjadi setelah pekerjaan dimulai.

    • specific-offset: Melewati fase snapshot dan mulai membaca dari offset log biner tertentu. Offset dapat ditentukan berdasarkan nama file log biner dan posisi, atau berdasarkan set GTID.

    • timestamp: Melewati fase snapshot dan mulai membaca event log biner dari timestamp tertentu.

    Contoh penggunaan:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- Mulai dari offset paling awal.
        'scan.startup.mode' = 'latest-offset', -- Mulai dari offset terbaru.
        'scan.startup.mode' = 'specific-offset', -- Mulai dari offset tertentu.
        'scan.startup.mode' = 'timestamp', -- Mulai dari timestamp tertentu.
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Tentukan nama file log biner untuk mode specific-offset.
        'scan.startup.specific-offset.pos' = '4', -- Tentukan posisi log biner untuk mode specific-offset.
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Tentukan set GTID untuk mode specific-offset.
        'scan.startup.timestamp-millis' = '1667232000000' -- Tentukan timestamp startup untuk mode timestamp.
        ...
    )
    Penting
    • Sumber MySQL mencetak offset saat ini pada waktu checkpoint dengan logging level INFO. Awalan log adalah Binlog offset on checkpoint {checkpoint-id}. Log ini membantu Anda memulai pekerjaan dari offset checkpoint tertentu.

    • Jika tabel yang dibaca telah mengalami perubahan skema, memulai dari mode earliest-offset, specific-offset, atau timestamp dapat menyebabkan kesalahan. Hal ini karena pembaca Debezium secara internal mempertahankan skema tabel terbaru, dan data sebelumnya dengan skema yang tidak cocok tidak dapat diurai dengan benar.

  • Tentang tabel sumber CDC tanpa kunci

    • Menggunakan tabel tanpa kunci memerlukan pengaturan scan.incremental.snapshot.chunk.key-column, dan kolom yang dipilih harus non-null.

    • Semantik pemrosesan tabel sumber CDC tanpa kunci ditentukan oleh perilaku kolom yang ditentukan dalam scan.incremental.snapshot.chunk.key-column:

      • Jika kolom yang ditentukan tidak diperbarui, semantik tepat-sekali dijamin.

      • Jika kolom yang ditentukan diperbarui, hanya semantik at-least-once yang dijamin. Namun, Anda dapat memastikan kebenaran data dengan menentukan primary key di sistem downstream dan menggunakan operasi idempoten.

  • Membaca log cadangan Alibaba Cloud ApsaraDB RDS for MySQL

    Tabel sumber CDC MySQL mendukung pembacaan log cadangan dari Alibaba Cloud ApsaraDB RDS for MySQL. Ini berguna ketika fase data lengkap memakan waktu lama, file log biner lokal secara otomatis dibersihkan, tetapi file cadangan yang diunggah secara otomatis atau manual masih ada.

    Contoh penggunaan:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • Mengaktifkan penggunaan ulang Sumber CDC

    Dalam satu pekerjaan, beberapa tabel sumber CDC MySQL memulai beberapa klien Binlog. Saat semua tabel sumber berada dalam instans yang sama, hal ini meningkatkan beban pada database. Untuk informasi selengkapnya, lihat FAQ CDC MySQL.

    Solusi

    VVR 8.0.7 dan seterusnya mendukung penggunaan ulang Sumber CDC MySQL. Penggunaan ulang menggabungkan tabel sumber CDC MySQL yang dapat digabungkan. Penggabungan terjadi saat konfigurasi tabel sumber identik, kecuali untuk nama database, nama tabel, dan server-id. Mesin secara otomatis menggabungkan sumber CDC MySQL dalam pekerjaan yang sama.

    Prosedur

    1. Gunakan perintah SET dalam pekerjaan SQL Anda:

      SET 'table.optimizer.source-merge.enabled' = 'true';
      
      # (VVR 8.0.8 dan 8.0.9) Tambahan atur ini:
      SET 'sql-gateway.exec-plan.enabled' = 'false';
      VVR 11.1 dan seterusnya memiliki penggunaan ulang diaktifkan secara default.
    2. Jalankan pekerjaan tanpa status. Memodifikasi konfigurasi penggunaan ulang Sumber mengubah topologi pekerjaan. Anda harus menjalankan pekerjaan tanpa status. Jika tidak, pekerjaan mungkin gagal dimulai atau kehilangan data. Jika Sumber digabung, Anda akan melihat node MergetableSourceScan.

    Penting
    • Setelah mengaktifkan penggunaan ulang, kami merekomendasikan untuk tidak menonaktifkan operator chaining. Mengatur pipeline.operator-chaining ke false meningkatkan overhead serialisasi dan deserialisasi data. Semakin banyak Sumber yang digabung, semakin besar overhead-nya.

    • Di VVR 8.0.7, menonaktifkan operator chaining menyebabkan masalah serialisasi.

Mempercepat pembacaan log biner

Saat konektor MySQL digunakan sebagai tabel sumber atau sumber ingesti data, konektor tersebut mengurai file log biner untuk menghasilkan berbagai pesan perubahan selama fase inkremental. File log biner mencatat semua perubahan tabel dalam format biner. Percepat penguraian file log biner menggunakan metode berikut:

  • Aktifkan konfigurasi filter penguraian

    • Gunakan item konfigurasi scan.only.deserialize.captured.tables.changelog.enabled: Hanya uraikan event perubahan untuk tabel yang ditentukan.

  • Optimalkan parameter Debezium

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: Jumlah maksimum catatan yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, event ditempatkan ke antrian pemblokiran sebelum ditulis ke downstream. Nilai default adalah 8192.

    • debezium.max.batch.size: Jumlah maksimum event yang diproses konektor dalam setiap iterasi. Nilai default adalah 2048.

    • debezium.poll.interval.ms: Jumlah milidetik yang ditunggu konektor sebelum meminta event perubahan baru. Nilai default adalah 1000 milidetik (1 detik).

Contoh penggunaan:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Konfigurasi Debezium
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Aktifkan filter penguraian
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- Hanya uraikan event perubahan untuk tabel yang ditentukan.
    ...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Konfigurasi Debezium
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Aktifkan filter penguraian
  scan.only.deserialize.captured.tables.changelog.enabled: true

Versi enterprise CDC MySQL memiliki kapasitas konsumsi log biner 85 MB/detik, kira-kira dua kali lipat versi komunitas open-source. Jika laju pembuatan log biner melebihi 85 MB/detik (yaitu, file 512 MB setiap 6 detik), latensi pekerjaan Flink terus meningkat. Latensi secara bertahap menurun saat laju pembuatan log biner turun. Jika file log biner berisi transaksi besar, latensi pemrosesan mungkin meningkat secara singkat, lalu menurun setelah log transaksi diproses.

API DataStream CDC MySQL

Penting

Untuk membaca dan menulis data menggunakan DataStream, gunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk petunjuk pengaturan konektor DataStream, lihat Cara menggunakan konektor DataStream.

Buat program API DataStream dan gunakan MySqlSource. Berikut ini contoh kode dan dependensi pom:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set database yang ditangkap
        .tableList("yourDatabaseName.yourTableName") // set tabel yang ditangkap
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // mengonversi SourceRecord ke String JSON
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // aktifkan checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 tugas sumber paralel
      .setParallelism(4)
      .print().setParallelism(1); // gunakan paralelisme 1 untuk sink agar menjaga urutan pesan
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

Saat membuat MySqlSource, Anda harus menentukan parameter berikut:

Parameter

Deskripsi

hostname

Alamat IP atau hostname database MySQL.

port

Nomor port layanan database MySQL.

databaseList

Nama database MySQL.

Catatan

Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Anda dapat menggunakan .* untuk mencocokkan semua database.

username

Nama pengguna untuk layanan database MySQL.

password

Kata sandi untuk layanan database MySQL.

deserializer

Deserializer mengonversi catatan tipe SourceRecord ke tipe yang ditentukan. Nilai yang valid:

  • RowDataDebeziumDeserializeSchema: Mengonversi SourceRecord ke struktur data internal RowData Flink Table atau SQL.

  • JsonDebeziumDeserializationSchema: Mengonversi SourceRecord ke String berformat JSON.

Dependensi pom harus menentukan parameter berikut:

${vvr.version}

Versi mesin Alibaba Cloud Realtime Compute for Apache Flink, misalnya: 1.17-vvr-8.0.4-3.

Catatan

Merujuk pada versi yang ditampilkan di Maven, karena versi hotfix mungkin dirilis secara berkala tanpa pemberitahuan lainnya.

${flink.version}

Versi Apache Flink, misalnya: 1.17.2.

Penting

Gunakan versi Apache Flink yang sesuai dengan versi mesin Alibaba Cloud Realtime Compute for Apache Flink Anda untuk menghindari masalah ketidakcocokan selama eksekusi pekerjaan. Untuk pemetaan versi, lihat Mesin.

FAQ

Untuk masalah umum yang dihadapi saat menggunakan tabel sumber CDC, lihat Masalah CDC.