全部产品
Search
文档中心

Realtime Compute for Apache Flink:MySQL

更新时间:Jan 10, 2026

Topik ini menjelaskan cara menggunakan konektor MySQL.

Informasi latar belakang

Konektor MySQL mendukung semua database yang kompatibel dengan protokol MySQL, termasuk ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (mode MySQL), dan self-managed MySQL.

Penting

Saat Anda menggunakan konektor MySQL untuk membaca dari OceanBase, pastikan binary logging (binlog) untuk OceanBase diaktifkan dan dikonfigurasi dengan benar. Untuk informasi selengkapnya, lihat Operasi terkait. Fitur ini berada dalam pratinjau publik. Kami menyarankan agar Anda mengevaluasinya secara menyeluruh dan menggunakannya dengan hati-hati.

Konektor MySQL mendukung hal-hal berikut.

Kategori

Rincian

Jenis yang didukung

Tabel sumber, tabel dimensi, tabel sink, dan sumber ingesti data

Mode runtime

Hanya mode streaming

Format data

Tidak berlaku

Metrik pemantauan spesifik

Metrik pemantauan

  • Tabel sumber

    • currentFetchEventTimeLag: Interval dari saat data dihasilkan hingga ditarik oleh Operator Sumber.

      Metrik ini hanya berlaku pada fase binlog. Nilainya 0 selama fase snapshot.

    • currentEmitEventTimeLag: Interval dari saat data dihasilkan hingga meninggalkan Operator Sumber.

      Metrik ini hanya berlaku pada fase binlog. Nilainya 0 selama fase snapshot.

    • sourceIdleTime: Durasi tabel sumber tidak menghasilkan data baru.

  • Tabel dimensi dan tabel sink: Tidak ada.

Catatan

Untuk informasi selengkapnya tentang metrik, lihat Metrik.

Jenis API

DataStream, SQL, dan YAML ingesti data

Apakah saya dapat memperbarui atau menghapus data di tabel sink?

Ya

Fitur

Tabel sumber MySQL Change Data Capture (CDC) adalah tabel sumber streaming yang pertama-tama membaca seluruh data historis dari database, lalu secara mulus beralih ke pembacaan binlog untuk memastikan data tidak dibaca lebih dari sekali atau terlewat. Bahkan jika terjadi kegagalan, data diproses dengan semantik tepat-sekali. Tabel sumber CDC MySQL mendukung pembacaan konkuren data lengkap dan menggunakan algoritma snapshot inkremental untuk mencapai pembacaan tanpa penguncian serta transfer data yang dapat dilanjutkan. Untuk informasi selengkapnya, lihat Tentang tabel sumber CDC MySQL.

  • Pemrosesan batch dan stream terpadu: Membaca data lengkap dan inkremental, menghilangkan kebutuhan akan pipeline terpisah.

  • Pembacaan data lengkap secara konkuren: Meningkatkan kinerja secara horizontal.

  • Peralihan mulus dari pembacaan lengkap ke inkremental: Secara otomatis mengurangi penggunaan sumber daya komputasi.

  • Transfer data yang dapat dilanjutkan: Mendukung pemulihan transfer data selama fase pembacaan data lengkap guna meningkatkan stabilitas.

  • Pembacaan tanpa penguncian: Membaca data lengkap tanpa memengaruhi operasi bisnis online.

  • Mendukung pembacaan log cadangan dari ApsaraDB RDS for MySQL.

  • Penguraian file binlog secara paralel mengurangi latensi baca.

Prasyarat

Anda harus mengonfigurasi database MySQL seperti yang dijelaskan dalam Konfigurasi MySQL sebelum dapat menggunakan tabel sumber CDC MySQL.

ApsaraDB RDS for MySQL

  • Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL: 5.6, 5.7, dan 8.0.x.

  • Aktifkan binlog. Ini diaktifkan secara default.

  • Atur format binlog ke ROW. Ini adalah format default.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Pengguna MySQL telah dibuat dengan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk ApsaraDB RDS for MySQL. Gunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasional akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk ApsaraDB RDS for MySQL.

PolarDB for MySQL

  • Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL: 5.6, 5.7, dan 8.0.x.

  • Aktifkan binlog. Ini dinonaktifkan secara default.

  • Atur format binlog ke ROW. Ini adalah format default.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Pengguna MySQL telah dibuat dengan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk PolarDB for MySQL. Gunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasional akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk kluster PolarDB for MySQL.

Self-managed MySQL

  • Lakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL: 5.6, 5.7, dan 8.0.x.

  • Aktifkan binlog. Ini dinonaktifkan secara default.

  • Atur format binlog ke ROW. Format default adalah STATEMENT.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Pengguna MySQL telah dibuat dengan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Buat database dan tabel MySQL. Untuk informasi selengkapnya, lihat Buat database dan akun untuk instans MySQL yang dikelola sendiri. Gunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasional akibat izin yang tidak mencukupi.

  • Konfigurasikan daftar putih alamat IP. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans MySQL yang dikelola sendiri.

Batasan

Batasan umum

  • Tabel sumber CDC MySQL tidak mendukung pendefinisian watermark.

  • Pada pekerjaan CREATE TABLE AS SELECT (CTAS) dan CREATE DATABASE AS SELECT (CDAS), tabel sumber CDC MySQL mendukung sinkronisasi perubahan skema parsial. Untuk informasi tentang jenis perubahan yang didukung, lihat Kebijakan sinkronisasi evolusi skema.

  • Konektor CDC MySQL tidak mendukung Binary Log Transaction Compression. Oleh karena itu, saat menggunakan konektor CDC MySQL untuk mengonsumsi data inkremental, pastikan fitur ini dinonaktifkan. Jika tidak, pengambilan data inkremental mungkin gagal.

Batasan untuk ApsaraDB RDS for MySQL

  • Untuk ApsaraDB RDS for MySQL, kami tidak merekomendasikan membaca data dari database secondary atau replika read-only. Periode retensi binlog untuk instans ini singkat secara default. Jika binlog kedaluwarsa dan dihapus, pekerjaan mungkin gagal mengonsumsi data binlog, menyebabkan error.

  • Secara default, ApsaraDB RDS for MySQL mengaktifkan replikasi paralel antara database primary dan secondary serta tidak menjamin konsistensi urutan transaksi. Hal ini dapat menyebabkan kehilangan data selama alih bencana primary-secondary dan pemulihan checkpoint. Anda dapat mengaktifkan opsi slave_preserve_commit_order secara manual di ApsaraDB RDS for MySQL untuk mengatasi masalah ini.

Batasan untuk PolarDB for MySQL

Tabel sumber CDC MySQL tidak mendukung pembacaan dari Kluster Multi-master PolarDB for MySQL versi 1.0.19 atau lebih lama. Untuk informasi selengkapnya, lihat Apa itu Kluster Multi-master?. Binlog yang dihasilkan oleh kluster ini mungkin berisi ID tabel duplikat, yang dapat menyebabkan kesalahan pemetaan skema pada tabel sumber CDC, sehingga mengakibatkan kesalahan saat parsing data binlog.

Batasan untuk MySQL open source

Secara default, MySQL mempertahankan urutan transaksi selama replikasi Binlog primary-replika. Jika replika MySQL mengaktifkan replikasi paralel (slave_parallel_workers > 1) tetapi tidak memiliki slave_preserve_commit_order = ON, urutan commit transaksinya mungkin tidak konsisten dengan database primary. Saat Flink CDC pulih dari checkpoint, data mungkin terlewat akibat ketidakkonsistenan urutan ini. Kami menyarankan agar Anda mengatur slave_preserve_commit_order = ON pada replika MySQL, atau mengatur slave_parallel_workers = 1. Opsi terakhir ini mungkin mengorbankan performa replikasi.

Catatan

  • Konfigurasikan eksplisit Server ID yang berbeda untuk setiap sumber data CDC MySQL

    Tujuan Server ID

    Anda harus mengonfigurasi eksplisit Server ID yang berbeda untuk setiap sumber data CDC MySQL. Jika beberapa sumber data CDC MySQL berbagi Server ID yang sama dan tidak dapat digunakan ulang, hal ini dapat menyebabkan offset binlog salah, sehingga data dibaca berulang kali atau dilewati.

    Konfigurasi Server ID untuk skenario berbeda

    Anda dapat menentukan Server ID dalam pernyataan DDL, tetapi kami merekomendasikan agar Anda mengonfigurasinya menggunakan dynamic hints sebagai gantinya.

    • Paralelisme = 1 atau snapshot inkremental dinonaktifkan

      ## Saat framework snapshot inkremental tidak diaktifkan atau paralelisme adalah 1, Anda dapat menentukan Server ID tertentu.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • Paralelisme > 1 dan snapshot inkremental diaktifkan

      ## Anda harus menentukan rentang Server ID. Jumlah Server ID yang tersedia dalam rentang tersebut harus lebih besar dari atau sama dengan paralelisme. Misalnya, paralelisme adalah 3.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • Sinkronisasi data menggunakan CTAS

      Saat Anda menggunakan CTAS untuk sinkronisasi data, jika sumber data CDC memiliki konfigurasi yang sama, sumber tersebut akan digunakan ulang secara otomatis. Dalam kasus ini, Anda dapat mengonfigurasi Server ID yang sama untuk beberapa sumber data CDC. Untuk informasi selengkapnya, lihat Contoh 4: Beberapa pernyataan CTAS.

    • Beberapa tabel sumber non-CTAS yang tidak dapat digunakan ulang

      Jika pekerjaan berisi beberapa tabel sumber CDC MySQL dan tidak menggunakan pernyataan CTAS untuk sinkronisasi, sumber data tidak dapat digunakan ulang. Anda harus memberikan Server ID yang berbeda untuk setiap tabel sumber CDC. Demikian pula, jika framework snapshot inkremental diaktifkan dan paralelisme lebih besar dari 1, Anda harus menentukan rentang Server ID.

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;
  • Tabel sink

    • Jangan deklarasikan primary key auto-increment dalam pernyataan DDL. MySQL akan mengisi bidang ini secara otomatis saat menulis data.

    • Deklarasikan setidaknya satu bidang non-primary key. Jika tidak, error akan dilaporkan.

    • NOT ENFORCED dalam pernyataan DDL menunjukkan bahwa Flink tidak melakukan pemeriksaan validitas pada primary key. Anda harus memastikan kebenaran dan integritas primary key. Untuk informasi selengkapnya, lihat Pemeriksaan Validitas.

  • Tabel dimensi

    Untuk menggunakan indeks guna mempercepat kueri, urutan bidang dalam klausa JOIN harus sesuai dengan urutan definisi indeks. Ini dikenal sebagai aturan prefiks paling kiri (leftmost prefix rule). Misalnya, jika indeks berada pada (a, b, c), kondisi JOIN harus berupa ON t.a = x AND t.b = y.

    SQL yang dihasilkan Flink mungkin ditulis ulang oleh pengoptimal, yang dapat mencegah penggunaan indeks selama kueri database aktual. Untuk memverifikasi apakah indeks digunakan, periksa rencana eksekusi (EXPLAIN) atau log kueri lambat di MySQL untuk melihat pernyataan SELECT aktual yang dieksekusi.

SQL

Anda dapat menggunakan konektor MySQL dalam pekerjaan SQL sebagai tabel sumber, tabel dimensi, atau tabel sink.

Sintaks

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

Catatan
  • Cara konektor menulis ke tabel sink: Untuk setiap catatan yang diterima, konektor membuat dan mengeksekusi satu pernyataan SQL. Pernyataan SQL spesifik tergantung pada skema tabel:

    • Untuk tabel sink tanpa primary key, konektor mengeksekusi pernyataan INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);.

    • Untuk tabel sink dengan primary key, konektor mengeksekusi pernyataan INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;. Catatan: Jika tabel fisik memiliki batasan indeks unik selain primary key, memasukkan dua catatan dengan primary key berbeda tetapi nilai indeks unik yang sama dapat menyebabkan konflik indeks unik di database downstream, mengakibatkan penimpaan data dan potensi kehilangan data.

  • Jika Anda mendefinisikan primary key auto-increment di database MySQL, jangan deklarasikan kolom auto-increment dalam pernyataan DDL Flink. Database akan mengisi bidang ini secara otomatis selama penyisipan data. Konektor hanya mendukung penulisan dan penghapusan data dengan kolom auto-increment dan tidak mendukung pembaruan.

Parameter WITH

  • Umum

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    connector

    Jenis tabel.

    Ya

    STRING

    None

    Saat digunakan sebagai tabel sumber, atur opsi ini ke mysql-cdc atau mysql. Keduanya setara. Saat digunakan sebagai tabel dimensi atau sink, atur opsi ini ke mysql.

    hostname

    Alamat IP atau hostname database MySQL.

    Ya

    STRING

    None

    Kami menyarankan memasukkan alamat VPC.

    Catatan

    Jika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan jaringan publik untuk akses. Untuk informasi selengkapnya, lihat Kelola dan operasikan ruang kerja dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses jaringan publik?.

    username

    Username untuk layanan database MySQL.

    Ya

    STRING

    None

    None.

    password

    Password untuk layanan database MySQL.

    Ya

    STRING

    None

    None.

    database-name

    Nama database MySQL.

    Ya

    STRING

    None

    • Saat Anda menggunakan database sebagai tabel sumber, Anda dapat menggunakan ekspresi reguler untuk nama database guna membaca data dari beberapa database.

    • Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir string. Untuk informasi selengkapnya, lihat kolom Keterangan untuk opsi table-name.

    table-name

    Nama tabel MySQL.

    Ya

    STRING

    None

    • Anda dapat menggunakan ekspresi reguler untuk nama tabel sumber guna membaca data dari beberapa tabel.

      Saat Anda membaca data dari beberapa tabel MySQL, kirimkan beberapa pernyataan CTAS sebagai satu Pekerjaan. Ini menghindari pengaktifan beberapa Pendengar log biner dan meningkatkan kinerja dan efisiensi. Untuk informasi selengkapnya, lihat Beberapa pernyataan CTAS: Kirim sebagai satu Pekerjaan.

    • Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir string. Untuk informasi selengkapnya, lihat catatan berikut.

    Catatan

    Saat tabel sumber CDC MySQL mencocokkan nama tabel, sistem menggabungkan database-name dan table-name yang Anda tentukan menjadi ekspresi reguler jalur lengkap menggunakan string \\. (karakter . digunakan dalam versi VVR sebelum 8.0.1). Sistem kemudian menggunakan ekspresi reguler ini untuk mencocokkan nama tabel lengkap di database MySQL.

    Misalnya, jika Anda mengatur 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (db_.*.tb_.+ dalam versi sebelum 8.0.1) untuk mencocokkan nama tabel lengkap guna menentukan tabel mana yang akan dibaca.

    port

    Nomor port layanan database MySQL.

    Tidak

    INTEGER

    3306

    None.

  • Khusus sumber

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    server-id

    ID numerik untuk klien database.

    Tidak

    STRING

    Nilai acak antara 5400 dan 6400 dihasilkan.

    ID ini harus unik secara global dalam kluster MySQL. Kami merekomendasikan agar Anda menetapkan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

    Opsi ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren didukung. Dalam hal ini, kami merekomendasikan agar Anda menetapkan rentang ID sehingga setiap pembaca konkuren menggunakan ID berbeda. Untuk informasi selengkapnya, lihat Penggunaan Server ID.

    scan.incremental.snapshot.enabled

    Menentukan apakah snapshot inkremental diaktifkan.

    Tidak

    BOOLEAN

    true

    Snapshot inkremental diaktifkan secara default. Snapshot inkremental adalah mekanisme baru untuk membaca snapshot data lengkap. Dibandingkan dengan metode pembacaan snapshot lama, snapshot inkremental menawarkan beberapa keunggulan:

    • Sumber dapat membaca data lengkap secara paralel.

    • Sumber mendukung checkpoint tingkat chunk saat membaca data lengkap.

    • Sumber tidak perlu mengambil kunci baca global (FLUSH TABLES WITH read lock) saat membaca data lengkap.

    Jika Anda ingin sumber mendukung pembacaan konkuren, setiap pembaca konkuren memerlukan server ID unik. Oleh karena itu, server-id harus berupa rentang, seperti 5400-6400, dan rentang tersebut harus lebih besar dari atau sama dengan tingkat paralelisme.

    Catatan

    Item konfigurasi ini dihapus di Ververica Runtime (VVR) 11.1 dan versi selanjutnya.

    scan.incremental.snapshot.chunk.size

    Ukuran setiap chunk dalam jumlah baris.

    Tidak

    INTEGER

    8096

    Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data chunk disimpan dalam memori sebelum sepenuhnya dibaca.

    Semakin sedikit baris dalam chunk, semakin banyak jumlah chunk total dalam tabel. Meskipun ini mengurangi granularitas pemulihan kesalahan, hal ini dapat menyebabkan masalah kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Oleh karena itu, Anda perlu mempertimbangkan trade-off dan menetapkan ukuran chunk yang wajar.

    scan.snapshot.fetch.size

    Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel.

    Tidak

    INTEGER

    1024

    None.

    scan.startup.mode

    Mode startup untuk konsumsi data.

    Tidak

    STRING

    initial

    Nilai yang valid:

    • initial (Default): Memindai data historis lengkap terlebih dahulu, lalu membaca data binlog terbaru saat startup pertama kali.

    • latest-offset: Tidak memindai data historis saat startup pertama kali. Mulai membaca dari akhir binlog, artinya hanya membaca perubahan terbaru setelah konektor dimulai.

    • earliest-offset: Tidak memindai data historis. Mulai membaca dari posisi binlog paling awal yang tersedia.

    • specific-offset: Tidak memindai data historis. Mulai dari offset binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengonfigurasi scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.

    • timestamp: Tidak memindai data historis. Mulai membaca binlog dari timestamp tertentu. Timestamp ditentukan oleh scan.startup.timestamp-millis dalam milidetik.

    Penting

    Saat Anda menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel yang sesuai tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan untuk menghindari error akibat ketidakcocokan skema.

    scan.startup.specific-offset.file

    Nama file binlog dari offset awal saat menggunakan mode startup offset tertentu.

    Tidak

    STRING

    None

    Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. Format nama file contohnya mysql-bin.000003.

    scan.startup.specific-offset.pos

    Offset dalam file binlog tertentu untuk memulai saat menggunakan mode startup offset tertentu.

    Tidak

    INTEGER

    None

    Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset.

    scan.startup.specific-offset.gtid-set

    Set GTID dari offset awal saat menggunakan mode startup offset tertentu.

    Tidak

    STRING

    None

    Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. Format set GTID contohnya 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    Offset awal sebagai timestamp milidetik saat menggunakan mode startup timestamp.

    Tidak

    LONG

    None

    Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke timestamp. Timestamp dalam milidetik.

    Penting

    Saat Anda menentukan waktu, CDC MySQL mencoba membaca event awal setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai dengan waktu yang ditentukan. Pastikan file binlog untuk timestamp yang ditentukan belum dihapus dari database dan dapat dibaca.

    server-time-zone

    Zona waktu sesi yang digunakan oleh database.

    Tidak

    STRING

    Jika Anda tidak menentukan opsi ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu wilayah yang Anda pilih.

    Contoh: Asia/Shanghai. Opsi ini mengontrol bagaimana tipe TIMESTAMP di MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.

    debezium.min.row.count.to.stream.results

    Saat jumlah baris dalam tabel melebihi nilai ini, mode pembacaan batch digunakan.

    Tidak

    INTEGER

    1000

    Flink membaca data dari tabel sumber MySQL dengan cara berikut:

    • Pembacaan lengkap: Membaca seluruh data tabel langsung ke memori. Metode ini cepat tetapi mengonsumsi memori yang sesuai. Jika tabel sumber sangat besar, hal ini dapat menimbulkan risiko OOM.

    • Pembacaan batch: Membaca data dalam beberapa batch, dengan jumlah baris tertentu per batch, hingga semua data terbaca. Metode ini menghindari risiko OOM saat membaca tabel besar tetapi relatif lebih lambat.

    connect.timeout

    Waktu maksimum menunggu koneksi ke server database MySQL hingga timeout sebelum mencoba lagi.

    Tidak

    DURATION

    30s

    None.

    connect.max-retries

    Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.

    Tidak

    INTEGER

    3

    None.

    connection.pool.size

    Ukuran kolam koneksi database.

    Tidak

    INTEGER

    20

    Kolam koneksi database digunakan untuk menggunakan ulang koneksi, yang dapat mengurangi jumlah koneksi database.

    jdbc.properties.*

    Opsi koneksi kustom dalam URL JDBC.

    Tidak

    STRING

    None

    Anda dapat meneruskan opsi koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengatur 'jdbc.properties.useSSL' = 'false'.

    Untuk informasi selengkapnya tentang opsi koneksi yang didukung, lihat MySQL Configuration Properties.

    debezium.*

    Opsi kustom untuk Debezium guna membaca binlog.

    Tidak

    STRING

    None

    Anda dapat meneruskan opsi Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan kesalahan penguraian.

    heartbeat.interval

    Interval sumber menggunakan event heartbeat untuk memajukan offset binlog.

    Tidak

    DURATION

    30s

    Event heartbeat digunakan untuk memajukan offset binlog di sumber, yang sangat berguna untuk tabel MySQL yang diperbarui secara perlahan. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset binlog maju, mencegah masalah di mana offset binlog kedaluwarsa menyebabkan pekerjaan gagal dan memerlukan restart tanpa status.

    scan.incremental.snapshot.chunk.key-column

    Kolom yang digunakan untuk membagi chunk selama fase snapshot.

    Lihat Keterangan.

    STRING

    None

    • Wajib untuk tabel tanpa primary key. Kolom yang dipilih harus bertipe non-null (NOT NULL).

    • Opsional untuk tabel dengan primary key. Anda hanya dapat memilih satu kolom dari primary key.

    rds.region-id

    ID wilayah instans ApsaraDB RDS for MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    None

    Untuk informasi selengkapnya tentang ID wilayah, lihat Wilayah dan zona.

    rds.access-key-id

    ID AccessKey akun untuk instans ApsaraDB RDS for MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    None

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?.

    Penting

    Untuk mencegah kebocoran informasi AccessKey Anda, kami merekomendasikan agar Anda mengelola ID AccessKey menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Kelola variabel.

    rds.access-key-secret

    Rahasia AccessKey akun untuk instans ApsaraDB RDS for MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    None

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?

    Penting

    Untuk mencegah kebocoran informasi AccessKey Anda, kami merekomendasikan agar Anda mengelola Rahasia AccessKey menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Kelola variabel.

    rds.db-instance-id

    ID instans instans ApsaraDB RDS for MySQL.

    Wajib saat membaca log arsip dari OSS.

    STRING

    None

    None.

    rds.main-db-id

    ID database utama instans ApsaraDB RDS for MySQL.

    Tidak

    STRING

    None

    rds.download.timeout

    Waktu tunggu untuk mengunduh satu log arsip dari OSS.

    Tidak

    DURATION

    60s

    None.

    rds.endpoint

    Titik akhir layanan untuk mendapatkan informasi binlog OSS.

    Tidak

    STRING

    None

    • Untuk informasi selengkapnya tentang nilai yang valid, lihat Titik akhir.

    • Didukung hanya di VVR 8.0.8 dan versi selanjutnya.

    scan.incremental.close-idle-reader.enabled

    Menentukan apakah pembaca idle ditutup setelah fase snapshot berakhir.

    Tidak

    BOOLEAN

    false

    • Didukung hanya di VVR 8.0.1 dan versi selanjutnya.

    • Agar konfigurasi ini berlaku, Anda harus mengatur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

    scan.read-changelog-as-append-only.enabled

    Menentukan apakah aliran changelog diubah menjadi aliran append-only.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Semua jenis pesan, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER, diubah menjadi pesan INSERT. Aktifkan ini hanya dalam skenario khusus, seperti saat Anda perlu menyimpan pesan hapus dari tabel upstream.

    • false (Default): Semua jenis pesan dikirim downstream apa adanya.

    Catatan

    Didukung hanya di VVR 8.0.8 dan versi selanjutnya.

    scan.only.deserialize.captured.tables.changelog.enabled

    Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.

    Tidak

    BOOLEAN

    • Nilai default adalah false di versi VVR 8.x.

    • Nilai default adalah true di VVR 11.1 dan versi selanjutnya.

    Nilai yang valid:

    • true: Mendeserialisasi data perubahan hanya untuk tabel target guna mempercepat pembacaan binlog.

    • false (Default): Mendeserialisasi data perubahan untuk semua tabel.

    Catatan
    • Didukung hanya di VVR 8.0.7 dan versi selanjutnya.

    • Saat digunakan di VVR 8.0.8 dan versi sebelumnya, nama opsi harus diubah menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    Selama fase inkremental, menentukan apakah mencoba mengurai event DDL perubahan tanpa penguncian RDS.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengurai event DDL perubahan tanpa penguncian RDS.

    • false (Default): Tidak mengurai event DDL perubahan tanpa penguncian RDS.

    Ini adalah fitur eksperimental. Kami merekomendasikan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan online tanpa penguncian.

    Catatan

    Didukung hanya di VVR 11.1 dan versi selanjutnya.

    scan.incremental.snapshot.backfill.skip

    Menentukan apakah melewati backfill selama fase pembacaan snapshot.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Melewati backfill selama fase pembacaan snapshot.

    • false (Default): Tidak melewati backfill selama fase pembacaan snapshot.

    Jika Anda melewati backfill, perubahan pada tabel selama fase snapshot dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

    Penting

    Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.

    Catatan

    Didukung hanya di VVR 11.1 dan versi selanjutnya.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    • false (Default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Ini adalah fitur eksperimental. Mengaktifkannya dapat mengurangi risiko error kehabisan memori (OOM) saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan agar Anda menambahkan ini sebelum startup pekerjaan pertama kali.

    Catatan

    Didukung hanya di VVR 11.1 dan versi selanjutnya.

    binlog.session.network.timeout

    Timeout baca/tulis jaringan untuk koneksi binlog.

    Tidak

    DURATION

    10m

    Jika Anda mengatur opsi ini ke 0s, timeout default server MySQL digunakan.

    Catatan

    Didukung hanya di VVR 11.5 dan versi selanjutnya.

    scan.rate-limit.records-per-second

    Jumlah maksimum catatan yang dapat dikirim sumber per detik.

    Tidak

    LONG

    None

    Opsi ini berlaku untuk skenario di mana Anda perlu membatasi pembacaan data. Batasan berlaku di kedua fase, lengkap dan inkremental.

    Metrik numRecordsOutPerSecond dari sumber mencerminkan jumlah catatan yang dikeluarkan aliran data per detik. Anda dapat menyesuaikan opsi ini berdasarkan metrik tersebut.

    Pada fase pembacaan data lengkap, Anda biasanya perlu mengurangi jumlah catatan data yang dibaca dalam setiap batch. Anda dapat mengurangi nilai opsi scan.incremental.snapshot.chunk.size.

    Catatan

    Didukung hanya di VVR 11.5 dan versi selanjutnya.

  • Khusus tabel dimensi

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    url

    URL JDBC MySQL.

    Tidak

    STRING

    None

    Format URL adalah: jdbc:mysql://<endpoint>:<port>/<database_name>.

    lookup.max-retries

    Jumlah maksimum percobaan ulang setelah pembacaan data gagal.

    Tidak

    INTEGER

    3

    Didukung hanya di VVR 6.0.7 dan versi selanjutnya.

    lookup.cache.strategy

    Kebijakan cache.

    Tidak

    STRING

    None

    Mendukung tiga kebijakan cache: None, LRU, dan ALL. Untuk informasi selengkapnya tentang nilai-nilai tersebut, lihat Informasi latar belakang.

    Catatan

    Saat Anda menggunakan kebijakan cache LRU, Anda juga harus mengonfigurasi opsi lookup.cache.max-rows.

    lookup.cache.max-rows

    Jumlah maksimum baris yang dicache.

    Tidak

    INTEGER

    100000

    • Jika Anda memilih kebijakan cache LRU, Anda harus mengatur ukuran cache.

    • Jika Anda memilih kebijakan cache ALL, Anda tidak perlu mengatur ukuran cache.

    lookup.cache.ttl

    Waktu hidup (TTL) cache.

    Tidak

    DURATION

    10 s

    Konfigurasi lookup.cache.ttl tergantung pada lookup.cache.strategy:

    • Jika lookup.cache.strategy diatur ke None, Anda tidak perlu mengonfigurasi lookup.cache.ttl. Artinya cache tidak kedaluwarsa.

    • Jika lookup.cache.strategy diatur ke LRU, lookup.cache.ttl adalah TTL cache. Secara default, tidak kedaluwarsa.

    • Jika lookup.cache.strategy diatur ke ALL, lookup.cache.ttl adalah waktu reload cache. Secara default, tidak direload.

    Gunakan format waktu, seperti 1min atau 10s.

    lookup.max-join-rows

    Jumlah maksimum hasil yang dikembalikan saat mengkueri tabel dimensi untuk setiap catatan di tabel utama.

    Tidak

    INTEGER

    1024

    None.

    lookup.filter-push-down.enabled

    Menentukan apakah mengaktifkan filter pushdown untuk tabel dimensi.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memfilter data terlebih dahulu berdasarkan kondisi yang ditetapkan dalam pekerjaan SQL.

    • false (Default): Menonaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memuat semua data.

    Catatan

    Didukung hanya di VVR 8.0.7 dan versi selanjutnya.

    Penting

    Filter pushdown untuk tabel dimensi hanya boleh diaktifkan saat tabel Flink digunakan sebagai tabel dimensi. Tabel sumber MySQL tidak mendukung pengaktifan filter pushdown. Jika tabel Flink digunakan sebagai tabel sumber dan tabel dimensi, dan filter pushdown diaktifkan untuk tabel dimensi, Anda harus secara eksplisit mengatur item konfigurasi ini ke false untuk tabel sumber menggunakan SQL Hints. Jika tidak, pekerjaan mungkin berjalan abnormal.

  • Khusus sink

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    url

    URL JDBC MySQL.

    Tidak

    STRING

    None

    Format URL adalah: jdbc:mysql://<endpoint>:<port>/<database_name>.

    sink.max-retries

    Jumlah maksimum percobaan ulang setelah penulisan data gagal.

    Tidak

    INTEGER

    3

    None.

    sink.buffer-flush.batch-size

    Jumlah catatan dalam satu batch penulisan.

    Tidak

    INTEGER

    4096

    None.

    sink.buffer-flush.max-rows

    Jumlah catatan data yang disimpan dalam memori.

    Tidak

    INTEGER

    10000

    Opsi ini hanya berlaku setelah primary key ditentukan.

    sink.buffer-flush.interval

    Interval waktu untuk mengosongkan buffer. Jika data dalam buffer tidak memenuhi kondisi output setelah waktu tunggu yang ditentukan, sistem secara otomatis mengeluarkan semua data dalam buffer.

    Tidak

    DURATION

    1s

    None.

    sink.ignore-delete

    Menentukan apakah mengabaikan operasi DELETE.

    Tidak

    BOOLEAN

    false

    Saat aliran yang dihasilkan Flink SQL berisi catatan delete atau update-before, jika beberapa tugas output memperbarui bidang berbeda dari tabel yang sama secara bersamaan, ketidakkonsistenan data dapat terjadi.

    Misalnya, setelah catatan dihapus, tugas lain hanya memperbarui beberapa bidang. Bidang yang tidak diperbarui akan menjadi null atau nilai default-nya, menyebabkan kesalahan data.

    Dengan mengatur sink.ignore-delete ke true, Anda dapat mengabaikan operasi DELETE dan UPDATE_BEFORE upstream untuk menghindari masalah tersebut.

    Catatan
    • UPDATE_BEFORE adalah bagian dari mekanisme retraksi Flink, digunakan untuk "menarik kembali" nilai lama dalam operasi pembaruan.

    • Saat ignoreDelete = true, semua catatan DELETE dan UPDATE_BEFORE dilewati. Hanya catatan INSERT dan UPDATE_AFTER yang diproses.

    sink.ignore-null-when-update

    Saat memperbarui data, menentukan apakah memperbarui bidang yang sesuai menjadi null atau melewati pembaruan untuk bidang tersebut jika bidang data masuk adalah null.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Tidak memperbarui bidang. Opsi ini dapat diatur ke true hanya saat primary key ditetapkan untuk tabel Flink. Saat diatur ke true:

      • Di VVR 8.0.6 dan versi sebelumnya, eksekusi batch tidak didukung untuk menulis data ke tabel sink.

      • Di VVR 8.0.7 dan versi selanjutnya, eksekusi batch didukung untuk menulis data ke tabel sink.

        Meskipun penulisan batch dapat meningkatkan efisiensi penulisan dan throughput keseluruhan secara signifikan, hal ini dapat menyebabkan latensi data dan risiko OOM. Oleh karena itu, pertimbangkan trade-off berdasarkan skenario bisnis aktual Anda.

    • false: Memperbarui bidang menjadi null.

    Catatan

    Opsi ini didukung hanya di VVR 8.0.5 dan versi selanjutnya.

Pemetaan tipe

  • Tabel sumber CDC

    Tipe bidang CDC MySQL

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    Penting

    Kami menyarankan agar Anda tidak menggunakan tipe TINYINT(1) di MySQL untuk menyimpan nilai selain 0 dan 1. Saat property-version = 0, tabel sumber CDC MySQL memetakan TINYINT(1) ke tipe BOOLEAN Flink secara default, yang dapat menyebabkan ketidakakuratan data. Untuk menggunakan tipe TINYINT(1) guna menyimpan nilai selain 0 dan 1, lihat opsi konfigurasi catalog.table.treat-tinyint1-as-boolean.

  • Tabel dimensi dan tabel sink

    Tipe bidang MySQL

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    Catatan

    p harus kurang dari atau sama dengan 38.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Penting

    Flink hanya mendukung catatan tipe BLOB MySQL hingga 2.147.483.647 (2^31 - 1) byte.

    BLOB

    MEDIUMBLOB

    LONGBLOB

Ingesti Data

Anda dapat menggunakan konektor MySQL sebagai sumber data dalam pekerjaan YAML ingesti data.

Sintaks

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

Item konfigurasi

Parameter

Deskripsi

Wajib

Tipe data

Nilai default

Keterangan

type

Jenis sumber data.

Ya

STRING

None

Bidang statis diatur ke mysql.

name

Nama sumber data.

Tidak

STRING

None

None.

hostname

Alamat IP atau hostname database MySQL.

Ya

STRING

None

Kami menyarankan memasukkan alamat Virtual Private Cloud (VPC).

Catatan

Jika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan jaringan publik untuk akses. Untuk informasi selengkapnya, lihat Kelola dan operasikan ruang kerja dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses jaringan publik?.

username

Username untuk layanan database MySQL.

Ya

STRING

None

None.

password

Password untuk layanan database MySQL.

Ya

STRING

None

None.

tables

Tabel data MySQL yang akan disinkronkan.

Ya

STRING

None

  • Opsi ini mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

  • Anda dapat menggunakan koma untuk memisahkan beberapa ekspresi reguler.

Catatan
  • Jangan gunakan karakter pencocokan awal dan akhir ^ dan $ dalam ekspresi reguler. Di versi 11.2, ekspresi reguler database diperoleh dengan memisahkan menggunakan titik. Karakter pencocokan awal dan akhir akan membuat ekspresi reguler database yang diperoleh tidak dapat digunakan. Misalnya, ^db.user_[0-9]+$ perlu diubah menjadi db.user_[0-9]+.

  • Titik digunakan untuk memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, Anda harus meng-escape titik dengan backslash. Contoh: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

tables.exclude

Tabel yang dikecualikan dari sinkronisasi.

Tidak

STRING

None

  • Opsi ini mendukung ekspresi reguler untuk mengecualikan data dari beberapa tabel.

  • Anda dapat menggunakan koma untuk memisahkan beberapa ekspresi reguler.

Catatan

Titik digunakan untuk memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, Anda harus meng-escape titik dengan backslash. Contoh: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

port

Nomor port layanan database MySQL.

Tidak

INTEGER

3306

None.

schema-change.enabled

Menentukan apakah mengirim event perubahan skema.

Tidak

BOOLEAN

true

None.

server-id

ID numerik atau rentang ID untuk klien database yang digunakan untuk sinkronisasi.

Tidak

STRING

Nilai acak antara 5400 dan 6400 dihasilkan.

ID ini harus unik secara global dalam kluster MySQL. Kami merekomendasikan agar Anda menetapkan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

Opsi ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren didukung. Dalam hal ini, kami merekomendasikan agar Anda menetapkan rentang ID sehingga setiap pembaca konkuren menggunakan ID berbeda.

jdbc.properties.*

Opsi koneksi kustom dalam URL JDBC.

Tidak

STRING

None

Anda dapat meneruskan opsi koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengatur 'jdbc.properties.useSSL' = 'false'.

Untuk informasi selengkapnya tentang opsi koneksi yang didukung, lihat MySQL Configuration Properties.

debezium.*

Opsi kustom untuk Debezium guna membaca binlog.

Tidak

STRING

None

Anda dapat meneruskan opsi Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan kesalahan penguraian.

scan.incremental.snapshot.chunk.size

Ukuran setiap chunk dalam jumlah baris.

Tidak

INTEGER

8096

Tabel MySQL dibagi menjadi beberapa chunk untuk dibaca. Data chunk disimpan dalam memori sebelum sepenuhnya dibaca.

Meskipun menggunakan lebih sedikit baris per chunk memberikan granularitas lebih baik untuk pemulihan kesalahan, hal ini juga dapat menyebabkan error kehabisan memori (OOM) dan mengurangi throughput keseluruhan. Oleh karena itu, Anda harus menetapkan ukuran chunk yang wajar yang menyeimbangkan trade-off ini.

scan.snapshot.fetch.size

Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel.

Tidak

INTEGER

1024

None.

scan.startup.mode

Mode startup untuk konsumsi data.

Tidak

STRING

initial

Nilai yang valid:

  • initial (Default): Memindai data historis lengkap terlebih dahulu, lalu membaca data binlog terbaru saat startup pertama kali.

  • latest-offset: Tidak memindai data historis saat startup pertama kali. Mulai membaca dari akhir binlog, artinya hanya membaca perubahan terbaru setelah konektor dimulai.

  • earliest-offset: Tidak memindai data historis. Mulai membaca dari binlog paling awal yang tersedia.

  • specific-offset: Tidak memindai data historis. Mulai dari offset binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengonfigurasi scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.

  • timestamp: Tidak memindai data historis. Mulai membaca binlog dari timestamp tertentu. Timestamp ditentukan oleh scan.startup.timestamp-millis dalam milidetik.

Penting

Untuk mode startup earliest-offset, specific-offset, dan timestamp, jika skema tabel saat startup berbeda dengan skema pada waktu offset awal yang ditentukan, pekerjaan akan gagal akibat ketidakcocokan skema. Dengan kata lain, saat Anda menggunakan ketiga mode startup ini, Anda harus memastikan bahwa skema tabel yang sesuai tidak berubah antara posisi konsumsi binlog yang ditentukan dan waktu startup pekerjaan.

scan.startup.specific-offset.file

Nama file binlog dari offset awal saat menggunakan mode startup offset tertentu.

Tidak

STRING

None

Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. Format nama file contohnya mysql-bin.000003.

scan.startup.specific-offset.pos

Offset dalam file binlog tertentu untuk memulai saat menggunakan mode startup offset tertentu.

Tidak

INTEGER

None

Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset.

scan.startup.specific-offset.gtid-set

Set GTID dari offset awal saat menggunakan mode startup offset tertentu.

Tidak

STRING

None

Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke specific-offset. Format set GTID contohnya 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

Offset awal sebagai timestamp milidetik saat menggunakan mode startup timestamp.

Tidak

LONG

None

Saat menggunakan konfigurasi ini, Anda harus mengatur scan.startup.mode ke timestamp. Timestamp dalam milidetik.

Penting

Saat Anda menentukan waktu, CDC MySQL mencoba membaca event awal setiap file binlog untuk menentukan timestamp-nya dan menemukan file binlog yang sesuai dengan waktu yang ditentukan. Pastikan file binlog untuk timestamp yang ditentukan belum dihapus dari database dan dapat dibaca.

server-time-zone

Zona waktu sesi yang digunakan oleh database.

Tidak

STRING

Jika Anda tidak menentukan opsi ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu wilayah yang Anda pilih.

Contoh: Asia/Shanghai. Opsi ini mengontrol bagaimana tipe TIMESTAMP di MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.

scan.startup.specific-offset.skip-events

Jumlah event binlog yang dilewati saat membaca dari offset tertentu.

Tidak

INTEGER

None

Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.

scan.startup.specific-offset.skip-rows

Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event binlog mungkin sesuai dengan beberapa perubahan baris.

Tidak

INTEGER

None

Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.

connect.timeout

Waktu maksimum menunggu koneksi ke server database MySQL hingga timeout sebelum mencoba lagi.

Tidak

DURATION

30s

None.

connect.max-retries

Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.

Tidak

INTEGER

3

None.

connection.pool.size

Ukuran kolam koneksi database.

Tidak

INTEGER

20

Kolam koneksi database digunakan untuk menggunakan ulang koneksi, yang dapat mengurangi jumlah koneksi database.

heartbeat.interval

Interval sumber menggunakan event heartbeat untuk memajukan offset binlog.

Tidak

DURATION

30s

Event heartbeat digunakan untuk memajukan offset binlog di sumber, yang sangat berguna untuk tabel MySQL yang diperbarui secara perlahan. Untuk tabel tersebut, offset binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset binlog maju, mencegah masalah di mana offset binlog kedaluwarsa menyebabkan pekerjaan gagal dan memerlukan restart tanpa status.

scan.incremental.snapshot.chunk.key-column

Kolom yang digunakan untuk membagi chunk selama fase snapshot.

Tidak.

STRING

None

Anda hanya dapat memilih satu kolom dari primary key.

rds.region-id

ID wilayah instans ApsaraDB RDS for MySQL.

Wajib saat membaca log arsip dari OSS.

STRING

None

Untuk informasi selengkapnya tentang ID wilayah, lihat Wilayah dan zona.

rds.access-key-id

ID AccessKey akun untuk instans ApsaraDB RDS for MySQL.

Wajib saat membaca log arsip dari OSS.

STRING

None

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?

Penting

Untuk mencegah kebocoran informasi AccessKey Anda, kami merekomendasikan agar Anda mengelola ID AccessKey menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Kelola variabel.

rds.access-key-secret

Rahasia AccessKey akun untuk instans ApsaraDB RDS for MySQL.

Wajib saat membaca log arsip dari OSS.

STRING

None

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?

Penting

Untuk mencegah kebocoran informasi AccessKey Anda, kami merekomendasikan agar Anda mengelola Rahasia AccessKey menggunakan manajemen rahasia. Untuk informasi selengkapnya, lihat Kelola variabel.

rds.db-instance-id

ID instans dari instans ApsaraDB RDS untuk MySQL.

Wajib saat membaca log arsip dari OSS.

STRING

None

None.

rds.main-db-id

ID database utama instans ApsaraDB RDS for MySQL.

Tidak

STRING

None

Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Backup log untuk ApsaraDB RDS for MySQL.

rds.download.timeout

Waktu tunggu untuk mengunduh satu log arsip dari OSS.

Tidak

DURATION

60s

None.

rds.endpoint

Titik akhir layanan untuk mendapatkan informasi binlog OSS.

Tidak

STRING

None

Untuk informasi selengkapnya tentang nilai yang valid, lihat Titik akhir.

rds.binlog-directory-prefix

Awalan direktori untuk menyimpan file binlog.

Tidak

STRING

rds-binlog-

None.

rds.use-intranet-link

Menentukan apakah menggunakan jaringan internal untuk mengunduh file binlog.

Tidak

BOOLEAN

true

None.

rds.binlog-directories-parent-path

Jalur mutlak direktori induk tempat file binlog disimpan.

Tidak

STRING

None

None.

chunk-meta.group.size

Ukuran metadata chunk.

Tidak

INTEGER

1000

Jika metadata lebih besar dari nilai ini, metadata dikirim dalam beberapa bagian.

chunk-key.even-distribution.factor.lower-bound

Batas bawah faktor distribusi chunk untuk pemisahan merata.

Tidak

DOUBLE

0.05

Jika faktor distribusi kurang dari nilai parameter ini, chunk tidak didistribusikan secara merata.

Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / jumlah total baris data.

chunk-key.even-distribution.factor.upper-bound

Batas atas faktor distribusi chunk untuk pemisahan merata.

Tidak

DOUBLE

1000.0

Jika faktor distribusi lebih besar dari nilai ini, pemisahan tidak merata digunakan.

Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris.

scan.incremental.close-idle-reader.enabled

Menentukan apakah pembaca idle ditutup setelah fase snapshot berakhir.

Tidak

BOOLEAN

false

Agar konfigurasi ini berlaku, Anda harus mengatur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

scan.only.deserialize.captured.tables.changelog.enabled

Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.

Tidak

BOOLEAN

  • Nilai default adalah false di versi VVR 8.x.

  • Nilai default adalah true di VVR 11.1 dan versi selanjutnya.

Nilai yang valid:

  • true: Mendeserialisasi data perubahan hanya untuk tabel target guna mempercepat pembacaan binlog.

  • false (Default): Mendeserialisasi data perubahan untuk semua tabel.

scan.parallel-deserialize-changelog.enabled

Selama fase inkremental, menentukan apakah menggunakan beberapa thread untuk mengurai event perubahan.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Menggunakan beberapa thread selama fase deserialisasi event perubahan sambil mempertahankan urutan event binlog, yang mempercepat pembacaan.

  • false (Default): Menggunakan satu thread selama fase deserialisasi event.

Catatan

Didukung hanya di VVR 8.0.11 dan versi selanjutnya.

scan.parallel-deserialize-changelog.handler.size

Jumlah handler event saat menggunakan beberapa thread untuk mengurai event perubahan.

Tidak

INTEGER

2

Catatan

Didukung hanya di VVR 8.0.11 dan versi selanjutnya.

metadata-column.include-list

Kolom metadata yang diteruskan ke downstream.

Tidak

STRING

None

Metadata yang tersedia meliputi table_name, database_name, op_ts, es_ts, query_log, file, dan pos. Anda dapat menggunakan koma untuk memisahkannya.

Catatan

Konektor YAML CDC MySQL tidak perlu dan tidak mendukung penambahan kolom metadata op_type. Anda dapat langsung menggunakan __data_event_type__ dalam ekspresi Transform untuk mendapatkan tipe data perubahan.

Penting

Kolom metadata file merepresentasikan file binlog tempat data berada. Kolom ini berupa "" selama fase lengkap dan nama file binlog selama fase inkremental. Kolom metadata pos merepresentasikan offset data dalam file binlog. Kolom ini berupa "0" selama fase lengkap dan offset data dalam file binlog selama fase inkremental. Kedua kolom metadata ini didukung mulai dari VVR 11.5.

Kolom metadata es_ts merepresentasikan waktu mulai transaksi di MySQL yang sesuai dengan changelog. Kolom ini hanya dapat ditambahkan saat Anda menggunakan MySQL versi 8.0.x. Jangan tambahkan kolom metadata ini saat menggunakan versi MySQL sebelumnya.

scan.newly-added-table.enabled

Saat memulai ulang dari checkpoint, menentukan apakah menyinkronkan tabel yang baru ditambahkan yang tidak cocok pada proses sebelumnya atau menghapus tabel yang saat ini tidak cocok yang disimpan dalam state.

Tidak

BOOLEAN

false

Ini berlaku saat memulai ulang dari checkpoint atau titik simpan.

scan.binlog.newly-added-table.enabled

Selama fase inkremental, menentukan apakah mengirim data dari tabel yang baru ditambahkan yang cocok.

Tidak

BOOLEAN

false

Tidak dapat diaktifkan secara bersamaan dengan scan.newly-added-table.enabled.

scan.incremental.snapshot.chunk.key-column

Menentukan kolom untuk tabel tertentu yang akan digunakan sebagai kunci pemisahan chunk selama fase snapshot.

Tidak

STRING

None

  • Gunakan titik dua (:) untuk menghubungkan nama tabel dan nama bidang guna membentuk aturan. Nama tabel dapat berupa ekspresi reguler. Anda dapat mendefinisikan beberapa aturan yang dipisahkan dengan titik koma (;). Contoh: db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2.

  • Untuk tabel tanpa primary key, ini wajib. Kolom yang dipilih tidak boleh null (NOT NULL). Untuk tabel dengan primary key, ini opsional. Anda hanya dapat memilih satu kolom dari primary key.

scan.parse.online.schema.changes.enabled

Selama fase inkremental, menentukan apakah mencoba mengurai event DDL perubahan tanpa penguncian RDS.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Mengurai event DDL perubahan tanpa penguncian RDS.

  • false (Default): Tidak mengurai event DDL perubahan tanpa penguncian RDS.

Ini adalah fitur eksperimental. Kami merekomendasikan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan online tanpa penguncian.

Catatan

Didukung hanya di VVR 11.0 dan versi selanjutnya.

scan.incremental.snapshot.backfill.skip

Menentukan apakah melewati backfill selama fase pembacaan snapshot.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Melewati backfill selama fase pembacaan snapshot.

  • false (Default): Tidak melewati backfill selama fase pembacaan snapshot.

Jika backfill dilewati, perubahan pada tabel selama fase snapshot dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

Penting

Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang, yang hanya menjamin semantik at-least-once.

Catatan

Fitur ini hanya tersedia untuk mesin komputasi Flink Ververica Runtime (VVR) 11.1 dan versi selanjutnya.

treat-tinyint1-as-boolean.enabled

Menentukan apakah memperlakukan tipe TINYINT(1) sebagai tipe Boolean.

Tidak

BOOLEAN

true

Nilai yang valid:

  • true (Default): Memperlakukan tipe TINYINT(1) sebagai tipe Boolean.

  • false: Tidak memperlakukan tipe TINYINT(1) sebagai tipe Boolean.

treat-timestamp-as-datetime-enabled

Menentukan apakah memperlakukan tipe TIMESTAMP sebagai tipe DATETIME.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Memperlakukan tipe TIMESTAMP MySQL sebagai tipe DATETIME dan memetakannya ke tipe TIMESTAMP CDC.

  • false (Default): Memetakan tipe TIMESTAMP MySQL ke tipe TIMESTAMP_LTZ CDC.

Tipe TIMESTAMP MySQL menyimpan waktu UTC dan dipengaruhi oleh zona waktu. Tipe DATETIME MySQL menyimpan waktu literal dan tidak dipengaruhi oleh zona waktu.

Saat diaktifkan, fitur ini mengubah data TIMESTAMP MySQL ke tipe DATETIME berdasarkan server-time-zone.

include-comments.enabled

Menentukan apakah menyinkronkan komentar tabel dan komentar bidang.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Menyinkronkan komentar tabel dan komentar bidang.

  • false (Default): Tidak menyinkronkan komentar tabel dan komentar bidang.

Mengaktifkan fitur ini meningkatkan penggunaan memori pekerjaan.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah mendistribusikan shard tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Mendistribusikan shard tak terbatas terlebih dahulu selama fase pembacaan snapshot.

  • false (Default): Tidak mendistribusikan shard tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Ini adalah fitur eksperimental. Mengaktifkannya dapat mengurangi ancaman Pengelola Tugas mengalami error kehabisan memori (OOM) saat menyinkronkan shard terakhir selama fase snapshot. Tambahkan parameter ini sebelum memulai pekerjaan untuk pertama kalinya.

Catatan

Fitur ini hanya tersedia untuk mesin komputasi Flink VVR 11.1 dan versi selanjutnya.

binlog.session.network.timeout

Durasi timeout jaringan untuk koneksi pencatatan biner.

Tidak

DURATION

10m

Jika parameter ini diatur ke 0 s, durasi timeout default sisi server MySQL digunakan.

Catatan

Fitur ini hanya tersedia untuk mesin komputasi Flink VVR 11.5 dan versi selanjutnya.

scan.rate-limit.records-per-second

Jumlah maksimum catatan yang dapat dikirim sumber per detik.

Tidak

LONG

None

Parameter ini berlaku untuk skenario di mana pembacaan data harus dibatasi. Batasan ini berlaku baik pada fase lengkap maupun inkremental.

Metrik numRecordsOutPerSecond dari sumber mencerminkan jumlah catatan yang dikeluarkan seluruh aliran data per detik. Gunakan metrik ini untuk menyesuaikan parameter.

Selama fase baca lengkap, juga kurangi jumlah catatan yang dibaca dalam setiap batch. Untuk melakukan ini, kurangi nilai parameter scan.incremental.snapshot.chunk.size.

Catatan

Fitur ini hanya tersedia untuk mesin komputasi Flink VVR 11.5 dan versi selanjutnya.

Pemetaan tipe

Tabel berikut menunjukkan pemetaan tipe untuk ingesti data.

Tipe bidang CDC MySQL

Tipe bidang CDC

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

Bergantung pada nilai parameter treat-timestamp-as-datetime-enabled, bidang pemetaan yang berbeda digunakan:

true:TIMESTAMP[(p)]

false:TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

STRING

Catatan

MySQL mendukung presisi desimal hingga 65. Flink membatasi presisi desimal hingga 38. Jika kolom desimal memiliki presisi lebih dari 38, petakan ke string untuk mencegah kehilangan presisi.

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

Catatan

Tipe data JSON diubah menjadi string berformat JSON di Flink.

GEOMETRY

STRING

Catatan

Tipe data spasial MySQL diubah menjadi string dengan format JSON tetap. Untuk informasi selengkapnya, lihat pemetaan tipe data spasial MySQL.

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

Catatan

Untuk tipe data BLOB MySQL, panjang maksimum yang didukung adalah 2.147.483.647 (2**31 - 1).

BLOB

MEDIUMBLOB

LONGBLOB

Contoh

  • Tabel sumber CDC

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Tabel dimensi

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • Tabel sink

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • Sumber ingesti data

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

Tentang tabel sumber CDC MySQL

  • Cara kerja

    Saat tabel sumber CDC MySQL dimulai, sistem memindai seluruh tabel, membaginya menjadi beberapa chunk berdasarkan primary key, dan mencatat posisi binlog saat ini. Sistem kemudian menggunakan algoritma snapshot inkremental untuk membaca data dari setiap chunk menggunakan pernyataan SELECT. Pekerjaan secara berkala melakukan checkpoint untuk mencatat chunk yang telah selesai. Jika terjadi failover, pekerjaan melanjutkan pembacaan dari chunk yang belum selesai. Setelah semua chunk terbaca, pekerjaan mulai membaca catatan perubahan inkremental dari posisi binlog yang sebelumnya dicatat. Pekerjaan Flink terus melakukan checkpoint berkala untuk mencatat posisi binlog. Jika pekerjaan gagal, sistem melanjutkan pemrosesan dari posisi binlog terakhir yang dicatat untuk mencapai semantik tepat-sekali.

    Untuk penjelasan lebih rinci tentang algoritma snapshot inkremental, lihat Konektor CDC MySQL.

  • Metadata

    Metadata berguna untuk menggabungkan database dan tabel yang terpisah (sharded). Setelah penggabungan, Anda mungkin perlu mengidentifikasi database dan tabel sumber untuk setiap baris data. Kolom metadata memungkinkan Anda mengakses informasi ini, sehingga memudahkan penggabungan beberapa tabel sharded menjadi satu tabel tujuan.

    Sumber CDC MySQL mendukung sintaks kolom metadata. Anda dapat mengakses metadata berikut melalui kolom metadata.

    Kunci metadata

    Tipe metadata

    Deskripsi

    database_name

    STRING NOT NULL

    Nama database yang berisi baris tersebut.

    table_name

    STRING NOT NULL

    Nama tabel yang berisi baris tersebut.

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    Waktu saat baris tersebut diubah di database. Jika catatan berasal dari data historis yang sudah ada di tabel, bukan dari binlog, nilai ini selalu 0.

    op_type

    STRING NOT NULL

    Jenis perubahan baris tersebut.

    • +I: Pesan INSERT

    • -D: Pesan DELETE

    • -U: Pesan UPDATE_BEFORE

    • +U: Pesan UPDATE_AFTER

    Catatan

    Didukung hanya di VVR 8.0.7 dan versi selanjutnya.

    query_log

    STRING NOT NULL

    Catatan log kueri MySQL yang sesuai dengan baris yang dibaca.

    Catatan

    Untuk mencatat kueri, MySQL memerlukan pengaktifan parameter binlog_rows_query_log_events.

    Contoh berikut menunjukkan cara menggabungkan beberapa tabel orders dari berbagai database terpisah dalam instans MySQL dan menyinkronkannya ke tabel holo_orders di Hologres.

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Membaca nama database.
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- Membaca nama tabel.
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Membaca timestamp perubahan.
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Membaca jenis perubahan.
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Mencocokkan beberapa database terpisah menggunakan ekspresi reguler.
      'table-name' = 'orders_.*'   -- Mencocokkan beberapa tabel terpisah menggunakan ekspresi reguler.
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    Berdasarkan kode di atas, jika Anda mengatur opsi scan.read-changelog-as-append-only.enabled ke true dalam klausa WITH, output bervariasi berdasarkan pengaturan primary key tabel downstream:

    • Jika primary key tabel downstream adalah order_id, output hanya mencakup perubahan terakhir untuk setiap primary key di tabel sumber. Misalnya, jika perubahan terakhir untuk primary key adalah operasi hapus, catatan dengan primary key yang sama dan op_type -D muncul di tabel downstream.

    • Jika primary key tabel downstream adalah gabungan dari order_id, operation_ts, dan op_type, output mencakup riwayat perubahan lengkap untuk setiap primary key di tabel sumber.

  • Dukungan ekspresi reguler

    Tabel sumber CDC MySQL mendukung penggunaan ekspresi reguler dalam nama tabel atau nama database untuk mencocokkan beberapa tabel atau database. Contoh berikut menunjukkan cara menentukan beberapa tabel menggunakan ekspresi reguler.

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Mencocokkan beberapa database menggunakan ekspresi reguler.
      'table-name' = '(t[5-8]|tt)' -- Mencocokkan beberapa tabel menggunakan ekspresi reguler.
    );

    Penjelasan ekspresi reguler dalam contoh di atas:

    • ^(test).* adalah contoh pencocokan awalan. Ekspresi ini dapat mencocokkan nama database yang dimulai dengan test, seperti test1 atau test2.

    • .*[p$] adalah contoh pencocokan akhiran. Ekspresi ini dapat mencocokkan nama database yang diakhiri dengan p, seperti cdcp atau edcp.

    • txc adalah pencocokan spesifik. Ekspresi ini dapat mencocokkan nama database tertentu, seperti txc.

    Saat mencocokkan nama tabel lengkap, CDC MySQL menggunakan nama database dan nama tabel untuk mengidentifikasi tabel secara unik. Sistem menggunakan pola database-name.table-name untuk pencocokan. Misalnya, pola (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) dapat mencocokkan tabel seperti txc.tt dan test2.test5 dalam database.

    Penting

    Dalam konfigurasi pekerjaan SQL, opsi table-name dan database-name tidak mendukung penggunaan koma (,) untuk menentukan beberapa tabel atau database.

    • Untuk mencocokkan beberapa tabel atau menggunakan beberapa ekspresi reguler, hubungkan dengan garis vertikal (|) dan masukkan dalam tanda kurung. Misalnya, untuk membaca tabel user dan product, Anda dapat mengonfigurasi table-name sebagai (user|product).

    • Jika ekspresi reguler berisi koma, Anda harus menulis ulang menggunakan operator garis vertikal (|). Misalnya, ekspresi reguler mytable_\d{1, 2} perlu ditulis ulang menjadi setara (mytable_\d{1}|mytable_\d{2}) untuk menghindari penggunaan koma.

  • Kontrol konkurensi

    Konektor MySQL mendukung pembacaan data lengkap secara konkuren, yang meningkatkan efisiensi pemuatan data. Saat dikombinasikan dengan Autopilot di konsol Realtime Compute for Apache Flink, sistem dapat secara otomatis mengurangi sumber daya komputasi selama fase inkremental setelah pembacaan konkuren selesai, sehingga menghemat sumber daya komputasi.

    Di Konsol pengembangan Realtime Compute for Apache Flink, Anda dapat mengatur paralelisme pekerjaan di halaman Konfigurasi Sumber Daya baik dalam mode Dasar maupun mode Ahli. Perbedaannya sebagai berikut:

    • Paralelisme yang diatur dalam mode Dasar adalah paralelisme global untuk seluruh pekerjaan.基础模式

    • Mode Ahli memungkinkan Anda mengatur paralelisme untuk VERTEX tertentu sesuai kebutuhan.vertex并发

    Untuk informasi selengkapnya tentang konfigurasi sumber daya, lihat Konfigurasi penerapan pekerjaan.

    Penting

    Terlepas dari apakah Anda menggunakan mode Dasar atau mode Ahli, rentang server-id yang dideklarasikan dalam tabel harus lebih besar dari atau sama dengan paralelisme pekerjaan. Misalnya, jika rentang server-id adalah 5404-5412, rentang tersebut berisi 9 ID server unik, memungkinkan paralelisme pekerjaan maksimum 9. Pekerjaan berbeda untuk instans MySQL yang sama harus memiliki rentang server-id yang tidak tumpang tindih, artinya setiap pekerjaan harus memiliki server-id yang dikonfigurasi secara eksplisit dan unik.

  • Autopilot Autoscaling

    Fase data lengkap mengakumulasi banyak data historis. Untuk meningkatkan efisiensi pembacaan, data historis biasanya dibaca secara konkuren. Namun, dalam fase binlog inkremental, karena volume data binlog kecil dan urutan global harus dipertahankan, paralelisme tunggal biasanya cukup. Autopilot dapat secara otomatis menyeimbangkan performa dan sumber daya untuk memenuhi persyaratan berbeda dari fase lengkap dan inkremental ini.

    Autopilot memantau trafik setiap tugas di Sumber CDC MySQL. Saat pekerjaan memasuki fase binlog, jika hanya satu tugas yang bertanggung jawab membaca binlog dan tugas lainnya idle, Autopilot akan secara otomatis mengurangi jumlah CU dan paralelisme Sumber. Untuk mengaktifkan Autopilot, atur mode Autopilot ke Aktif di halaman O&M pekerjaan.

    Catatan

    Interval pemicu minimum untuk mengurangi paralelisme adalah 24 jam secara default. Untuk informasi selengkapnya tentang opsi dan detail Autopilot, lihat Konfigurasi Autopilot.

  • Mode mulai

    Anda dapat menggunakan opsi scan.startup.mode untuk menentukan mode startup untuk tabel sumber CDC MySQL. Opsi tersebut meliputi:

    • initial (default): Melakukan pembacaan lengkap tabel database saat startup pertama kali, lalu beralih ke mode inkremental untuk membaca binlog.

    • earliest-offset: Melewati fase snapshot dan mulai membaca dari posisi binlog paling awal yang tersedia.

    • latest-offset: Melewati fase snapshot dan mulai membaca dari akhir binlog. Dalam mode ini, tabel sumber hanya dapat membaca perubahan data yang terjadi setelah pekerjaan dimulai.

    • specific-offset: Melewati fase snapshot dan mulai membaca dari posisi binlog tertentu. Posisi dapat ditentukan oleh nama file binlog dan posisi, atau menggunakan set GTID.

    • timestamp: Melewati fase snapshot dan mulai membaca event binlog dari timestamp tertentu.

    Contoh:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- Mulai dari offset paling awal.
        'scan.startup.mode' = 'latest-offset', -- Mulai dari offset terbaru.
        'scan.startup.mode' = 'specific-offset', -- Mulai dari offset tertentu.
        'scan.startup.mode' = 'timestamp', -- Mulai dari timestamp tertentu.
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Tentukan nama file binlog untuk mode specific-offset.
        'scan.startup.specific-offset.pos' = '4', -- Tentukan posisi binlog untuk mode specific-offset.
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Tentukan set GTID untuk mode specific-offset.
        'scan.startup.timestamp-millis' = '1667232000000' -- Tentukan timestamp startup untuk mode timestamp.
        ...
    )
    Penting
    • Sumber MySQL mencatat posisi saat ini pada level INFO selama checkpoint. Awalan log adalah Binlog offset on checkpoint {checkpoint-id}. Log ini dapat membantu Anda memulai ulang pekerjaan dari posisi checkpoint tertentu.

    • Jika skema tabel yang dibaca telah berubah di masa lalu, memulai dari earliest-offset, specific-offset, atau timestamp dapat menyebabkan error. Hal ini karena pembaca Debezium secara internal menyimpan skema terbaru, dan data lama dengan skema yang tidak cocok tidak dapat diurai dengan benar.

  • Tentang tabel sumber CDC tanpa kunci

    • Untuk menggunakan tabel tanpa kunci, Anda harus mengatur scan.incremental.snapshot.chunk.key-column dan menentukan kolom non-null.

    • Semantik pemrosesan tabel sumber CDC tanpa kunci ditentukan oleh perilaku kolom yang ditentukan dalam scan.incremental.snapshot.chunk.key-column:

      • Jika kolom yang ditentukan tidak diperbarui, semantik tepat-sekali dapat dijamin.

      • Jika kolom yang ditentukan diperbarui, hanya semantik at-least-once yang dapat dijamin. Namun, Anda dapat memastikan kebenaran data dengan menggabungkannya dengan sistem downstream, menentukan primary key downstream, dan menggunakan operasi idempoten.

  • Baca log cadangan dari ApsaraDB RDS for MySQL

    Tabel sumber CDC MySQL mendukung pembacaan log cadangan dari ApsaraDB RDS for MySQL. Ini berguna ketika fase snapshot lengkap memakan waktu lama, yang dapat menyebabkan file binlog lokal dibersihkan sebelum sempat dibaca. Jika file cadangan tersedia, konektor dapat membacanya sebagai gantinya.

    Contoh:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • Aktifkan penggunaan ulang Sumber CDC

    Dalam pekerjaan yang sama, beberapa tabel sumber CDC MySQL memulai beberapa klien binlog. Jika semua tabel sumber berada dalam instans yang sama, hal ini meningkatkan tekanan pada database. Untuk informasi selengkapnya, lihat FAQ CDC MySQL.

    Solusi

    VVR 8.0.7 dan versi selanjutnya mendukung penggunaan ulang Sumber CDC MySQL. Fitur ini menggabungkan tabel sumber CDC MySQL yang memenuhi syarat. Tabel sumber memenuhi syarat untuk digabungkan jika item konfigurasinya identik, kecuali untuk nama database, nama tabel, dan server-id. Mesin secara otomatis menggabungkan sumber CDC MySQL dalam pekerjaan yang sama.

    Prosedur

    1. Gunakan perintah SET dalam pekerjaan SQL:

      SET 'table.optimizer.source-merge.enabled' = 'true';
      
      # (Untuk VVR 8.0.8 dan 8.0.9) Atur juga item ini:
      SET 'sql-gateway.exec-plan.enabled' = 'false';
      Penggunaan ulang diaktifkan secara default di VVR 11.1 dan versi selanjutnya.
    2. Jalankan pekerjaan tanpa state. Memodifikasi item konfigurasi penggunaan ulang Sumber mengubah topologi pekerjaan. Anda harus menjalankan pekerjaan tanpa state. Jika tidak, pekerjaan mungkin gagal dijalankan atau kehilangan data. Jika Sumber digabung, node MergetableSourceScan muncul.

    Penting
    • Setelah Anda mengaktifkan penggunaan ulang, kami tidak merekomendasikan menonaktifkan operator chaining. Jika Anda mengatur pipeline.operator-chaining ke false, hal ini meningkatkan overhead serialisasi dan deserialisasi data. Semakin banyak Sumber yang digabung, semakin besar overhead-nya.

    • Di VVR 8.0.7, menonaktifkan operator chaining menyebabkan masalah serialisasi.

Percepat pembacaan binlog

Saat Anda menggunakan konektor MySQL sebagai tabel sumber atau sumber ingesti data, sistem mengurai file binlog untuk menghasilkan berbagai pesan perubahan selama fase inkremental. File binlog mencatat semua perubahan tabel dalam format biner. Anda dapat mempercepat penguraian file binlog dengan cara berikut.

  • Aktifkan filter penguraian

    • Gunakan opsi scan.only.deserialize.captured.tables.changelog.enabled untuk mengurai event perubahan hanya untuk tabel yang ditentukan.

  • Optimalkan opsi Debezium

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: Jumlah maksimum catatan yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, sistem menempatkan event dalam antrian pemblokiran sebelum menuliskannya ke downstream. Nilai default adalah 8192.

    • debezium.max.batch.size: Jumlah maksimum event yang diproses konektor dalam setiap iterasi. Nilai default adalah 2048.

    • debezium.poll.interval.ms: Jumlah milidetik yang harus ditunggu konektor sebelum meminta event perubahan baru. Nilai default adalah 1000 milidetik, atau 1 detik.

Contoh:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Konfigurasi Debezium
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Aktifkan filter penguraian
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- Mengurai event perubahan hanya untuk tabel yang ditentukan.
    ...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Konfigurasi Debezium
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Aktifkan filter penguraian
  scan.only.deserialize.captured.tables.changelog.enabled: true

Edisi Perusahaan CDC MySQL memiliki kapasitas konsumsi binlog 85 MB/detik, sekitar dua kali lipat dari versi komunitas open source. Jika kecepatan pembuatan binlog melebihi 85 MB/detik (setara dengan satu file 512 MB setiap 6 detik), latensi pekerjaan Flink akan terus meningkat. Latensi pemrosesan akan berangsur-angsur berkurang setelah kecepatan pembuatan binlog melambat. Saat file binlog berisi transaksi besar, latensi pemrosesan mungkin sementara meningkat lalu berkurang setelah log transaksi dibaca.

API DataStream CDC MySQL

Penting

Untuk membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi tentang cara mengatur konektor DataStream, lihat Integrasikan dan gunakan konektor dalam program DataStream.

Contoh berikut menunjukkan cara membuat program API DataStream dan menggunakan MySqlSource, termasuk dependensi pom yang diperlukan.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // Atur database yang akan ditangkap.
        .tableList("yourDatabaseName.yourTableName") // Atur tabel yang akan ditangkap.
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // Mengonversi SourceRecord ke string JSON.
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Aktifkan checkpointing.
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // Atur 4 tugas sumber paralel.
      .setParallelism(4)
      .print().setParallelism(1); // Gunakan paralelisme 1 untuk sink guna mempertahankan urutan pesan.
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

Saat membangun MySqlSource, Anda harus menentukan parameter berikut dalam kode Anda:

Parameter

Deskripsi

hostname

Alamat IP atau hostname database MySQL.

port

Nomor port layanan database MySQL.

databaseList

Nama database MySQL.

Catatan

Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Gunakan .* untuk mencocokkan semua database.

username

Username untuk layanan database MySQL.

password

Password untuk layanan database MySQL.

deserializer

Deserializer yang mendeserialisasi catatan tipe SourceRecord ke tipe yang ditentukan. Parameter dapat diatur ke salah satu nilai berikut:

  • RowDataDebeziumDeserializeSchema: Mengonversi SourceRecord ke RowData, yang merupakan struktur data internal untuk Tabel atau SQL Flink.

  • JsonDebeziumDeserializationSchema: Mengonversi SourceRecord ke string dalam format JSON.

Anda harus menentukan parameter berikut dalam dependensi pom Anda:

${vvr.version}

Versi mesin Realtime Compute for Apache Flink. Misalnya, 1.17-vvr-8.0.4-3.

Catatan

Gunakan nomor versi yang ditampilkan di Maven. Kami secara berkala merilis versi hotfix, dan pembaruan ini mungkin tidak diumumkan melalui saluran lain.

${flink.version}

Versi Apache Flink. Misalnya: 1.17.2.

Penting

Pastikan versi Apache Flink Anda sesuai dengan versi mesin Realtime Compute for Apache Flink untuk menghindari masalah ketidakcocokan selama runtime pekerjaan. Untuk informasi selengkapnya tentang pemetaan versi, lihat Mesin.

FAQ

Untuk informasi tentang masalah yang mungkin Anda temui saat menggunakan tabel sumber CDC, lihat Masalah CDC.