全部产品
Search
文档中心

Realtime Compute for Apache Flink:MySQL

更新时间:Feb 13, 2026

Topik ini menjelaskan cara menggunakan konektor MySQL.

Informasi latar belakang

Konektor MySQL mendukung semua database yang kompatibel dengan protokol MySQL, seperti RDS MySQL, PolarDB for MySQL, OceanBase (mode MySQL), dan MySQL yang dikelola sendiri.

Penting

Saat menggunakan konektor MySQL untuk membaca dari OceanBase, pastikan binary logging (binlog) OceanBase diaktifkan dan dikonfigurasi dengan benar. Untuk informasi selengkapnya, lihat Operasi binary logging. Fitur ini berada dalam pratinjau publik. Kami menyarankan agar Anda mengevaluasinya secara menyeluruh dan menggunakannya dengan hati-hati.

Tabel berikut menjelaskan dukungan untuk konektor MySQL.

Kategori

Detail

Jenis yang didukung

Tabel sumber, tabel dimensi, tabel sink, dan sumber data ingestion

Mode runtime

Hanya mode streaming

Format data

Tidak berlaku

Metrik pemantauan spesifik

Metrik pemantauan

  • Tabel sumber

    • currentFetchEventTimeLag: Interval waktu antara saat data dihasilkan dan saat data ditarik oleh Operator Sumber.

      Metrik ini hanya berlaku selama fase binlog. Nilainya selalu 0 selama fase snapshot.

    • currentEmitEventTimeLag: Interval waktu antara saat data dihasilkan dan saat data meninggalkan Operator Sumber.

      Metrik ini hanya berlaku selama fase binlog. Nilainya selalu 0 selama fase snapshot.

    • sourceIdleTime: Berapa lama tabel sumber tidak menghasilkan data baru.

  • Tabel dimensi dan tabel sink: Tidak ada.

Catatan

Untuk informasi selengkapnya tentang metrik ini, lihat Metrik pemantauan.

Jenis API

DataStream, SQL, dan YAML data ingestion

Dukungan untuk memperbarui atau menghapus data tabel sink

Ya

Fitur

Tabel sumber MySQL Change Data Capture (CDC) pertama-tama membaca seluruh data historis database, lalu beralih secara mulus ke pembacaan event binary log (binlog). Proses ini menjamin semantik tepat-sekali (exactly-once semantics), artinya tidak ada data yang terlewat atau diduplikasi, bahkan jika terjadi kegagalan. Tabel sumber CDC MySQL mendukung pembacaan data lengkap secara konkuren serta menerapkan pembacaan tanpa penguncian (lock-free) dan transfer data yang dapat dilanjutkan (resumable) menggunakan algoritma snapshot inkremental. Untuk informasi selengkapnya, lihat Tentang tabel sumber MySQL CDC.

  • Pemrosesan batch dan streaming terpadu: Membaca data lengkap dan inkremental tanpa perlu memelihara pipeline terpisah.

  • Pembacaan data lengkap secara konkuren: Meningkatkan kinerja secara horizontal.

  • Peralihan mulus dari pembacaan lengkap ke inkremental: Secara otomatis mengurangi skala untuk menghemat sumber daya komputasi.

  • Transfer data yang dapat dilanjutkan: Mendukung kelanjutan transfer data selama pembacaan data lengkap guna meningkatkan stabilitas.

  • Pembacaan tanpa penguncian: Membaca data lengkap tanpa mengganggu operasi bisnis online.

  • Pembacaan log cadangan: Mendukung pembacaan log cadangan dari RDS MySQL.

  • Penguraian binlog paralel: Mengurangi latensi baca dengan mengurai file binlog secara paralel.

Prasyarat

Sebelum menggunakan tabel sumber MySQL CDC, Anda harus mengonfigurasi database MySQL seperti yang dijelaskan dalam Konfigurasi MySQL. Konfigurasi berikut wajib dilakukan.

RDS MySQL

  • Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.

  • Aktifkan binary logging (binlog). Ini diaktifkan secara default.

  • Atur format binlog ke ROW. Ini adalah format default.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Pengguna MySQL telah dibuat dengan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk RDS MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna menghindari kegagalan operasional akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk RDS MySQL.

PolarDB for MySQL

  • Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.

  • Aktifkan binary logging (binlog). Ini dinonaktifkan secara default.

  • Atur format binlog ke ROW. Ini adalah format default.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Anda telah membuat pengguna MySQL dan memberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk PolarDB for MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna menghindari kegagalan operasional akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk PolarDB for MySQL.

Self-managed MySQL

  • Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.

  • Aktifkan binary logging (binlog). Ini dinonaktifkan secara default.

  • Atur format binlog ke ROW. Format default adalah STATEMENT.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Pengguna MySQL telah dibuat dan diberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk instans MySQL yang dikelola sendiri. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna menghindari kegagalan operasional akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans MySQL yang dikelola sendiri.

Batasan

Batasan umum

  • Tabel sumber MySQL CDC tidak mendukung pendefinisian watermark.

  • Pada pekerjaan CREATE TABLE AS SELECT (CTAS) dan CREATE DATABASE AS SELECT (CDAS), tabel sumber MySQL CDC dapat menyinkronkan perubahan skema parsial. Untuk informasi selengkapnya tentang jenis perubahan yang didukung, lihat Kebijakan sinkronisasi evolusi skema.

  • Konektor MySQL CDC tidak mendukung Binary Log Transaction Compression. Oleh karena itu, saat menggunakan konektor MySQL CDC untuk mengonsumsi data inkremental, Anda harus memastikan fitur ini dinonaktifkan. Jika tidak, data inkremental mungkin gagal diambil.

RDS MySQL batasan

  • Kami tidak menyarankan membaca data dari database secondary atau replika read-only untuk RDS MySQL. Secara default, periode retensi binlog untuk instans ini singkat. Jika binlog kedaluwarsa dan dihapus, pekerjaan mungkin gagal mengonsumsi data binlog dan kemudian melaporkan error.

  • RDS MySQL secara default mengaktifkan sinkronisasi paralel antara database primary dan secondary, tetapi tidak menjamin konsistensi urutan transaksi. Hal ini dapat menyebabkan beberapa data terlewat selama alih bencana primary/secondary dan pemulihan checkpoint. Untuk mengatasi masalah ini, Anda dapat mengaktifkan opsi slave_preserve_commit_order secara manual di RDS MySQL.

PolarDB for MySQL batasan

Tabel sumber MySQL CDC tidak mendukung pembacaan dari Kluster Multi-master PolarDB for MySQL versi 1.0.19 atau lebih lama. Untuk informasi selengkapnya, lihat Apa itu Kluster Multi-master?. Binlog yang dihasilkan oleh kluster ini mungkin berisi ID tabel duplikat. Hal ini dapat menyebabkan kesalahan pemetaan skema pada tabel sumber CDC dan mengakibatkan kesalahan penguraian binlog.

Open-source MySQL batasan

Secara default, MySQL mempertahankan urutan transaksi selama replikasi binlog primary-replika. Jika replika MySQL mengaktifkan replikasi paralel (slave_parallel_workers > 1) tetapi tidak memiliki slave_preserve_commit_order=ON, urutan commit transaksinya mungkin berbeda dari database primary. Saat Flink CDC pulih dari checkpoint, data mungkin terlewat karena ketidakkonsistenan urutan ini. Kami menyarankan agar Anda mengatur slave_preserve_commit_order = ON pada replika MySQL atau mengatur slave_parallel_workers = 1. Perlu diperhatikan bahwa mengatur slave_parallel_workers ke 1 dapat mengurangi kinerja replikasi.

Catatan

  • Konfigurasikan secara eksplisit Server ID yang berbeda untuk setiap sumber data MySQL CDC

    Tujuan Server ID

    Anda harus mengonfigurasikan secara eksplisit Server ID yang berbeda untuk setiap sumber data MySQL CDC. Jika beberapa sumber data MySQL CDC berbagi Server ID yang sama dan tidak dapat menggunakan kembali koneksi, offset binlog akan menjadi tidak teratur. Hal ini mengakibatkan pembacaan berlebih atau kurang.

    Konfigurasi Server ID dalam berbagai skenario

    Anda dapat menentukan Server ID dalam pernyataan DDL, tetapi kami menyarankan agar Anda mengonfigurasinya menggunakan dynamic hints daripada dalam opsi DDL.

    • Paralelisme = 1 atau snapshot inkremental dinonaktifkan

      ## Tentukan Server ID spesifik saat framework snapshot inkremental tidak diaktifkan atau paralelisme adalah 1.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • Paralelisme > 1 dan snapshot inkremental diaktifkan

      ## Tentukan rentang Server ID. Pastikan jumlah Server ID yang tersedia dalam rentang tersebut minimal sama dengan tingkat paralelisme. Misalnya, paralelisme adalah 3.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • Sinkronisasi data dengan CTAS

      Saat Anda menggunakan CTAS untuk sinkronisasi data dan sumber data CDC memiliki konfigurasi identik, sumber data akan digunakan kembali secara otomatis. Dalam kasus ini, Anda dapat mengonfigurasi Server ID yang sama untuk beberapa sumber data CDC. Untuk informasi selengkapnya, lihat Contoh 4: Beberapa pernyataan CTAS.

    • Beberapa tabel sumber non-CTAS yang tidak dapat digunakan kembali

      Jika pekerjaan berisi beberapa tabel sumber MySQL CDC dan tidak menggunakan pernyataan CTAS untuk sinkronisasi, sumber data tidak dapat digunakan kembali. Dalam kasus ini, Anda harus memberikan Server ID yang berbeda untuk setiap tabel sumber CDC. Demikian pula, jika framework snapshot inkremental diaktifkan dan paralelisme lebih besar dari 1, Anda harus menentukan rentang Server ID.

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;
  • Tabel sink

    • Jangan deklarasikan primary key auto-increment dalam pernyataan DDL. MySQL akan mengisi bidang ini secara otomatis saat menulis data.

    • Anda harus mendeklarasikan setidaknya satu bidang non-primary key. Jika tidak, akan terjadi error.

    • NOT ENFORCED dalam pernyataan DDL menunjukkan bahwa Flink tidak menegakkan pemeriksaan validitas pada primary key. Anda harus memastikan kebenaran dan integritas primary key. Untuk informasi selengkapnya, lihat Pemeriksaan Validitas.

  • Tabel dimensi

    Untuk menggunakan indeks guna mempercepat kueri, urutan bidang dalam klausa JOIN harus sesuai dengan urutan definisi indeks. Ini dikenal sebagai aturan prefiks paling kiri (leftmost prefix rule). Misalnya, jika indeks berada pada (a, b, c), kondisi JOIN harus berupa ON t.a = x AND t.b = y.

    SQL yang dihasilkan Flink mungkin ditulis ulang oleh pengoptimal, sehingga mencegah penggunaan indeks selama kueri database aktual. Untuk memverifikasi apakah indeks digunakan, Anda dapat memeriksa rencana eksekusi (EXPLAIN) atau log kueri lambat di MySQL untuk melihat pernyataan SELECT aktual yang dieksekusi.

SQL

Anda dapat menggunakan konektor MySQL dalam pekerjaan SQL sebagai tabel sumber, tabel dimensi, atau tabel sink.

Sintaks

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

Catatan
  • Cara konektor menulis ke tabel sink: Untuk setiap record yang diterima, konektor membuat dan mengeksekusi satu pernyataan SQL. Pernyataan tepatnya bergantung pada struktur tabel:

    • Untuk tabel sink tanpa primary key, sistem membuat dan mengeksekusi pernyataan SQL berikut: INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);

    • Untuk tabel hasil dengan primary key, sistem mengeksekusi pernyataan SQL berikut: INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; Catatan: Jika tabel fisik memiliki kendala indeks unik selain primary key, memasukkan dua record dengan primary key berbeda tetapi nilai identik pada kolom yang dicakup oleh indeks unik menyebabkan konflik indeks unik. Konflik ini memicu penimpaan data, yang mengakibatkan kehilangan data dalam output.

  • Jika Anda mendefinisikan primary key auto-increment di database MySQL, jangan deklarasikan kolom auto-increment dalam pernyataan DDL Flink. Database akan mengisi bidang ini secara otomatis selama penyisipan data. Konektor mendukung penulisan dan penghapusan data dengan kolom auto-increment tetapi tidak mendukung pembaruan.

DENGAN parameter

  • Umum

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    connector

    Jenis tabel.

    Ya

    STRING

    Tidak ada

    Saat digunakan sebagai tabel sumber, atur opsi ini ke mysql-cdc atau mysql. Keduanya setara. Saat digunakan sebagai tabel dimensi atau sink, atur opsi ini ke mysql.

    hostname

    Alamat IP atau hostname database MySQL.

    Ya

    STRING

    Tidak ada

    Kami menyarankan memasukkan alamat VPC.

    Catatan

    Jika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, buat koneksi jaringan cross-VPC atau gunakan Internet untuk akses. Untuk informasi selengkapnya, lihat Kelola dan operasikan ruang kerja dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.

    username

    Username untuk layanan database MySQL.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    password

    Password untuk layanan database MySQL.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    database-name

    Nama database MySQL.

    Ya

    STRING

    Tidak ada

    • Saat digunakan sebagai tabel sumber, opsi ini mendukung ekspresi reguler untuk membaca data dari beberapa database.

    • Saat menggunakan ekspresi reguler, hindari penggunaan ^ dan $ untuk mencocokkan awal dan akhir string. Lihat kolom Keterangan untuk table-name untuk detailnya.

    table-name

    Nama tabel MySQL.

    Ya

    STRING

    Tidak ada

    • Saat digunakan sebagai tabel sumber, opsi ini mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

      Saat membaca data dari beberapa tabel MySQL, kirimkan beberapa pernyataan CTAS sebagai satu pekerjaan tunggal. Hal ini menghindari pengaktifan beberapa pendengar binlog dan meningkatkan kinerja serta efisiensi. Untuk informasi selengkapnya, lihat Beberapa pernyataan CTAS: Kirimkan sebagai satu pekerjaan tunggal.

    • Saat menggunakan ekspresi reguler, hindari penggunaan ^ dan $ untuk mencocokkan awal dan akhir string. Lihat catatan di bawah untuk detailnya.

    Catatan

    Saat tabel sumber MySQL CDC mencocokkan nama tabel, sistem menggabungkan database-name dan table-name yang Anda tentukan menjadi ekspresi reguler jalur lengkap menggunakan string \\. (Dalam versi VVR sebelum 8.0.1, karakter . digunakan.) Sistem kemudian menggunakan ekspresi reguler ini untuk mencocokkan nama tabel yang memenuhi syarat penuh di database MySQL.

    Misalnya, jika Anda mengatur 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ dalam versi sebelum 8.0.1) untuk mencocokkan nama tabel yang memenuhi syarat penuh dan menentukan tabel mana yang akan dibaca.

    port

    Nomor port layanan database MySQL.

    Tidak

    INTEGER

    3306

    Tidak ada.

  • Khusus sumber

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    server-id

    ID numerik untuk klien database.

    Tidak

    STRING

    Nilai acak antara 5400 dan 6400 dihasilkan.

    ID ini harus unik secara global dalam kluster MySQL. Kami menyarankan memberikan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

    Opsi ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren didukung. Dalam kasus ini, kami menyarankan menentukan rentang ID agar setiap pembaca konkuren menggunakan ID berbeda. Untuk informasi selengkapnya, lihat Menggunakan server ID.

    scan.incremental.snapshot.enabled

    Menentukan apakah snapshot inkremental diaktifkan.

    Tidak

    BOOLEAN

    true

    Snapshot inkremental diaktifkan secara default. Snapshot inkremental adalah mekanisme baru untuk membaca snapshot data lengkap. Dibandingkan dengan metode snapshot lama, snapshot inkremental menawarkan beberapa keunggulan:

    • Pembacaan data lengkap dapat dilakukan secara paralel.

    • Pembacaan data lengkap mendukung checkpoint tingkat chunk.

    • Pembacaan data lengkap tidak memerlukan penguncian baca global (FLUSH TABLES WITH READ LOCK).

    Jika Anda ingin sumber mendukung pembacaan konkuren, setiap pembaca konkuren memerlukan server ID unik. Oleh karena itu, server-id harus berupa rentang seperti 5400-6400, dan rentang tersebut harus minimal sebesar tingkat paralelisme.

    Catatan

    Item konfigurasi ini dihapus di Ververica Runtime (VVR) 11.1 dan versi selanjutnya.

    scan.incremental.snapshot.chunk.size

    Ukuran setiap chunk dalam baris.

    Tidak

    INTEGER

    8096

    Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data chunk disimpan dalam memori hingga sepenuhnya dibaca.

    Semakin sedikit baris dalam setiap chunk, semakin banyak total chunk dalam tabel. Meskipun hal ini mengurangi granularitas pemulihan kesalahan, hal ini dapat menyebabkan error Out Of Memory (OOM) dan penurunan throughput keseluruhan. Oleh karena itu, Anda perlu membuat pertimbangan dan mengatur ukuran chunk yang wajar.

    scan.snapshot.fetch.size

    Jumlah maksimum record yang diambil sekaligus saat membaca data tabel lengkap.

    Tidak

    INTEGER

    1024

    Tidak ada.

    scan.startup.mode

    Mode startup untuk konsumsi data.

    Tidak

    STRING

    initial

    Nilai yang valid:

    • initial (default): Memindai data historis lengkap terlebih dahulu, lalu membaca data binlog terbaru saat startup pertama kali.

    • latest-offset: Tidak memindai data historis saat startup pertama kali. Mulai membaca dari akhir binlog, artinya hanya membaca perubahan terbaru setelah konektor dimulai.

    • earliest-offset: Tidak memindai data historis. Mulai membaca dari offset binlog paling awal yang tersedia.

    • specific-offset: Tidak memindai data historis. Mulai dari offset binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengonfigurasi scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.

    • timestamp: Tidak memindai data historis. Mulai membaca event binlog dari timestamp tertentu. Tentukan timestamp menggunakan scan.startup.timestamp-millis, dalam milidetik.

    Penting

    Saat menggunakan earliest-offset, specific-offset, atau timestamp, pastikan skema tabel tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan untuk menghindari error akibat ketidakcocokan skema.

    scan.startup.specific-offset.file

    Nama file binlog untuk offset awal saat menggunakan mode startup offset spesifik.

    Tidak

    STRING

    Tidak ada

    Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. Contoh nama file: mysql-bin.000003.

    scan.startup.specific-offset.pos

    Offset dalam file binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset spesifik.

    Tidak

    INTEGER

    Tidak ada

    Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset.

    scan.startup.specific-offset.gtid-set

    Set GTID untuk offset awal saat menggunakan mode startup offset spesifik.

    Tidak

    STRING

    Tidak ada

    Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. Contoh set GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    Offset awal sebagai timestamp milidetik saat menggunakan mode startup timestamp.

    Tidak

    LONG

    Tidak ada

    Saat menggunakan konfigurasi ini, atur scan.startup.mode ke timestamp. Timestamp dalam milidetik.

    Penting

    Saat menggunakan timestamp, MySQL CDC mencoba membaca event awal setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai. Pastikan file binlog untuk timestamp yang ditentukan belum dihapus dari database dan masih dapat dibaca.

    server-time-zone

    Zona waktu sesi yang digunakan oleh database.

    Tidak

    STRING

    Jika Anda tidak menentukan opsi ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database—zona waktu wilayah yang Anda pilih.

    Contoh: Asia/Shanghai. Opsi ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.

    debezium.min.row.count.to.stream.results

    Gunakan mode pembacaan batch saat jumlah baris dalam tabel melebihi nilai ini.

    Tidak

    INTEGER

    1000

    Flink membaca data dari tabel sumber MySQL dengan cara berikut:

    • Baca lengkap: Memuat seluruh data tabel langsung ke memori. Ini cepat tetapi mengonsumsi memori sebanding dengan volume data. Jika tabel sumber sangat besar, hal ini dapat menyebabkan masalah OOM.

    • Baca batch: Membaca data secara batch, mengambil jumlah baris tetap per batch hingga semua data terbaca. Ini menghindari risiko OOM untuk tabel besar tetapi lebih lambat.

    connect.timeout

    Waktu maksimum menunggu koneksi ke server database MySQL hingga timeout sebelum mencoba lagi.

    Tidak

    DURATION

    30s

    Tidak ada.

    connect.max-retries

    Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.

    Tidak

    INTEGER

    3

    Tidak ada.

    connection.pool.size

    Ukuran kolam koneksi database.

    Tidak

    INTEGER

    20

    Kolam koneksi database menggunakan kembali koneksi untuk mengurangi jumlah koneksi database.

    jdbc.properties.*

    Opsi koneksi kustom dalam URL JDBC.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan opsi koneksi kustom. Misalnya, untuk menonaktifkan SSL, atur 'jdbc.properties.useSSL' = 'false'.

    Untuk informasi selengkapnya tentang opsi koneksi yang didukung, lihat MySQL Configuration Properties.

    debezium.*

    Opsi Debezium kustom untuk membaca binlog.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan opsi Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan cara menangani error penguraian.

    heartbeat.interval

    Interval di mana sumber menggunakan event heartbeat untuk memajukan offset binlog.

    Tidak

    DURATION

    30s

    Event heartbeat memajukan offset binlog di sumber. Ini berguna untuk tabel MySQL yang diperbarui secara lambat. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat mendorong offset binlog maju, mencegah masalah di mana offset binlog yang kedaluwarsa menyebabkan kegagalan pekerjaan dan memerlukan restart tanpa status.

    scan.incremental.snapshot.chunk.key-column

    Kolom yang digunakan untuk membagi chunk selama fase snapshot.

    Lihat Keterangan.

    STRING

    Tidak ada

    • Wajib untuk tabel tanpa primary key. Kolom yang dipilih harus non-null (NOT NULL).

    • Opsional untuk tabel dengan primary key. Anda hanya dapat memilih satu kolom dari primary key.

    rds.region-id

    ID wilayah instans Alibaba Cloud RDS MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk ID wilayah, lihat Wilayah dan zona.

    rds.access-key-id

    ID AccessKey untuk akun Alibaba Cloud RDS MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.

    Penting

    Untuk mencegah kebocoran informasi AccessKey Anda, kelola ID AccessKey Anda menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Manajemen variabel.

    rds.access-key-secret

    Rahasia AccessKey untuk akun Alibaba Cloud RDS MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.

    Penting

    Untuk mencegah kebocoran informasi AccessKey Anda, kelola rahasia AccessKey Anda menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Manajemen variabel.

    rds.db-instance-id

    ID instans Alibaba Cloud RDS MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    Tidak ada

    Tidak ada.

    rds.main-db-id

    ID database utama instans Alibaba Cloud RDS MySQL.

    Tidak

    STRING

    Tidak ada

    • Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Cadangan log RDS MySQL.

    • Hanya didukung di VVR 8.0.7 dan versi selanjutnya.

    rds.download.timeout

    Timeout untuk mengunduh satu log arsip dari OSS.

    Tidak

    DURATION

    60s

    Tidak ada.

    rds.endpoint

    Titik akhir layanan untuk mengambil informasi binlog OSS.

    Tidak

    STRING

    Tidak ada

    • Untuk nilai yang valid, lihat Titik akhir layanan.

    • Hanya didukung di VVR 8.0.8 dan versi selanjutnya.

    scan.incremental.close-idle-reader.enabled

    Menentukan apakah pembaca idle ditutup setelah fase snapshot berakhir.

    Tidak

    BOOLEAN

    false

    • Hanya didukung di VVR 8.0.1 dan versi selanjutnya.

    • Konfigurasi ini hanya berlaku saat execution.checkpointing.checkpoints-after-tasks-finish.enabled diatur ke true.

    scan.read-changelog-as-append-only.enabled

    Menentukan apakah aliran changelog diubah menjadi aliran append-only.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengubah semua jenis pesan (termasuk INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) menjadi pesan INSERT. Aktifkan hanya dalam kasus khusus, seperti mempertahankan pesan penghapusan tabel upstream.

    • false (default): Semua jenis pesan diteruskan tanpa perubahan.

    Catatan

    Hanya didukung di VVR 8.0.8 dan versi selanjutnya.

    scan.only.deserialize.captured.tables.changelog.enabled

    Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.

    Tidak

    BOOLEAN

    • Nilai default adalah false di VVR 8.x.

    • Nilai default adalah true di VVR 11.1 dan versi selanjutnya.

    Nilai yang valid:

    • true: Hanya mendeserialisasi data perubahan untuk tabel target guna mempercepat pembacaan binlog.

    • false (default): Mendeserialisasi data perubahan untuk semua tabel.

    Catatan
    • Hanya didukung di VVR 8.0.7 dan versi selanjutnya.

    • Di VVR 8.0.8 dan versi sebelumnya, ubah nama parameter ini menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    Selama fase inkremental, menentukan apakah mencoba mengurai event DDL lockless RDS.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengurai event DDL lockless RDS.

    • false (default): Tidak mengurai event DDL lockless RDS.

    Ini adalah fitur eksperimental. Sebelum melakukan perubahan lockless online, ambil snapshot pekerjaan Flink untuk pemulihan.

    Catatan

    Hanya didukung di VVR 11.1 dan versi selanjutnya.

    scan.incremental.snapshot.backfill.skip

    Menentukan apakah melewati backfill selama fase pembacaan snapshot.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Melewati backfill selama fase pembacaan snapshot.

    • false (default): Tidak melewati backfill selama fase pembacaan snapshot.

    Jika backfill dilewati, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

    Penting

    Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.

    Catatan

    Hanya didukung di VVR 11.1 dan versi selanjutnya.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Tidak

    BOOELEAN

    false

    Nilai yang valid:

    • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    • false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Ini adalah fitur eksperimental. Mengaktifkannya mengurangi risiko error OOM saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan menambahkan ini sebelum startup pertama pekerjaan.

    Catatan

    Hanya didukung di VVR 11.1 dan versi selanjutnya.

    binlog.session.network.timeout

    Timeout jaringan untuk operasi baca/tulis koneksi binlog.

    Tidak

    DURATION

    10m

    Mengatur ini ke 0s menggunakan timeout default server MySQL.

    Catatan

    Hanya didukung di VVR 11.5 dan versi selanjutnya.

    scan.rate-limit.records-per-second

    Membatasi jumlah maksimum record yang dipancarkan per detik oleh sumber.

    Tidak

    LONG

    Tidak ada

    Berguna untuk membatasi pembacaan data. Batasan ini berlaku untuk fase lengkap dan inkremental.

    Metrik numRecordsOutPerSecond mencerminkan jumlah record yang dipancarkan per detik di seluruh aliran data. Sesuaikan parameter ini berdasarkan metrik tersebut.

    Selama pembacaan lengkap, kurangi jumlah baris per batch dengan menurunkan nilai scan.incremental.snapshot.chunk.size.

    Catatan

    Hanya didukung di VVR 11.5 dan versi selanjutnya.

  • Khusus tabel dimensi

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    url

    URL JDBC MySQL.

    Tidak

    STRING

    Tidak ada

    Format URL adalah jdbc:mysql://<endpoint>:<port>/<database_name>.

    lookup.max-retries

    Jumlah maksimum percobaan ulang setelah pembacaan data gagal.

    Tidak

    INTEGER

    3

    Hanya didukung di VVR 6.0.7 dan versi selanjutnya.

    lookup.cache.strategy

    Kebijakan cache.

    Tidak

    STRING

    Tidak ada

    Mendukung tiga kebijakan cache: None, LRU, dan ALL. Untuk informasi selengkapnya, lihat Informasi latar belakang.

    Catatan

    Saat menggunakan kebijakan cache LRU, Anda juga harus mengonfigurasi opsi lookup.cache.max-rows.

    lookup.cache.max-rows

    Jumlah maksimum baris yang dicache.

    Tidak

    INTEGER

    100000

    • Jika Anda memilih kebijakan cache least recently used, Anda harus menentukan ukuran cache.

    • Opsional saat memilih kebijakan cache ALL.

    lookup.cache.ttl

    Waktu hidup (TTL) cache.

    Tidak

    DURATION

    10 s

    Konfigurasi lookup.cache.ttl bergantung pada lookup.cache.strategy:

    • Jika lookup.cache.strategy diatur ke None, lookup.cache.ttl opsional dan berarti cache tidak pernah kedaluwarsa.

    • Jika lookup.cache.strategy diatur ke LRU, lookup.cache.ttl adalah TTL cache. Secara default, tidak pernah kedaluwarsa.

    • Jika lookup.cache.strategy diatur ke ALL, lookup.cache.ttl adalah waktu reload cache. Secara default, tidak direload.

    Tentukan waktu dalam format seperti 1min atau 10s.

    lookup.max-join-rows

    Jumlah maksimum hasil yang dikembalikan saat mengkueri tabel dimensi untuk setiap baris di tabel utama.

    Tidak

    INTEGER

    1024

    Tidak ada.

    lookup.filter-push-down.enabled

    Menentukan apakah mengaktifkan filter pushdown untuk tabel dimensi.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memfilter data terlebih dahulu berdasarkan kondisi yang ditetapkan dalam pekerjaan SQL.

    • false (default): Menonaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memuat semua data.

    Catatan

    Hanya didukung di VVR 8.0.7 dan versi selanjutnya.

    Penting

    Filter pushdown harus diaktifkan hanya saat tabel Flink digunakan sebagai tabel dimensi. Tabel sumber MySQL tidak mendukung pengaktifan filter pushdown. Jika tabel Flink digunakan sebagai tabel sumber dan dimensi, dan filter pushdown diaktifkan untuk tabel dimensi, atur secara eksplisit opsi ini ke false untuk tabel sumber menggunakan SQL hints. Jika tidak, pekerjaan mungkin berjalan tidak normal.

  • Khusus sink

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    url

    URL JDBC MySQL.

    Tidak

    STRING

    Tidak ada

    Format URL adalah jdbc:mysql://<endpoint>:<port>/<database_name>.

    sink.max-retries

    Jumlah maksimum percobaan ulang setelah penulisan data gagal.

    Tidak

    INTEGER

    3

    Tidak ada.

    sink.buffer-flush.batch-size

    Jumlah record yang ditulis dalam satu batch.

    Tidak

    INTEGER

    4096

    Tidak ada.

    sink.buffer-flush.max-rows

    Jumlah record data yang dibuffer di memori.

    Tidak

    INTEGER

    10000

    Opsi ini hanya berlaku setelah primary key ditentukan.

    sink.buffer-flush.interval

    Interval waktu untuk flushing buffer. Jika data dalam buffer tidak memenuhi kondisi output setelah menunggu waktu yang ditentukan, sistem secara otomatis mengeluarkan semua data yang dibuffer.

    Tidak

    DURATION

    1s

    Tidak ada.

    sink.ignore-delete

    Menentukan apakah mengabaikan operasi DELETE.

    Tidak

    BOOLEAN

    false

    Saat aliran yang dihasilkan oleh Flink SQL mencakup record delete atau update-before, pembaruan simultan ke bidang berbeda dari tabel yang sama oleh beberapa tugas output dapat menyebabkan ketidakkonsistenan data.

    Misalnya, setelah record dihapus, tugas lain hanya memperbarui beberapa bidang. Bidang yang tidak diperbarui menjadi null atau nilai default-nya, menyebabkan error data.

    Atur sink.ignore-delete ke true untuk mengabaikan operasi DELETE dan UPDATE_BEFORE upstream dan menghindari masalah tersebut.

    Catatan
    • UPDATE_BEFORE adalah bagian dari mekanisme retraksi Flink, digunakan untuk "menarik kembali" nilai lama dalam operasi pembaruan.

    • Saat ignoreDelete = true, semua record DELETE dan UPDATE_BEFORE dilewati. Hanya record INSERT dan UPDATE_AFTER yang diproses.

    sink.ignore-null-when-update

    Saat memperbarui data, menentukan apakah mengatur bidang yang sesuai ke null atau melewati pembaruan jika nilai bidang input adalah null.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Melewati pembaruan bidang. Hanya didukung saat tabel Flink memiliki primary key. Saat diatur ke true:

      • Di VVR 8.0.6 dan versi sebelumnya, eksekusi batch tidak didukung untuk menulis data ke tabel sink.

      • Di VVR 8.0.7 dan versi selanjutnya, eksekusi batch didukung untuk menulis data ke tabel sink.

        Meskipun penulisan batch meningkatkan efisiensi penulisan dan throughput keseluruhan, hal ini memperkenalkan latensi data dan risiko OOM. Seimbangkan pertimbangan ini berdasarkan skenario bisnis Anda.

    • false: Mengatur bidang ke null.

    Catatan

    Hanya didukung di VVR 8.0.5 dan versi selanjutnya.

Pemetaan tipe

  • Tabel sumber CDC

    Tipe bidang MySQL CDC

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    Penting

    Kami menyarankan agar Anda tidak menggunakan tipe TINYINT(1) di MySQL untuk menyimpan nilai selain 0 dan 1. Saat property-version=0, tabel sumber MySQL CDC memetakan TINYINT(1) ke tipe BOOLEAN Flink secara default. Hal ini dapat menyebabkan ketidakakuratan data. Untuk menggunakan TINYINT(1) guna menyimpan nilai selain 0 dan 1, lihat opsi konfigurasi catalog.table.treat-tinyint1-as-boolean.

  • Tabel dimensi dan tabel sink

    Tipe bidang MySQL

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    Catatan

    p harus ≤ 38.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Penting

    Flink mendukung record BLOB MySQL dengan ukuran maksimum 2.147.483.647 (2^31 - 1).

    BLOB

    MEDIUMBLOB

    LONGBLOB

Data ingestion

Anda dapat menggunakan konektor MySQL sebagai sumber data dalam pekerjaan YAML data ingestion.

Sintaks

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

Item Konfigurasi

Parameter

Deskripsi

Wajib

Tipe data

Nilai default

Keterangan

type

Jenis sumber data.

Ya

STRING

Tidak ada

Atur opsi ini ke mysql.

name

Nama sumber data.

Tidak

STRING

Tidak ada

Tidak ada.

hostname

Alamat IP atau hostname database MySQL.

Ya

STRING

Tidak ada

Kami menyarankan memasukkan alamat Virtual Private Cloud (VPC).

Catatan

Jika database MySQL dan Realtime Compute for Apache Flink Anda tidak berada dalam VPC yang sama, buat koneksi jaringan cross-VPC atau gunakan Internet untuk akses. Untuk informasi selengkapnya, lihat Kelola dan operasikan ruang kerja dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.

username

Username untuk layanan database MySQL.

Ya

STRING

Tidak ada

Tidak ada.

password

Password untuk layanan database MySQL.

Ya

STRING

Tidak ada

Tidak ada.

tables

Tabel data MySQL yang akan disinkronkan.

Ya

STRING

Tidak ada

  • Nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

  • Gunakan koma untuk memisahkan beberapa ekspresi reguler.

Catatan
  • Jangan gunakan karakter pencocokan awal dan akhir ^ dan $ dalam ekspresi reguler. Di versi 11.2, ekspresi reguler database diperoleh dengan memisahkan titik. Karakter pencocokan awal dan akhir membuat ekspresi reguler database yang dihasilkan tidak valid. Misalnya, ubah ^db.user_[0-9]+$ menjadi db.user_[0-9]+.

  • Titik memisahkan nama database dan nama tabel. Untuk mencocokkan karakter apa pun dengan titik, escape dengan backslash. Contoh: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

tables.exclude

Tabel yang dikecualikan dari sinkronisasi.

Tidak

STRING

Tidak ada

  • Nama tabel mendukung ekspresi reguler untuk mengecualikan data dari beberapa tabel.

  • Gunakan koma untuk memisahkan beberapa ekspresi reguler.

Catatan

Titik memisahkan nama database dan nama tabel. Untuk mencocokkan karakter apa pun dengan titik, escape dengan backslash. Contoh: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

port

Nomor port layanan database MySQL.

Tidak

INTEGER

3306

Tidak ada.

schema-change.enabled

Menentukan apakah mengirim event perubahan skema.

Tidak

BOOLEAN

true

Tidak ada.

server-id

ID numerik atau rentang yang digunakan oleh klien database untuk sinkronisasi.

Tidak

STRING

Nilai acak antara 5400 dan 6400 dihasilkan.

ID ini harus unik secara global dalam kluster MySQL. Kami menyarankan memberikan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

Opsi ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren didukung. Dalam kasus ini, kami menyarankan menentukan rentang ID agar setiap pembaca konkuren menggunakan ID berbeda.

jdbc.properties.*

Parameter koneksi kustom dalam URL JDBC.

Tidak

STRING

Tidak ada

Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk menonaktifkan SSL, atur 'jdbc.properties.useSSL' = 'false'.

Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties.

debezium.*

Parameter Debezium kustom untuk membaca log biner.

Tidak

STRING

Tidak ada

Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan cara menangani error penguraian.

scan.incremental.snapshot.chunk.size

Ukuran setiap chunk dalam baris.

Tidak

INTEGER

8096

Tabel MySQL dibagi menjadi beberapa chunk untuk dibaca. Data chunk disimpan dalam memori hingga sepenuhnya dibaca.

Semakin sedikit baris dalam setiap chunk, semakin banyak total chunk dalam tabel. Meskipun hal ini mengurangi granularitas pemulihan kesalahan, hal ini dapat menyebabkan masalah kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Oleh karena itu, Anda perlu menyeimbangkan faktor-faktor ini dan mengatur ukuran chunk yang sesuai.

scan.snapshot.fetch.size

Jumlah maksimum record yang diambil sekaligus saat membaca data tabel lengkap.

Tidak

INTEGER

1024

Tidak ada.

scan.startup.mode

Mode startup untuk konsumsi data.

Tidak

STRING

initial

Nilai yang valid:

  • initial (default): Memindai data historis lengkap terlebih dahulu, lalu membaca data binlog terbaru saat startup pertama kali.

  • latest-offset: Tidak memindai data historis saat startup pertama kali. Mulai membaca dari akhir binlog, artinya hanya membaca perubahan yang terjadi setelah konektor dimulai.

  • earliest-offset: Tidak memindai data historis. Mulai membaca dari binlog paling awal yang tersedia.

  • specific-offset: Tidak memindai data historis. Mulai dari offset binlog tertentu yang Anda tentukan. Anda dapat menentukan offset menggunakan scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengonfigurasi scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.

  • timestamp: Tidak memindai data historis. Mulai membaca event binlog dari timestamp tertentu. Tentukan timestamp menggunakan scan.startup.timestamp-millis, dalam milidetik.

Penting

Untuk earliest-offset, specific-offset, dan timestamp, jika skema tabel berbeda antara waktu startup dan waktu offset awal yang ditentukan, pekerjaan gagal karena ketidakcocokan skema. Dengan kata lain, saat menggunakan ketiga mode startup ini, pastikan skema tabel tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan.

scan.startup.specific-offset.file

Nama file binlog untuk offset awal saat menggunakan mode startup offset spesifik.

Tidak

STRING

Tidak ada

Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. Contoh nama file: mysql-bin.000003.

scan.startup.specific-offset.pos

Offset dalam file binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset spesifik.

Tidak

INTEGER

Tidak ada

Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset.

scan.startup.specific-offset.gtid-set

Set GTID untuk offset awal saat menggunakan mode startup offset spesifik.

Tidak

STRING

Tidak ada

Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset. Contoh set GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

Offset awal sebagai timestamp milidetik saat menggunakan mode startup timestamp.

Tidak

LONG

Tidak ada

Saat menggunakan konfigurasi ini, atur scan.startup.mode ke timestamp. Timestamp dalam milidetik.

Penting

Saat menggunakan timestamp, MySQL CDC mencoba membaca event awal setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai. Pastikan file binlog untuk timestamp yang ditentukan belum dihapus dari database dan masih dapat dibaca.

server-time-zone

Zona waktu sesi yang digunakan oleh database.

Tidak

STRING

Jika Anda tidak menentukan opsi ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database—zona waktu wilayah yang Anda pilih.

Contoh: Asia/Shanghai. Opsi ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.

scan.startup.specific-offset.skip-events

Jumlah event binlog yang dilewati saat membaca dari offset tertentu.

Tidak

INTEGER

Tidak ada

Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset.

scan.startup.specific-offset.skip-rows

Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event binlog mungkin sesuai dengan beberapa perubahan baris.

Tidak

INTEGER

Tidak ada

Saat menggunakan konfigurasi ini, atur scan.startup.mode ke specific-offset.

connect.timeout

Waktu maksimum menunggu koneksi ke server database MySQL hingga timeout sebelum mencoba lagi.

Tidak

DURATION

30s

Tidak ada.

connect.max-retries

Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.

Tidak

INTEGER

3

Tidak ada.

connection.pool.size

Ukuran kolam koneksi database.

Tidak

INTEGER

20

Kolam koneksi database menggunakan kembali koneksi untuk mengurangi jumlah koneksi database.

heartbeat.interval

Interval di mana sumber menggunakan event heartbeat untuk memajukan offset binlog.

Tidak

DURATION

30s

Event heartbeat memajukan offset binlog di sumber. Ini berguna untuk tabel MySQL yang diperbarui secara lambat. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat mendorong offset binlog maju, mencegah masalah di mana offset binlog yang kedaluwarsa menyebabkan kegagalan pekerjaan dan memerlukan restart tanpa status.

scan.incremental.snapshot.chunk.key-column

Kolom yang digunakan untuk membagi chunk selama fase snapshot.

Tidak.

STRING

Tidak ada

Anda hanya dapat memilih satu kolom dari primary key.

rds.region-id

ID wilayah instans Alibaba Cloud RDS MySQL.

Wajib saat membaca log arsip dari OSS.

STRING

Tidak ada

Untuk ID wilayah, lihat Wilayah dan zona.

rds.access-key-id

ID AccessKey untuk akun Alibaba Cloud RDS MySQL.

Wajib saat membaca log arsip dari OSS.

STRING

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.

Penting

Untuk mencegah kebocoran informasi AccessKey Anda, kelola ID AccessKey Anda menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Manajemen variabel.

rds.access-key-secret

Rahasia AccessKey untuk akun Alibaba Cloud RDS MySQL.

Wajib saat membaca log arsip dari OSS.

STRING

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?.

Penting

Untuk mencegah kebocoran informasi AccessKey Anda, kelola rahasia AccessKey Anda menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Manajemen variabel.

rds.db-instance-id

ID instans Alibaba Cloud RDS MySQL.

Wajib saat membaca log arsip dari OSS.

STRING

Tidak ada

Tidak ada.

rds.main-db-id

ID database utama instans Alibaba Cloud RDS MySQL.

Tidak

STRING

Tidak ada

Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Cadangan log RDS MySQL.

rds.download.timeout

Timeout untuk mengunduh satu log arsip dari OSS.

Tidak

DURATION

60s

Tidak ada.

rds.endpoint

Titik akhir layanan untuk mengambil informasi binlog OSS.

Tidak

STRING

Tidak ada

Untuk nilai yang valid, lihat Titik akhir layanan.

rds.binlog-directory-prefix

Awalan direktori untuk menyimpan file binlog.

Tidak

STRING

rds-binlog-

Tidak ada.

rds.use-intranet-link

Menentukan apakah menggunakan jaringan internal untuk mengunduh file binlog.

Tidak

BOOLEAN

true

Tidak ada.

rds.binlog-directories-parent-path

Jalur mutlak direktori induk untuk menyimpan file binlog.

Tidak

STRING

Tidak ada

Tidak ada.

chunk-meta.group.size

Ukuran metadata chunk.

Tidak

INTEGER

1000

Jika metadata melebihi ukuran ini, metadata dikirimkan dalam beberapa bagian.

chunk-key.even-distribution.factor.lower-bound

Batas bawah faktor distribusi chunk untuk sharding merata.

Tidak

DOUBLE

0.05

Faktor distribusi chunk yang kurang dari nilai ini menghasilkan sharding tidak merata.

Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris.

chunk-key.even-distribution.factor.upper-bound

Batas atas faktor distribusi chunk untuk sharding merata.

Tidak

DOUBLE

1000.0

Faktor distribusi chunk yang lebih besar dari nilai ini menghasilkan sharding tidak merata.

Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris.

scan.incremental.close-idle-reader.enabled

Menentukan apakah pembaca idle ditutup setelah fase snapshot berakhir.

Tidak

BOOLEAN

false

Agar konfigurasi ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

scan.only.deserialize.captured.tables.changelog.enabled

Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.

Tidak

BOOLEAN

  • Nilai default adalah false di VVR 8.x.

  • Nilai default adalah true di VVR 11.1 dan versi selanjutnya.

Nilai yang valid:

  • true: Hanya mendeserialisasi data perubahan untuk tabel target guna mempercepat pembacaan binlog.

  • false (default): Mendeserialisasi data perubahan untuk semua tabel.

scan.parallel-deserialize-changelog.enabled

Selama fase inkremental, menentukan apakah menggunakan multithreading untuk mengurai event perubahan.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Menggunakan multithreading selama deserialisasi sambil mempertahankan urutan event binlog untuk mempercepat pembacaan.

  • false (default): Menggunakan satu thread selama deserialisasi.

Catatan

Hanya didukung di VVR 8.0.11 dan versi selanjutnya.

scan.parallel-deserialize-changelog.handler.size

Jumlah handler event saat menggunakan multithreading untuk mengurai event perubahan.

Tidak

INTEGER

2

Catatan

Hanya didukung di VVR 8.0.11 dan versi selanjutnya.

metadata-column.include-list

Kolom metadata yang diteruskan ke downstream.

Tidak

STRING

Tidak ada

Metadata yang tersedia meliputi op_ts, es_ts, query_log, file, dan pos. Anda dapat menggunakan koma untuk memisahkannya.

Catatan

Konektor MySQL CDC YAML tidak perlu dan tidak mendukung penambahan kolom metadata nama database, nama tabel, dan op_type. Anda dapat langsung menggunakan __data_event_type__ dalam ekspresi Transform untuk mendapatkan tipe data perubahan, atau menggunakan __schema_name__ dan __table_name__ dalam ekspresi Transform untuk mendapatkan nama database dan nama tabel.

Penting

Kolom metadata file merepresentasikan file binlog yang berisi data. Kolom ini kosong ("") selama fase lengkap dan berisi nama file binlog selama fase inkremental. Kolom metadata pos merepresentasikan offset data dalam file binlog. Kolom ini bernilai "0" selama fase lengkap dan berisi offset aktual selama fase inkremental. Kolom metadata ini didukung mulai dari VVR 11.5.

es_ts merepresentasikan waktu mulai transaksi yang terkait dengan changelog di MySQL. Hanya didukung untuk MySQL 8.0.x. Jangan tambahkan kolom metadata ini saat menggunakan versi MySQL sebelumnya.

scan.newly-added-table.enabled

Saat memulai ulang dari checkpoint, menentukan apakah menyinkronkan tabel yang baru ditambahkan yang tidak cocok pada proses sebelumnya atau menghapus tabel yang saat ini tidak cocok yang disimpan dalam state.

Tidak

BOOLEAN

false

Ini berlaku saat memulai ulang dari checkpoint atau titik simpan.

scan.binlog.newly-added-table.enabled

Selama fase inkremental, menentukan apakah mengirim data dari tabel yang baru ditambahkan yang cocok dengan pola.

Tidak

BOOLEAN

false

Ini tidak dapat diaktifkan secara bersamaan dengan scan.newly-added-table.enabled.

scan.incremental.snapshot.chunk.key-column

Tentukan kolom untuk tabel tertentu yang akan digunakan sebagai kunci pemisahan chunk selama fase snapshot.

Tidak

STRING

Tidak ada

  • Gunakan titik dua (:) untuk menggabungkan nama tabel dan nama bidang, membentuk aturan. Nama tabel dapat menggunakan ekspresi reguler. Definisikan beberapa aturan yang dipisahkan dengan titik koma (;). Contoh: db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2.

  • Wajib untuk tabel tanpa primary key. Kolom yang dipilih harus non-null (NOT NULL). Opsional untuk tabel dengan primary key. Anda hanya dapat memilih satu kolom dari primary key.

scan.parse.online.schema.changes.enabled

Selama fase inkremental, menentukan apakah mencoba mengurai event DDL lockless RDS.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Mengurai event DDL lockless RDS.

  • false (default): Tidak mengurai event DDL lockless RDS.

Ini adalah fitur eksperimental. Sebelum melakukan perubahan lockless online, ambil snapshot pekerjaan Flink untuk pemulihan.

Catatan

Hanya didukung di VVR 11.0 dan versi selanjutnya.

scan.incremental.snapshot.backfill.skip

Menentukan apakah melewati backfill selama fase pembacaan snapshot.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Melewati backfill selama fase pembacaan snapshot.

  • false (default): Tidak melewati backfill selama fase pembacaan snapshot.

Jika backfill dilewati, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

Penting

Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.

Catatan

Hanya didukung di VVR 11.1 dan versi selanjutnya.

treat-tinyint1-as-boolean.enabled

Menentukan apakah memperlakukan tipe TINYINT(1) sebagai tipe Boolean.

Tidak

BOOLEAN

true

Nilai yang valid:

  • true (default): Memperlakukan TINYINT(1) sebagai tipe Boolean.

  • false: Tidak memperlakukan TINYINT(1) sebagai tipe Boolean.

treat-timestamp-as-datetime-enabled

Menentukan apakah memproses TIMESTAMP sebagai DATETIME.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Memproses data TIMESTAMP MySQL sebagai data DATETIME dan memetakannya ke tipe TIMESTAMP CDC.

  • false (default): Memetakan data TIMESTAMP MySQL ke tipe TIMESTAMP_LTZ CDC.

TIMESTAMP MySQL menyimpan waktu UTC dan dipengaruhi oleh zona waktu. DATETIME MySQL menyimpan waktu literal dan tidak dipengaruhi oleh zona waktu.

Mengaktifkan ini mengonversi data TIMESTAMP MySQL ke DATETIME berdasarkan server-time-zone.

include-comments.enabled

Menentukan apakah menyinkronkan komentar tabel dan kolom.

Tidak

BOOELEAN

false

Nilai yang valid:

  • true: Menyinkronkan komentar tabel dan kolom.

  • false (default): Tidak menyinkronkan komentar tabel dan kolom.

Mengaktifkan ini meningkatkan penggunaan memori pekerjaan.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Tidak

BOOELEAN

false

Nilai yang valid:

  • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

  • false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Ini adalah fitur eksperimental. Mengaktifkannya mengurangi risiko error OOM saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan menambahkan ini sebelum startup pertama pekerjaan.

Catatan

Hanya didukung di VVR 11.1 dan versi selanjutnya.

binlog.session.network.timeout

Timeout jaringan untuk koneksi binlog.

Tidak

DURATION

10m

Mengatur ini ke 0s menggunakan timeout default server MySQL.

Catatan

Hanya didukung di VVR 11.5 dan versi selanjutnya.

scan.rate-limit.records-per-second

Membatasi jumlah maksimum record yang dipancarkan per detik oleh sumber.

Tidak

LONG

Tidak ada

Berguna untuk membatasi pembacaan data. Batasan ini berlaku untuk fase lengkap dan inkremental.

Metrik numRecordsOutPerSecond mencerminkan jumlah record yang dipancarkan per detik di seluruh aliran data. Sesuaikan parameter ini berdasarkan metrik tersebut.

Selama pembacaan lengkap, kurangi jumlah baris per batch dengan menurunkan nilai scan.incremental.snapshot.chunk.size.

Catatan

Hanya didukung di VVR 11.5 dan versi selanjutnya.

Pemetaan tipe

Tabel berikut menunjukkan pemetaan tipe untuk data ingestion.

Tipe bidang MySQL CDC

Tipe bidang CDC

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

Pemetaan bidang bergantung pada opsi treat-timestamp-as-datetime-enabled:

true: TIMESTAMP[(p)]

false: TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65

STRING

Catatan

Di MySQL, presisi desimal dapat mencapai hingga 65. Flink membatasi presisi desimal hingga 38. Jika Anda mendefinisikan kolom desimal dengan presisi > 38, petakan ke string untuk menghindari kehilangan presisi.

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

Catatan

Tipe data JSON dikonversi ke string berformat JSON di Flink.

GEOMETRY

STRING

Catatan

Tipe data spasial MySQL dikonversi ke string dengan format JSON tetap. Untuk informasi selengkapnya, lihat pemetaan tipe data spasial MySQL.

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

Catatan

MySQL mendukung BLOB dengan panjang maksimum 2.147.483.647 (2**31-1) byte.

BLOB

MEDIUMBLOB

LONGBLOB

Contoh

  • Tabel sumber CDC

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Tabel dimensi

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • Tabel sink

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • Sumber data ingestion

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

Tentang tabel sumber MySQL CDC

  • Cara kerja

    Tabel sumber MySQL CDC memindai tabel lengkap saat startup dan membaginya menjadi beberapa chunk berdasarkan primary key. Sistem mencatat offset binlog saat ini lalu menggunakan algoritma snapshot inkremental untuk membaca data setiap chunk menggunakan pernyataan SELECT. Pekerjaan melakukan checkpoint berkala untuk mencatat chunk yang telah selesai. Jika terjadi failover, pekerjaan melanjutkan membaca hanya chunk yang belum selesai. Setelah semua chunk terbaca, pekerjaan membaca record perubahan inkremental dari offset binlog yang sebelumnya dicatat. Pekerjaan Flink terus melakukan checkpoint berkala untuk mencatat offset binlog. Jika terjadi failover, pekerjaan melanjutkan pemrosesan dari offset binlog terakhir yang dicatat. Proses ini mencapai semantik tepat-sekali (exactly-once semantics).

    Untuk detail lebih lanjut tentang algoritma snapshot inkremental, lihat Konektor MySQL CDC.

  • Metadata

    Metadata sangat berguna dalam skenario penggabungan database dan tabel yang di-shard. Setelah digabung, bisnis sering kali masih perlu mengidentifikasi database dan tabel sumber untuk setiap baris data. Kolom metadata memungkinkan Anda mengakses informasi ini. Oleh karena itu, Anda dapat dengan mudah menggabungkan beberapa tabel yang di-shard menjadi satu tabel tujuan menggunakan kolom metadata.

    Sumber MySQL CDC mendukung sintaks kolom metadata. Anda dapat mengakses metadata berikut melalui kolom metadata.

    Kunci metadata

    Tipe metadata

    Deskripsi

    database_name

    STRING NOT NULL

    Nama database yang berisi baris tersebut.

    table_name

    STRING NOT NULL

    Nama tabel yang berisi baris tersebut.

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    Waktu saat baris tersebut diubah di database. Jika record berasal dari data historis yang sudah ada di tabel, bukan dari binlog, nilai ini selalu 0.

    op_type

    STRING NOT NULL

    Jenis perubahan baris tersebut.

    • +I: Pesan INSERT

    • -D: Pesan DELETE

    • -U: Pesan UPDATE_BEFORE

    • +U: Pesan UPDATE_AFTER

    Catatan

    Hanya didukung di Ververica Runtime (VVR) 8.0.7 dan versi selanjutnya.

    query_log

    STRING NOT NULL

    Baca record log kueri MySQL yang sesuai dengan baris ini.

    Catatan

    MySQL harus memiliki parameter binlog_rows_query_log_events diaktifkan untuk mencatat log kueri.

    Contoh berikut menunjukkan cara menggabungkan beberapa tabel orders dari berbagai database yang di-shard dalam instans MySQL dan menyinkronkannya ke tabel holo_orders di Hologres.

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Baca nama database.
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- Baca nama tabel.
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Baca timestamp perubahan.
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Baca jenis perubahan.
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Cocokkan beberapa database yang di-shard menggunakan ekspresi reguler.
      'table-name' = 'orders_.*'   -- Cocokkan beberapa tabel yang di-shard menggunakan ekspresi reguler.
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    Berdasarkan kode di atas, jika Anda mengatur scan.read-changelog-as-append-only.enabled ke true dalam klausa WITH, output bervariasi berdasarkan konfigurasi primary key tabel downstream:

    • Jika primary key tabel downstream adalah order_id, output hanya berisi perubahan terakhir untuk setiap primary key di tabel upstream. Misalnya, jika perubahan terakhir untuk primary key adalah operasi penghapusan, Anda akan melihat record di tabel downstream dengan primary key yang sama dan op_type -D.

    • Jika primary key tabel downstream adalah order_id, operation_ts, dan op_type, output berisi riwayat perubahan lengkap untuk setiap primary key di tabel upstream.

  • Dukungan ekspresi reguler

    Tabel sumber MySQL CDC mendukung penggunaan ekspresi reguler dalam nama tabel atau nama database untuk mencocokkan beberapa tabel atau database. Contoh berikut menunjukkan cara menentukan beberapa tabel menggunakan ekspresi reguler.

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Cocokkan beberapa database menggunakan ekspresi reguler.
      'table-name' = '(t[5-8]|tt)' -- Cocokkan beberapa tabel menggunakan ekspresi reguler.
    );

    Penjelasan ekspresi reguler dalam contoh di atas:

    • ^(test).* adalah contoh pencocokan awalan. Ekspresi ini mencocokkan nama database yang dimulai dengan test, seperti test1 atau test2.

    • .*[p$] adalah contoh pencocokan akhiran. Ekspresi ini mencocokkan nama database yang diakhiri dengan p, seperti cdcp atau edcp.

    • txc adalah pencocokan eksak. Ini mencocokkan nama database spesifik txc.

    Saat mencocokkan nama tabel yang memenuhi syarat penuh, MySQL CDC menggunakan nama database dan nama tabel untuk mengidentifikasi tabel secara unik. Sistem menggunakan pola database-name.table-name untuk pencocokan. Misalnya, pola (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) mencocokkan tabel seperti txc.tt dan test2.test5 dalam database.

    Penting

    Dalam konfigurasi pekerjaan SQL, opsi table-name dan database-name tidak mendukung penggunaan koma (,) untuk menentukan beberapa tabel atau database.

    • Untuk mencocokkan beberapa tabel atau menggunakan beberapa ekspresi reguler, Anda dapat memisahkannya dengan GARIS TEGAK (|) dan membungkusnya dalam tanda kurung. Misalnya, untuk membaca tabel user dan product, Anda dapat mengonfigurasi table-name sebagai (user|product).

    • Jika ekspresi reguler berisi koma, Anda harus menulis ulang menggunakan operator GARIS TEGAK (|). Misalnya, ekspresi reguler mytable_\d{1, 2} harus ditulis ulang menjadi yang setara (mytable_\d{1}|mytable_\d{2}) untuk menghindari penggunaan koma.

  • Kontrol konkurensi

    Konektor MySQL mendukung pembacaan data lengkap secara konkuren, yang meningkatkan efisiensi pemuatan data. Saat dikombinasikan dengan Autopilot di konsol Realtime Compute for Apache Flink, konektor secara otomatis mengurangi skala selama fase inkremental setelah pembacaan konkuren selesai. Hal ini menghemat sumber daya komputasi.

    Di Konsol pengembangan Realtime Compute, Anda dapat mengatur paralelisme pekerjaan di halaman Konfigurasi Sumber Daya baik dalam mode Dasar maupun mode Ahli. Perbedaannya sebagai berikut:

    • Paralelisme yang diatur dalam mode Dasar adalah paralelisme global untuk seluruh pekerjaan.基础模式

    • Mode Ahli memungkinkan Anda mengatur paralelisme untuk VERTEX tertentu sesuai kebutuhan.vertex并发

    Untuk informasi selengkapnya tentang konfigurasi sumber daya, lihat Konfigurasi informasi penerapan pekerjaan.

    Penting

    Terlepas dari apakah Anda menggunakan mode Dasar atau mode Ahli, rentang server-id yang dideklarasikan dalam tabel harus lebih besar dari atau sama dengan paralelisme pekerjaan. Misalnya, jika rentang server-id adalah 5404-5412, terdapat delapan ID server unik. Oleh karena itu, pekerjaan dapat memiliki maksimal delapan tugas paralel. Pekerjaan berbeda untuk instans MySQL yang sama harus memiliki rentang server-id yang tidak tumpang tindih. Setiap pekerjaan harus secara eksplisit mengonfigurasi server-id yang berbeda.

  • Autopilot Auto Scale-in

    Fase data lengkap mengakumulasi banyak data historis. Untuk meningkatkan efisiensi pembacaan, data historis biasanya dibaca secara konkuren. Namun, dalam fase binlog inkremental, satu konkurensi biasanya sudah cukup karena volume data binlog kecil dan urutan global harus dipertahankan. Autopilot secara otomatis menyeimbangkan kinerja dan sumber daya untuk memenuhi persyaratan berbeda antara fase lengkap dan inkremental.

    Autopilot memantau trafik untuk setiap tugas di Sumber CDC MySQL. Saat memasuki fase binlog, jika hanya satu tugas yang menangani pembacaan binlog sementara yang lain menganggur, Autopilot secara otomatis mengurangi jumlah CU dan paralelisme Sumber. Untuk mengaktifkan Autopilot, atur mode Autopilot ke Aktif di halaman Operasi dan Pemeliharaan pekerjaan.

    Catatan

    Interval pemicu minimum default untuk penskalaan turun paralelisme adalah 24 jam. Untuk informasi selengkapnya tentang parameter dan detail Autopilot, lihat Konfigurasi Autopilot.

  • Startup Mode

    Anda dapat menggunakan opsi scan.startup.mode untuk menentukan mode startup untuk tabel sumber MySQL CDC. Nilai yang valid dijelaskan sebagai berikut:

    • initial (default): Melakukan pembacaan lengkap tabel database saat startup pertama kali, lalu beralih ke mode inkremental untuk membaca binlog.

    • earliest-offset: Melewati fase snapshot dan mulai membaca dari offset binlog paling awal yang tersedia.

    • latest-offset: Melewati fase snapshot dan mulai membaca dari akhir binlog. Dalam mode ini, tabel sumber hanya membaca perubahan yang terjadi setelah pekerjaan dimulai.

    • specific-offset: Melewati fase snapshot dan mulai membaca dari offset binlog tertentu. Anda dapat menentukan offset berdasarkan nama file binlog dan posisi atau berdasarkan set GTID.

    • timestamp: Melewati fase snapshot dan mulai membaca event binlog dari timestamp tertentu.

    Contoh:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- Mulai dari offset paling awal.
        'scan.startup.mode' = 'latest-offset', -- Mulai dari offset terbaru.
        'scan.startup.mode' = 'specific-offset', -- Mulai dari offset tertentu.
        'scan.startup.mode' = 'timestamp', -- Mulai dari timestamp tertentu.
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Tentukan nama file binlog untuk mode specific-offset.
        'scan.startup.specific-offset.pos' = '4', -- Tentukan posisi binlog untuk mode specific-offset.
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Tentukan set GTID untuk mode specific-offset.
        'scan.startup.timestamp-millis' = '1667232000000' -- Tentukan timestamp startup untuk mode timestamp.
        ...
    )
    Penting
    • Sumber MySQL mencatat posisi saat ini pada level INFO selama checkpoint. Awalan log adalah Binlog offset on checkpoint {checkpoint-id}. Log ini membantu Anda memulai ulang pekerjaan dari posisi checkpoint tertentu.

    • Jika skema tabel yang sedang dibaca telah berubah di masa lalu, memulai dari earliest-offset, specific-offset, atau timestamp dapat menyebabkan error. Hal ini karena pembaca Debezium secara internal menyimpan skema terbaru, dan data lama dengan skema yang tidak cocok tidak dapat diurai dengan benar.

  • Tabel sumber CDC tanpa kunci

    • Untuk menggunakan tabel tanpa kunci, Anda harus mengatur scan.incremental.snapshot.chunk.key-column dan memilih hanya bidang non-null.

    • Semantik pemrosesan tabel sumber CDC tanpa kunci bergantung pada perilaku kolom yang ditentukan dalam scan.incremental.snapshot.chunk.key-column:

      • Jika kolom yang ditentukan tidak pernah diperbarui, semantik tepat-sekali dijamin.

      • Jika kolom yang ditentukan diperbarui, hanya semantik at-least-once yang dijamin. Namun, Anda dapat memastikan kebenaran data dengan menggabungkannya dengan sistem downstream, menentukan primary key downstream, dan menggunakan operasi idempoten.

  • Baca log cadangan dari RDS MySQL

    Tabel sumber MySQL CDC mendukung pembacaan log cadangan dari Alibaba Cloud RDS MySQL. Fitur ini sangat berguna ketika fase snapshot lengkap memakan waktu lama. Dalam kasus ini, file binlog lokal mungkin secara otomatis dibersihkan, sedangkan file cadangan yang diunggah secara manual atau otomatis masih ada.

    Contoh:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • Aktifkan penggunaan ulang sumber CDC

    Satu pekerjaan dengan beberapa tabel sumber MySQL CDC meluncurkan beberapa klien binlog. Jika semua tabel sumber membaca dari instans MySQL yang sama, praktik ini meningkatkan tekanan pada database. Untuk informasi selengkapnya, lihat FAQ MySQL CDC.

    Solusi

    VVR 8.0.7 dan versi selanjutnya mendukung penggunaan ulang sumber MySQL CDC. Penggunaan ulang menggabungkan tabel sumber MySQL CDC yang kompatibel. Penggabungan terjadi ketika tabel sumber memiliki konfigurasi identik kecuali nama database, nama tabel, dan server-id. Mesin secara otomatis menggabungkan sumber MySQL CDC dalam pekerjaan yang sama.

    Prosedur

    1. Anda dapat menggunakan perintah SET dalam pekerjaan SQL:

      SET 'table.optimizer.source-merge.enabled' = 'true';
      
      # (Versi VVR 8.0.8 dan 8.0.9) Atur juga ini:
      SET 'sql-gateway.exec-plan.enabled' = 'false';
      VVR 11.1 dan versi selanjutnya mengaktifkan penggunaan ulang secara default.
    2. Jalankan pekerjaan tanpa status. Memodifikasi konfigurasi penggunaan ulang mengubah topologi pekerjaan. Anda harus menjalankan pekerjaan tanpa status. Jika tidak, pekerjaan mungkin gagal dijalankan atau kehilangan data. Jika sumber digabung, Anda dapat melihat node MergetableSourceScan dalam topologi pekerjaan.

    Penting
    • Setelah Anda mengaktifkan penggunaan ulang, jangan atur pipeline.operator-chaining ke false. Menonaktifkan operator chaining menambahkan overhead serialisasi dan deserialisasi. Semakin banyak sumber yang digabung, semakin besar overhead-nya.

    • Di VVR 8.0.7, menonaktifkan operator chaining menyebabkan masalah serialisasi.

Percepat pembacaan binlog

Saat Anda menggunakan konektor MySQL sebagai tabel sumber atau sumber data ingestion, sistem mengurai file binlog untuk menghasilkan berbagai pesan perubahan selama fase inkremental. File binlog mencatat semua perubahan tabel dalam format biner. Anda dapat mempercepat penguraian file binlog dengan cara berikut:

  • Aktifkan konfigurasi filter penguraian

    • Gunakan opsi scan.only.deserialize.captured.tables.changelog.enabled untuk mengurai event perubahan hanya untuk tabel yang ditentukan.

  • Optimalkan opsi Debezium

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: Jumlah maksimum record yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, sistem menempatkan event dalam antrian pemblokiran sebelum menuliskannya ke downstream. Nilai default adalah 8192.

    • debezium.max.batch.size: Jumlah maksimum event yang diproses per iterasi. Nilai default adalah 2048.

    • debezium.poll.interval.ms: Jumlah milidetik yang ditunggu konektor sebelum meminta event perubahan baru. Nilai default adalah 1000 ms (1 detik).

Contoh:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Konfigurasi Debezium
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Aktifkan filter penguraian
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- Urai event perubahan hanya untuk tabel yang ditentukan.
    ...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Konfigurasi Debezium
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Aktifkan filter penguraian
  scan.only.deserialize.captured.tables.changelog.enabled: true

Versi enterprise MySQL CDC mengonsumsi binlog dengan kecepatan 85 MB/detik, sekitar dua kali lipat kecepatan versi komunitas open-source. Jika laju pembuatan binlog melebihi 85 MB/detik (setara dengan satu file 512 MB setiap 6 detik), latensi pekerjaan Flink terus meningkat. Latensi secara bertahap berkurang setelah laju pembuatan binlog melambat. Saat file binlog berisi transaksi besar, latensi pemrosesan mungkin sementara meningkat lalu berkurang setelah log transaksi terbaca.

API DataStream MySQL CDC

Penting

Untuk membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk petunjuk cara menyiapkan konektor DataStream, lihat Penggunaan konektor DataStream.

Contoh berikut menunjukkan cara membuat program API DataStream dan menggunakan MySqlSource, termasuk dependensi pom yang diperlukan.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // Atur database yang ditangkap.
        .tableList("yourDatabaseName.yourTableName") // Atur tabel yang ditangkap.
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // Mengonversi SourceRecord ke String JSON.
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Aktifkan checkpointing.
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // Atur 4 tugas sumber paralel.
      .setParallelism(4)
      .print().setParallelism(1); // Gunakan paralelisme 1 untuk sink guna mempertahankan urutan pesan.
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

Saat membangun MySqlSource, Anda harus menentukan parameter berikut dalam kode Anda:

Parameter

Deskripsi

hostname

Alamat IP atau hostname database MySQL.

port

Nomor port layanan database MySQL.

databaseList

Nama database MySQL.

Catatan

Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Gunakan .* untuk mencocokkan semua database.

username

Username untuk layanan database MySQL.

password

Password untuk layanan database MySQL.

deserializer

Deserializer yang mengonversi objek SourceRecord ke tipe tertentu. Nilai yang valid:

  • RowDataDebeziumDeserializeSchema: Mengonversi SourceRecord ke RowData, struktur data internal Flink Table atau SQL.

  • JsonDebeziumDeserializationSchema: Mengonversi SourceRecord ke string berformat JSON.

Dependensi pom Anda harus menentukan parameter berikut:

${vvr.version}

Versi mesin Alibaba Cloud Realtime Compute for Apache Flink—misalnya, 1.17-vvr-8.0.4-3.

Catatan

Gunakan nomor versi yang ditampilkan di Maven. Kami secara berkala merilis versi hotfix, dan pembaruan ini mungkin tidak diumumkan melalui saluran lain.

${flink.version}

Versi Apache Flink—misalnya, 1.17.2.

Penting

Gunakan versi Apache Flink yang sesuai dengan versi mesin Realtime Compute for Apache Flink Anda untuk menghindari masalah kompatibilitas selama runtime pekerjaan. Untuk detail pemetaan versi, lihat Mesin.

FAQ

Untuk informasi tentang masalah yang mungkin Anda temui saat menggunakan tabel sumber CDC, lihat Masalah CDC.