全部产品
Search
文档中心

Realtime Compute for Apache Flink:MySQL

更新时间:Dec 16, 2025

Tema ini menjelaskan cara menggunakan konektor MySQL.

Informasi latar belakang

Konektor MySQL mendukung semua database yang kompatibel dengan protokol MySQL, seperti ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (mode MySQL), dan self-managed MySQL.

Penting

Saat menggunakan konektor MySQL untuk membaca data dari OceanBase, pastikan binary logging (Binlog) diaktifkan dan dikonfigurasi dengan benar. Untuk informasi selengkapnya, lihat Operasi terkait Binlog. Fitur ini berada dalam pratinjau publik. Kami menyarankan agar Anda mengevaluasinya secara menyeluruh sebelum menggunakannya.

Konektor MySQL mendukung fitur-fitur berikut.

Kategori

Rincian

Tipe yang didukung

Tabel sumber, tabel dimensi, tabel sink, dan sumber data Ingesti Data

Mode runtime

Hanya mode streaming yang didukung.

Format data

Tidak berlaku

Metrik pemantauan spesifik

Metrik pemantauan

  • Tabel sumber

    • currentFetchEventTimeLag: Interval antara saat data dihasilkan dan saat data ditarik ke Operator Sumber.

      Metrik ini hanya berlaku pada fase log biner (Binlog). Pada fase snapshot, nilai ini selalu 0.

    • currentEmitEventTimeLag: Interval antara saat data dihasilkan dan saat data meninggalkan Operator Sumber.

      Metrik ini hanya berlaku pada fase Binlog. Pada fase snapshot, nilai ini selalu 0.

    • sourceIdleTime: Durasi tabel sumber tidak aktif.

  • Tabel dimensi dan sink: Tidak ada.

Catatan

Untuk informasi selengkapnya tentang metrik, lihat Metrik pemantauan.

Tipe API

DataStream, SQL, dan YAML ingesti data

Mendukung pembaruan atau penghapusan data di tabel sink

Ya

Fitur

Tabel sumber change data capture (CDC) MySQL adalah tabel sumber streaming yang pertama-tama membaca seluruh data historis dari database, lalu secara mulus beralih ke pembacaan log biner (Binlog) untuk memastikan tidak ada data yang terlewat atau duplikat. Semantik tepat-sekali dijamin bahkan jika terjadi kegagalan. Tabel sumber CDC MySQL mendukung pembacaan konkuren data penuh dan menggunakan algoritma snapshot inkremental untuk menerapkan pembacaan tanpa lock serta transfer data yang dapat dilanjutkan. Untuk informasi selengkapnya, lihat Tentang tabel sumber CDC MySQL.

  • Pemrosesan batch dan stream terpadu: Konektor mendukung pembacaan data penuh dan inkremental, sehingga menghilangkan kebutuhan akan pipeline terpisah.

  • Mendukung pembacaan konkuren data penuh untuk penskalaan kinerja horizontal.

  • Beralih mulus dari pembacaan data penuh ke pembacaan data inkremental dan secara otomatis melakukan skala-masuk untuk menghemat sumber daya komputasi.

  • Mendukung transfer data yang dapat dilanjutkan selama fase pembacaan data penuh guna meningkatkan stabilitas.

  • Pembacaan data penuh tanpa lock tidak memengaruhi operasi bisnis online.

  • Mendukung pembacaan log backup dari ApsaraDB RDS for MySQL.

  • Mengurai file log biner secara paralel untuk mengurangi latensi data.

Prasyarat

Sebelum menggunakan tabel sumber CDC MySQL, Anda harus mengonfigurasi MySQL sesuai petunjuk dalam Konfigurasi MySQL untuk memenuhi prasyarat.

RDS for MySQL

  • Anda dapat melakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.

  • Binary logging (Binlog) harus diaktifkan. Secara default sudah diaktifkan.

  • Format log biner harus ROW. Ini adalah format default.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Akun pengguna MySQL harus dibuat dan diberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Database dan tabel MySQL harus dibuat. Untuk informasi selengkapnya, lihat Buat database dan akun untuk instans ApsaraDB RDS for MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasi akibat izin yang tidak mencukupi.

  • Daftar putih IP harus dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans ApsaraDB RDS for MySQL.

PolarDB untuk MySQL

  • Anda dapat melakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.

  • Binary logging (Binlog) harus diaktifkan. Secara default dinonaktifkan.

  • Format log biner harus ROW. Ini adalah format default.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Akun pengguna MySQL harus dibuat dan diberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Database dan tabel MySQL harus dibuat. Untuk informasi selengkapnya, lihat Buat database dan akun untuk kluster PolarDB for MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasi akibat izin yang tidak mencukupi.

  • Daftar putih IP harus dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk kluster PolarDB for MySQL.

MySQL yang dikelola sendiri

  • Anda dapat melakukan probe jaringan dengan Realtime Compute for Apache Flink untuk memastikan konektivitas jaringan.

  • Versi MySQL yang didukung: 5.6, 5.7, dan 8.0.x.

  • Binary logging (Binlog) harus diaktifkan. Secara default dinonaktifkan.

  • Format log biner harus ROW. Format default adalah STATEMENT.

  • Atur binlog_row_image ke FULL. Ini adalah pengaturan default.

  • Nonaktifkan Binary Log Transaction Compression. Fitur ini diperkenalkan di MySQL 8.0.20 dan dinonaktifkan secara default.

  • Akun pengguna MySQL harus dibuat dan diberikan izin SELECT, SHOW DATABASES, REPLICATION SLAVE, dan REPLICATION CLIENT.

  • Database dan tabel MySQL harus dibuat. Untuk informasi selengkapnya, lihat Buat database dan akun untuk instans self-managed MySQL. Kami menyarankan agar Anda menggunakan akun istimewa untuk membuat database MySQL guna mencegah kegagalan operasi akibat izin yang tidak mencukupi.

  • Daftar putih IP harus dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans self-managed MySQL.

Batasan

Batasan umum

  • Tabel sumber CDC MySQL tidak mendukung definisi watermark.

  • Pada pekerjaan Create Table As Select (CTAS) dan Create Database As Select (CDAS), tabel sumber CDC MySQL dapat menyinkronkan beberapa perubahan skema. Untuk informasi selengkapnya tentang tipe perubahan yang didukung, lihat Kebijakan sinkronisasi evolusi skema.

  • Konektor CDC MySQL tidak mendukung fitur Binary Log Transaction Compression. Oleh karena itu, saat menggunakan konektor CDC MySQL untuk mengonsumsi data inkremental, pastikan fitur ini dinonaktifkan. Jika tidak, Anda mungkin gagal mengambil data inkremental.

RDS for MySQL batasan

  • Untuk RDS for MySQL, kami tidak menyarankan membaca data dari database secondary atau replica read-only. Hal ini karena periode retensi default untuk log biner pada instans tersebut sangat singkat. Jika log biner kedaluwarsa dan dibersihkan, pekerjaan gagal mengonsumsi data log biner dan melaporkan error.

  • RDS for MySQL secara default mengaktifkan replikasi paralel primary/secondary tetapi tidak menjamin urutan transaksi yang konsisten antara database primary dan secondary. Hal ini dapat menyebabkan kehilangan data selama pemulihan data dari checkpoint setelah alih bencana primary/secondary. Untuk menghindari masalah ini, Anda dapat secara manual mengaktifkan opsi slave_preserve_commit_order untuk RDS for MySQL.

PolarDB untuk MySQL batasan

Tabel sumber CDC MySQL tidak mendukung pembacaan data dari arsitektur Kluster Multi-master (untuk informasi selengkapnya, lihat Apa itu Kluster Multi-master?) pada PolarDB for MySQL versi 1.0.19 dan sebelumnya. Log biner yang dihasilkan oleh versi kluster ini mungkin berisi ID tabel duplikat, yang dapat menyebabkan kesalahan pemetaan skema pada tabel sumber CDC dan mengakibatkan error saat mengurai data log biner.

Open source MySQL batasan

Dengan konfigurasi default, MySQL mempertahankan urutan transaksi selama replikasi log biner primary-secondary. Jika replica MySQL memiliki replikasi paralel diaktifkan (slave_parallel_workers > 1) tetapi tidak memiliki slave_preserve_commit_order=ON, urutan commit transaksinya mungkin tidak konsisten dengan database primary. Saat Flink CDC pulih dari checkpoint, data mungkin terlewat karena urutan yang salah. Kami menyarankan mengatur slave_preserve_commit_order = ON pada replica MySQL. Atau, Anda dapat mengatur slave_parallel_workers = 1, yang akan mengorbankan kinerja replikasi.

Catatan

  • Setiap sumber data CDC MySQL harus dikonfigurasi secara eksplisit dengan server ID yang berbeda.

    Tujuan server ID

    Jika beberapa sumber data CDC MySQL berbagi server ID yang sama dan tidak dapat digunakan ulang, hal ini dapat menyebabkan offset log biner menjadi tidak teratur, sehingga data dapat dibaca berulang kali atau terlewat.

    Konfigurasi server ID dalam berbagai skenario

    Anda dapat menentukan server ID dalam DDL. Namun, kami menyarankan agar Anda mengonfigurasinya menggunakan dynamic hints daripada parameter DDL.

    • Tingkat paralelisme = 1 atau snapshot inkremental dinonaktifkan

      ## Saat framework snapshot inkremental dinonaktifkan atau tingkat paralelisme adalah 1, Anda dapat menentukan server ID tertentu.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • Tingkat paralelisme > 1 dan snapshot inkremental diaktifkan

      ## Anda harus menentukan rentang server ID. Jumlah server ID yang tersedia dalam rentang tersebut harus lebih besar dari atau sama dengan tingkat paralelisme. Misalnya, tingkat paralelisme adalah 3.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • Sinkronisasi data menggunakan CTAS

      Saat Anda menggunakan CTAS untuk sinkronisasi data, jika sumber data CDC memiliki konfigurasi yang sama, sumber data tersebut akan secara otomatis digunakan ulang. Dalam kasus ini, Anda dapat mengonfigurasi server ID yang sama untuk beberapa sumber data CDC. Untuk informasi selengkapnya, lihat Contoh 4: Beberapa pernyataan CTAS.

    • Beberapa tabel sumber non-CTAS yang tidak dapat digunakan kembali

      Jika pekerjaan berisi beberapa tabel sumber CDC MySQL dan tidak menggunakan pernyataan CTAS untuk sinkronisasi, sumber data tidak dapat digunakan ulang. Anda harus memberikan server ID yang berbeda untuk setiap tabel sumber CDC. Demikian pula, jika framework snapshot inkremental diaktifkan dan tingkat paralelisme lebih besar dari 1, Anda harus menentukan rentang server ID.

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;
  • Tabel sink

    • Jangan deklarasikan primary key auto-increment dalam DDL. MySQL akan mengisinya secara otomatis saat menulis data.

    • Anda harus mendeklarasikan setidaknya satu bidang non-primary key. Jika tidak, error akan dilaporkan.

    • Dalam DDL, NOT ENFORCED berarti Flink tidak menegakkan kendala primary key. Anda harus memastikan kebenaran dan integritas primary key. Untuk informasi selengkapnya, lihat Validity Check.

  • Tabel dimensi

    Jika Anda ingin menggunakan indeks untuk mempercepat kueri, urutan bidang dalam kondisi JOIN harus sesuai dengan urutan yang ditentukan dalam indeks (aturan prefiks paling kiri). Misalnya, jika indeks adalah (a, b, c), kondisi JOIN-nya adalah ON t.a = x AND t.b = y.

    SQL yang dihasilkan oleh Flink mungkin ditulis ulang oleh pengoptimal, yang mencegah penggunaan indeks selama kueri database aktual. Untuk memastikan apakah indeks digunakan, Anda dapat memeriksa rencana eksekusi (EXPLAIN) atau Redis Slow Log di MySQL untuk melihat pernyataan SELECT aktual yang dieksekusi.

SQL

Anda dapat menggunakan konektor MySQL dalam pekerjaan SQL sebagai tabel sumber, tabel dimensi, atau tabel sink.

Sintaksis

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

Catatan
  • Cara konektor menulis ke tabel sink: Saat menulis ke tabel sink, konektor membuat dan mengeksekusi pernyataan SQL untuk setiap catatan data yang diterima. Pernyataan SQL spesifik yang dieksekusi adalah sebagai berikut:

    • Untuk tabel sink tanpa primary key, pernyataan INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); dieksekusi.

    • Untuk tabel sink dengan primary key, pernyataan INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; dieksekusi. Catatan: Jika tabel fisik memiliki kendala indeks unik selain primary key, memasukkan dua catatan dengan primary key berbeda tetapi nilai indeks unik yang sama menyebabkan konflik indeks unik. Konflik ini dapat menyebabkan data ditimpa dan hilang.

  • Jika primary key auto-increment didefinisikan dalam database MySQL, jangan deklarasikan kolom auto-increment dalam DDL Flink. Database akan mengisi kolom ini secara otomatis selama penulisan data. Konektor hanya mendukung penulisan dan penghapusan data dengan kolom auto-increment, bukan pembaruan.

Parameter WITH

  • Umum

    Parameter

    Deskripsi

    Required

    Tipe data

    Nilai default

    Catatan

    connector

    Tipe tabel.

    Ya

    STRING

    Tidak ada

    Saat digunakan sebagai tabel sumber, Anda dapat mengatur ini ke mysql-cdc atau mysql. Kedua nilai ini setara. Saat digunakan sebagai tabel dimensi atau sink, nilainya harus mysql.

    hostname

    Alamat IP atau nama host database MySQL.

    Ya

    STRING

    Tidak ada

    Kami menyarankan agar Anda memasukkan alamat virtual private cloud (VPC).

    Catatan

    Jika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan titik akhir publik untuk akses. Untuk informasi selengkapnya, lihat Manajemen dan operasi penyimpanan dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.

    username

    Nama pengguna layanan database MySQL.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    password

    Kata sandi layanan database MySQL.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    database-name

    Nama database MySQL.

    Ya

    STRING

    Tidak ada

    • Saat digunakan sebagai tabel sumber, nama database mendukung ekspresi reguler untuk membaca data dari beberapa database.

    • Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir string. Untuk alasan spesifiknya, lihat keterangan untuk table-name.

    table-name

    Nama tabel MySQL.

    Ya

    STRING

    Tidak ada

    • Saat digunakan sebagai tabel sumber, nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

      Saat membaca dari beberapa tabel MySQL, kirimkan beberapa pernyataan CTAS sebagai satu pekerjaan untuk menghindari pengaktifan beberapa listener Binlog, yang meningkatkan kinerja dan efisiensi. Untuk informasi selengkapnya, lihat Beberapa pernyataan CTAS: Kirim sebagai satu pekerjaan.

    • Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir string. Alasannya dijelaskan di bawah ini.

    Catatan

    Saat mencocokkan nama tabel dengan ekspresi reguler, tabel sumber CDC MySQL menggabungkan database-name dan table-name yang Anda berikan dengan string \\. (atau karakter . sebelum Ververica Runtime (VVR) 8.0.1) untuk membentuk ekspresi reguler yang memenuhi syarat sepenuhnya. Ekspresi ini kemudian digunakan untuk mencocokkan nama tabel yang memenuhi syarat sepenuhnya dalam database MySQL.

    Misalnya, jika Anda mengonfigurasi 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor akan menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ sebelum versi 8.0.1) untuk mencocokkan nama tabel yang memenuhi syarat sepenuhnya guna menentukan tabel mana yang akan dibaca.

    port

    Nomor port layanan database MySQL.

    Tidak

    INTEGER

    3306

    Tidak ada.

  • Khusus tabel sumber

    Parameter

    Deskripsi

    Diperlukan

    Tipe data

    Nilai default

    Catatan

    server-id

    ID numerik untuk klien database.

    Tidak

    STRING

    Nilai acak antara 5400 dan 6400 dihasilkan.

    ID ini harus unik secara global dalam kluster MySQL. Kami menyarankan agar Anda menetapkan ID yang berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

    Parameter ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, beberapa pembaca konkuren didukung. Dalam kasus ini, kami menyarankan agar Anda menetapkan rentang ID sehingga setiap pembaca konkuren menggunakan ID yang berbeda. Untuk informasi selengkapnya, lihat Penggunaan server ID.

    scan.incremental.snapshot.enabled

    Menentukan apakah akan mengaktifkan snapshot inkremental.

    Tidak

    BOOLEAN

    true

    Snapshot inkremental diaktifkan secara default. Ini adalah mekanisme baru untuk membaca snapshot data penuh. Dibandingkan dengan metode pembacaan snapshot lama, snapshot inkremental menawarkan beberapa keunggulan:

    • Sumber dapat membaca data penuh secara paralel.

    • Sumber mendukung checkpoint tingkat chunk saat membaca data penuh.

    • Sumber tidak perlu memperoleh kunci baca global (FLUSH TABLES WITH read lock) saat membaca data penuh.

    Jika Anda ingin sumber mendukung pembacaan konkuren, setiap pembaca konkuren memerlukan server ID yang unik. Oleh karena itu, server-id harus berupa rentang, seperti 5400-6400, dan ukuran rentang harus lebih besar dari atau sama dengan tingkat paralelisme.

    Catatan

    Item konfigurasi ini dihapus di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.

    scan.incremental.snapshot.chunk.size

    Ukuran setiap chunk (jumlah baris).

    Tidak

    INTEGER

    8096

    Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data dalam chunk di-cache dalam memori sebelum sepenuhnya dibaca.

    Jumlah baris per chunk yang lebih kecil menghasilkan total jumlah chunk yang lebih besar di tabel. Meskipun ini meningkatkan granularitas pemulihan kesalahan, ini dapat menyebabkan kesalahan kehabisan memori (OOM) dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda perlu menemukan keseimbangan dan menetapkan ukuran chunk yang masuk akal.

    scan.snapshot.fetch.size

    Jumlah maksimum record yang diambil sekaligus saat membaca data penuh dari tabel.

    Tidak

    INTEGER

    1024

    Tidak ada.

    scan.startup.mode

    Mode startup untuk konsumsi data.

    Tidak

    STRING

    initial

    Nilai valid:

    • initial (default): Saat startup pertama, memindai data historis penuh dan kemudian membaca data Binlog terbaru.

    • latest-offset: Saat startup pertama, tidak memindai data historis dan mulai membaca dari akhir Binlog (posisi terbaru). Hanya membaca perubahan yang terjadi setelah konektor dimulai.

    • earliest-offset: Tidak memindai data historis dan mulai membaca dari Binlog yang tersedia paling awal.

    • specific-offset: Tidak memindai data historis dan mulai dari offset Binlog tertentu. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau Anda dapat menentukan set GTID dengan hanya mengonfigurasi scan.startup.specific-offset.gtid-set.

    • timestamp: Tidak memindai data historis dan mulai membaca Binlog dari timestamp tertentu. Timestamp ditentukan oleh scan.startup.timestamp-millis dalam milidetik.

    Penting

    Saat menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel tidak berubah antara posisi konsumsi Binlog yang ditentukan dan waktu startup pekerjaan untuk menghindari error akibat ketidakcocokan skema.

    scan.startup.specific-offset.file

    Nama file Binlog untuk offset awal saat menggunakan mode startup offset tertentu.

    Tidak

    STRING

    Tidak ada

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh format nama file: mysql-bin.000003.

    scan.startup.specific-offset.pos

    Offset dalam file Binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset tertentu.

    Tidak

    INTEGER

    Tidak ada

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.

    scan.startup.specific-offset.gtid-set

    Set GTID untuk offset awal saat menggunakan mode startup offset tertentu.

    Tidak

    STRING

    Tidak ada

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh format set GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    Timestamp offset awal dalam milidetik saat menggunakan mode startup timestamp.

    Tidak

    LONG

    Tidak ada

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke timestamp. Timestamp dalam milidetik.

    Penting

    Saat menggunakan waktu tertentu, CDC MySQL mencoba membaca event awal setiap file Binlog untuk menentukan timestamp-nya, akhirnya menemukan file Binlog yang sesuai dengan waktu yang ditentukan. Pastikan file Binlog untuk timestamp yang ditentukan belum dibersihkan dari database dan dapat dibaca.

    server-time-zone

    Zona waktu sesi yang digunakan oleh database.

    Tidak

    STRING

    Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database, yaitu zona waktu wilayah yang Anda pilih.

    Misalnya, Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.

    debezium.min.row.count.to.stream.results

    Ketika jumlah baris dalam tabel melebihi nilai ini, mode pembacaan batch digunakan.

    Tidak

    INTEGER

    1.000

    Flink membaca data dari tabel sumber MySQL dengan salah satu cara berikut:

    • Baca penuh: Membaca seluruh data tabel langsung ke memori. Ini cepat tetapi mengonsumsi memori yang sesuai. Jika tabel sumber sangat besar, ada risiko error OOM.

    • Baca batch: Membaca data dalam beberapa batch, mengambil sejumlah baris tertentu setiap kali hingga semua data dibaca. Ini menghindari risiko OOM untuk tabel besar tetapi relatif lebih lambat.

    connect.timeout

    Jangka waktu maksimum untuk menunggu koneksi ke server database MySQL mengalami timeout sebelum mencoba kembali.

    Tidak

    DURATION

    30s

    Tidak ada.

    connect.max-retries

    Jumlah maksimum percobaan ulang setelah koneksi gagal ke layanan database MySQL.

    Tidak

    INTEGER

    3

    Tidak ada.

    connection.pool.size

    Ukuran kolam koneksi database.

    Tidak

    INTEGER

    20

    Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi ke database.

    jdbc.properties.*

    Parameter koneksi kustom untuk URL JDBC.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengonfigurasi 'jdbc.properties.useSSL' = 'false'.

    Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties.

    debezium.*

    Parameter kustom untuk Debezium guna membaca Binlog.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan error parsing.

    heartbeat.interval

    Interval waktu sumber memajukan offset Binlog menggunakan event heartbeat.

    Tidak

    DURATION

    30s

    Event heartbeat digunakan untuk memajukan offset Binlog di sumber, yang sangat berguna untuk tabel MySQL yang diperbarui secara perlahan. Untuk tabel tersebut, offset Binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset Binlog maju, mencegahnya kedaluwarsa. Offset Binlog yang kedaluwarsa akan menyebabkan pekerjaan gagal dan memerlukan restart tanpa status.

    scan.incremental.snapshot.chunk.key-column

    Menentukan kolom yang akan digunakan sebagai kunci pemisahan untuk sharding selama fase snapshot.

    Lihat Keterangan.

    STRING

    Tidak ada

    • Diperlukan untuk tabel tanpa kunci utama. Kolom yang dipilih harus bertipe non-null (NOT NULL).

    • Opsional untuk tabel dengan primary key. Hanya satu kolom dari primary key yang dapat dipilih.

    rds.region-id

    ID wilayah instans Alibaba Cloud RDS for MySQL.

    Wajib saat menggunakan fitur membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk informasi lebih lanjut tentang ID wilayah, lihat Wilayah dan zona.

    rds.access-key-id

    ID AccessKey akun Alibaba Cloud RDS for MySQL.

    Wajib saat menggunakan fitur membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat Informasi AccessKey?.

    Penting

    Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda mengelola rahasia untuk menentukan ID AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.

    rds.access-key-secret

    Rahasia AccessKey akun Alibaba Cloud RDS for MySQL.

    Wajib saat menggunakan fitur membaca log arsip dari OSS.

    STRING

    Tidak ada

    Untuk informasi selengkapnya, lihat Bagaimana cara melihat Informasi AccessKey?

    Penting

    Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda mengelola rahasia untuk menentukan Rahasia AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.

    rds.db-instance-id

    ID instans Alibaba Cloud RDS for MySQL.

    Wajib saat menggunakan fitur membaca log arsip dari OSS.

    STRING

    Tidak ada

    Tidak ada.

    rds.main-db-id

    ID database utama instans Alibaba Cloud RDS for MySQL.

    Tidak

    STRING

    Tidak ada

    • Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Cadangan log RDS for MySQL.

    • Hanya didukung di mesin komputasi Flink VVR 8.0.7 dan versi yang lebih baru.

    rds.download.timeout

    Periode waktu habis untuk mengunduh satu log arsip dari OSS.

    Tidak

    DURASI

    60s

    Tidak ada.

    rds.endpoint

    Titik akhir untuk mendapatkan informasi Binlog OSS.

    Tidak

    STRING

    Tidak ada

    • Untuk informasi lebih lanjut tentang nilai yang valid, lihat Endpoints.

    • Hanya didukung di mesin komputasi Flink VVR 8.0.8 dan versi yang lebih baru.

    scan.incremental.close-idle-reader.enabled

    Menentukan apakah akan menutup pembaca tidak aktif setelah fase Snapshot berakhir.

    Tidak

    BOOLEAN

    false

    • Hanya didukung di mesin komputasi Flink VVR 8.0.1 dan versi yang lebih baru.

    • Agar konfigurasi ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

    scan.read-changelog-as-append-only.enabled

    Menentukan apakah aliran data changelog diubah menjadi aliran data append-only.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Semua jenis pesan (termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER) dikonversi menjadi pesan INSERT. Aktifkan ini hanya dalam skenario khusus, seperti ketika Anda perlu menyimpan pesan penghapusan dari tabel upstream.

    • false (default): Semua jenis pesan dikirim ke hilir apa adanya.

    Catatan

    Hanya didukung di mesin komputasi Flink VVR 8.0.8 dan versi yang lebih baru.

    scan.only.deserialize.captured.tables.changelog.enabled

    Pada fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan dari tabel yang ditentukan.

    Tidak

    BOOLEAN

    • Nilai default adalah false di versi VVR 8.x.

    • Nilai default adalah true di VVR 11.1 dan versi yang lebih baru.

    Nilai yang valid:

    • true: Hanya mendeserialisasi data perubahan dari tabel target, yang mempercepat pembacaan Binlog.

    • false (default): Mendeserialisasi data perubahan dari semua tabel.

    Catatan
    • Hanya didukung di mesin komputasi Flink VVR 8.0.7 dan versi yang lebih baru.

    • Saat menggunakan ini di mesin komputasi Flink VVR 8.0.8 dan sebelumnya, Anda harus mengubah nama parameter menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    Pada fase inkremental, menentukan apakah mencoba mengurai event DDL perubahan tanpa lock RDS.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengurai event DDL perubahan tanpa lock RDS.

    • false (default): Tidak mengurai event DDL perubahan tanpa lock RDS.

    Ini adalah fitur eksperimen. Kami menyarankan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan online tanpa lock.

    Catatan

    Hanya didukung di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.

    scan.incremental.snapshot.backfill.skip

    Menentukan apakah akan melewati backfill selama fase pembacaan snapshot.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Melewati backfill selama fase pembacaan snapshot.

    • false (default): Tidak melewati backfill selama fase pembacaan snapshot.

    Jika backfill dilewati, perubahan pada tabel selama fase snapshot akan dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

    Penting

    Melewati backfill dapat menyebabkan ketidaksesuaian data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya jaminan semantik setidaknya sekali yang diberikan.

    Catatan

    Hanya didukung di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Tidak

    BOOELEAN

    false

    Nilai yang valid:

    • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    • false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Ini adalah fitur eksperimen. Mengaktifkannya dapat mengurangi risiko error OOM pada Pengelola Tugas saat menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan menambahkan ini sebelum startup pertama pekerjaan.

    Catatan

    Hanya didukung di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.

  • Khusus tabel dimensi

    Parameter

    Deskripsi

    Diperlukan

    Tipe data

    Nilai default

    Catatan

    url

    URL JDBC MySQL.

    Tidak

    STRING

    Tidak ada

    Format URL adalah: jdbc:mysql://<endpoint>:<port>/<database_name>.

    lookup.max-retries

    Jumlah maksimum percobaan ulang setelah pembacaan data gagal.

    Tidak

    BILANGAN BULAT

    3

    Hanya didukung di mesin komputasi Flink VVR 6.0.7 dan versi yang lebih baru.

    lookup.cache.strategy

    Kebijakan cache.

    Tidak

    STRING

    Tidak ada

    Mendukung tiga kebijakan cache: None, LRU, dan ALL. Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat Informasi latar belakang.

    Catatan

    Saat menggunakan kebijakan cache least recently used (LRU), Anda juga harus mengonfigurasi parameter lookup.cache.max-rows.

    lookup.cache.max-rows

    Jumlah maksimum baris yang di-cache.

    Tidak

    INTEGER

    100.000

    • Jika Anda memilih kebijakan cache LRU, Anda harus menetapkan ukuran cache.

    • Jika Anda memilih kebijakan cache ALL, Anda tidak perlu menetapkan ukuran cache.

    lookup.cache.ttl

    Waktu hidup cache (TTL).

    Tidak

    DURASI

    10 s

    Konfigurasi lookup.cache.ttl bergantung pada lookup.cache.strategy:

    • Jika lookup.cache.strategy diatur ke None, Anda tidak perlu mengonfigurasi lookup.cache.ttl. Ini berarti cache tidak akan kedaluwarsa.

    • Jika lookup.cache.strategy diatur ke LRU, lookup.cache.ttl adalah waktu kedaluwarsa cache. Secara default, tidak kedaluwarsa.

    • Jika lookup.cache.strategy diatur ke ALL, lookup.cache.ttl adalah waktu reload cache. Secara default, tidak reload.

    Gunakan format waktu, seperti 1min atau 10s.

    lookup.max-join-rows

    Jumlah maksimum hasil yang dikembalikan saat catatan dari tabel utama cocok dengan catatan di tabel dimensi.

    Tidak

    INTEGER

    1.024

    Tidak ada.

    lookup.filter-push-down.enabled

    Menentukan apakah mengaktifkan filter pushdown untuk tabel dimensi.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengaktifkan filter pushdown untuk tabel dimensi. Saat memuat data dari tabel database MySQL, tabel dimensi memfilter data terlebih dahulu berdasarkan kondisi yang ditetapkan dalam pekerjaan SQL.

    • false (default): Menonaktifkan filter pushdown untuk tabel dimensi. Tabel dimensi memuat semua data saat memuat dari tabel database MySQL.

    Catatan

    Hanya didukung di mesin komputasi Flink VVR 8.0.7 dan versi yang lebih baru.

    Penting

    Pushdown tabel dimensi hanya boleh diaktifkan saat tabel Flink digunakan sebagai tabel dimensi. Tabel sumber MySQL tidak mendukung pengaktifan filter pushdown. Jika tabel Flink digunakan sebagai tabel sumber dan tabel dimensi, dan filter pushdown diaktifkan untuk tabel dimensi, Anda harus secara eksplisit mengatur item konfigurasi ini ke false menggunakan SQL Hints saat menggunakannya sebagai tabel sumber. Jika tidak, pekerjaan mungkin berjalan tidak normal.

  • Khusus tabel sink

    Parameter

    Deskripsi

    Diperlukan

    Tipe data

    Nilai default

    Catatan

    url

    URL JDBC MySQL.

    Tidak

    STRING

    Tidak ada

    Format URL adalah: jdbc:mysql://<endpoint>:<port>/<database_name>.

    sink.max-retries

    Jumlah maksimum percobaan ulang setelah penulisan data gagal.

    Tidak

    INTEGER

    3

    Tidak ada.

    sink.buffer-flush.batch-size

    Jumlah catatan yang ditulis dalam satu batch.

    Tidak

    INTEGER

    4096

    Tidak ada.

    sink.buffer-flush.max-rows

    Jumlah catatan data yang di-cache dalam memori.

    Tidak

    INTEGER

    10.000

    Parameter ini hanya berlaku setelah primary key ditentukan.

    sink.buffer-flush.interval

    Interval waktu untuk membersihkan cache. Jika data yang di-cache tidak memenuhi kondisi output setelah waktu tunggu yang ditentukan, sistem secara otomatis mengeluarkan semua data dalam cache.

    Tidak

    DURATION

    1s

    Tidak ada.

    sink.ignore-delete

    Menentukan apakah mengabaikan operasi Delete data.

    Tidak

    BOOLEAN

    false

    Saat aliran yang dihasilkan oleh Flink SQL mencakup catatan delete atau update-before, jika beberapa tugas output memperbarui bidang berbeda dari tabel yang sama secara bersamaan, ketidaksesuaian data dapat terjadi.

    Misalnya, setelah catatan dihapus, tugas lain hanya memperbarui beberapa bidang. Bidang yang tidak diperbarui akan menjadi null atau nilai default-nya, menyebabkan error data.

    Dengan menyetel sink.ignore-delete ke true, Anda dapat mengabaikan operasi DELETE dan UPDATE_BEFORE dari upstream untuk menghindari masalah tersebut.

    Catatan
    • UPDATE_BEFORE adalah bagian dari mekanisme retraksi Flink, digunakan untuk "menarik kembali" nilai lama dalam operasi pembaruan.

    • Saat ignoreDelete = true, semua catatan tipe DELETE dan UPDATE_BEFORE dilewati. Hanya catatan INSERT dan UPDATE_AFTER yang diproses.

    sink.ignore-null-when-update

    Saat memperbarui data, menentukan apakah memperbarui bidang yang sesuai ke null atau melewati pembaruan untuk bidang tersebut jika nilai bidang data yang masuk adalah null.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Tidak memperbarui bidang. Parameter ini dapat diatur ke true hanya saat primary key ditetapkan untuk tabel Flink. Saat diatur ke true:

      • Untuk VVR 8.0.6 dan sebelumnya, penulisan data ke tabel sink tidak mendukung eksekusi batch.

      • Untuk VVR 8.0.7 dan yang lebih baru, penulisan data ke tabel sink mendukung eksekusi batch.

        Meskipun penulisan batch dapat secara signifikan meningkatkan efisiensi penulisan dan throughput keseluruhan, hal ini dapat menyebabkan latensi data dan risiko error OOM. Oleh karena itu, buat pertimbangan berdasarkan skenario bisnis aktual Anda.

    • false: Memperbarui bidang menjadi null.

    Catatan

    Parameter ini hanya didukung di mesin komputasi waktu nyata VVR 8.0.5 dan versi yang lebih baru.

Pemetaan tipe data

  • Tabel sumber CDC

    Tipe bidang MySQL CDC

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    Penting

    Kami menyarankan agar Anda tidak menggunakan tipe TINYINT(1) di MySQL untuk menyimpan nilai selain 0 dan 1. Saat property-version diatur ke 0, tabel sumber CDC MySQL memetakan TINYINT(1) ke tipe BOOLEAN Flink secara default, yang dapat menyebabkan ketidakakuratan data. Untuk menggunakan TINYINT(1) guna menyimpan nilai selain 0 dan 1, lihat parameter konfigurasi catalog.table.treat-tinyint1-as-boolean.

  • Tabel dimensi dan sink

    Tipe Bidang MySQL

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    Catatan

    dengan p <= 38.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Penting

    Flink hanya mendukung catatan tipe BLOB MySQL yang kurang dari atau sama dengan 2.147.483.647 (2^31 - 1) byte.

    BLOB

    MEDIUMBLOB

    LONGBLOB

Ingesti Data

Anda dapat menggunakan konektor MySQL sebagai sumber data dalam pekerjaan YAML ingesti data.

Sintaksis

source:
   type: mysql
   name: Sumber MySQL
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

Parameter

Parameter

Deskripsi

Diperlukan

Tipe data

Nilai default

Catatan

type

Tipe sumber data.

Ya

STRING

Tidak ada

Nilainya harus mysql.

name

Nama sumber data.

Tidak

STRING

Tidak ada

Tidak ada.

hostname

Alamat IP atau nama host database MySQL.

Ya

STRING

Tidak ada

Kami menyarankan agar Anda memasukkan alamat VPC.

Catatan

Jika database MySQL dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan titik akhir publik untuk akses. Untuk informasi selengkapnya, lihat Manajemen dan operasi penyimpanan dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.

username

Nama pengguna layanan database MySQL.

Ya

STRING

Tidak ada

Tidak ada.

password

Kata sandi layanan database MySQL.

Ya

STRING

Tidak ada

Tidak ada.

tables

Tabel data MySQL untuk disinkronkan.

Ya

STRING

Tidak ada

  • Nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

  • Anda dapat memisahkan beberapa ekspresi reguler dengan koma.

Catatan
  • Jangan gunakan anchor awal (^) dan akhir ($) string dalam ekspresi reguler. Di versi 11.2, ekspresi reguler database diperoleh dengan memisahkan menggunakan titik. Anchor awal dan akhir akan membuat ekspresi reguler database yang dihasilkan tidak dapat digunakan. Misalnya, Anda perlu mengubah ^db.user_[0-9]+$ menjadi db.user_[0-9]+.

  • Titik digunakan untuk memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, Anda harus meng-escape-nya dengan backslash. Misalnya: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

tables.exclude

Tabel yang akan dikecualikan dari sinkronisasi.

Tidak

STRING

Tidak ada

  • Nama tabel mendukung ekspresi reguler untuk mengecualikan data dari beberapa tabel.

  • Anda dapat memisahkan beberapa ekspresi reguler dengan koma.

Catatan

Titik digunakan untuk memisahkan nama database dan nama tabel. Untuk menggunakan titik untuk mencocokkan karakter apa pun, Anda harus meng-escape-nya dengan backslash. Misalnya: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

port

Nomor port layanan database MySQL.

Tidak

INTEGER

3306

Tidak ada.

schema-change.enabled

Menentukan apakah akan mengirim acara perubahan skema.

Tidak

BOOLEAN

true

Tidak ada.

server-id

ID numerik atau rentang untuk klien database yang digunakan untuk sinkronisasi.

Tidak

STRING

Nilai acak antara 5400 dan 6400 dihasilkan.

ID ini harus unik secara global dalam kluster MySQL. Kami menyarankan agar Anda menetapkan ID yang berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

Parameter ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, beberapa pembaca konkuren didukung. Dalam kasus ini, kami menyarankan agar Anda menetapkan rentang ID sehingga setiap pembaca konkuren menggunakan ID yang berbeda.

jdbc.properties.*

Parameter koneksi kustom untuk URL JDBC.

Tidak

STRING

Tidak ada

Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengonfigurasi 'jdbc.properties.useSSL' = 'false'.

Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties.

debezium.*

Parameter kustom untuk Debezium guna membaca Binlog.

Tidak

STRING

Tidak ada

Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan error parsing.

scan.incremental.snapshot.chunk.size

Ukuran setiap chunk (jumlah baris).

Tidak

INTEGER

8096

Tabel MySQL dibagi menjadi beberapa chunk untuk dibaca. Data dalam chunk di-cache dalam memori sebelum sepenuhnya dibaca.

Jumlah baris per chunk yang lebih kecil menghasilkan total jumlah chunk yang lebih besar di tabel. Meskipun ini meningkatkan granularitas pemulihan kesalahan, ini dapat menyebabkan kesalahan OOM dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda perlu menemukan keseimbangan dan menetapkan ukuran chunk yang masuk akal.

scan.snapshot.fetch.size

Jumlah maksimum record yang diambil sekaligus saat membaca data penuh dari tabel.

Tidak

INTEGER

1024

Tidak ada.

scan.startup.mode

Mode startup untuk konsumsi data.

Tidak

STRING

initial

Nilai valid:

  • initial (default): Saat startup pertama, memindai data historis penuh dan kemudian membaca data Binlog terbaru.

  • latest-offset: Saat startup pertama, tidak memindai data historis dan mulai membaca dari akhir Binlog (posisi terbaru). Hanya membaca perubahan yang terjadi setelah konektor dimulai.

  • earliest-offset: Tidak memindai data historis dan mulai membaca dari Binlog yang tersedia paling awal.

  • specific-offset: Tidak memindai data historis dan mulai dari offset Binlog tertentu. Anda dapat menentukan offset dengan mengonfigurasi scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau Anda dapat menentukan set GTID dengan hanya mengonfigurasi scan.startup.specific-offset.gtid-set.

  • timestamp: Tidak memindai data historis dan mulai membaca Binlog dari timestamp tertentu. Timestamp ditentukan oleh scan.startup.timestamp-millis dalam milidetik.

Penting

Untuk mode startup earliest-offset, specific-offset, dan timestamp, jika skema tabel pada waktu startup berbeda dari skema pada waktu offset awal yang ditentukan, pekerjaan akan gagal. Dengan kata lain, saat menggunakan ketiga mode ini, Anda harus memastikan bahwa skema tabel tidak berubah antara posisi konsumsi Binlog yang ditentukan dan waktu startup pekerjaan.

scan.startup.specific-offset.file

Nama file Binlog untuk offset awal saat menggunakan mode startup offset tertentu.

Tidak

STRING

Tidak ada

Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh format nama file: mysql-bin.000003.

scan.startup.specific-offset.pos

Offset dalam file Binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset tertentu.

Tidak

INTEGER

Tidak ada

Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.

scan.startup.specific-offset.gtid-set

Set GTID untuk offset awal saat menggunakan mode startup offset tertentu.

Tidak

STRING

Tidak ada

Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset. Contoh format set GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

Timestamp offset awal dalam milidetik saat menggunakan mode startup timestamp.

Tidak

PANJANG

Tidak ada

Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke timestamp. Timestamp dalam milidetik.

Penting

Saat menggunakan waktu tertentu, CDC MySQL mencoba membaca event awal setiap file Binlog untuk menentukan timestamp-nya, akhirnya menemukan file Binlog yang sesuai dengan waktu yang ditentukan. Pastikan file Binlog untuk timestamp yang ditentukan belum dibersihkan dari database dan dapat dibaca.

server-time-zone

Zona waktu sesi yang digunakan oleh database.

Tidak

STRING

Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database, yaitu zona waktu wilayah yang Anda pilih.

Misalnya, Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP MySQL dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.

scan.startup.specific-offset.skip-events

Jumlah event Binlog yang dilewati saat membaca dari offset tertentu.

Tidak

INTEGER

Tidak ada

Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.

scan.startup.specific-offset.skip-rows

Jumlah perubahan baris yang dilewati saat membaca dari offset tertentu. Satu event Binlog mungkin sesuai dengan beberapa perubahan baris.

Tidak

INTEGER

Tidak ada

Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke specific-offset.

connect.timeout

Waktu maksimum untuk menunggu koneksi ke server database MySQL habis sebelum mencoba lagi.

Tidak

DURATION

30s

Tidak ada.

connect.max-retries

Jumlah maksimum percobaan ulang setelah koneksi ke layanan database MySQL gagal.

Tidak

INTEGER

3

Tidak ada.

connection.pool.size

Ukuran kolam koneksi database.

Tidak

INTEGER

20

Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi database.

heartbeat.interval

Interval waktu sumber memajukan offset Binlog menggunakan event heartbeat.

Tidak

DURATION

30s

Event heartbeat digunakan untuk memajukan offset Binlog di sumber, yang sangat berguna untuk tabel MySQL yang diperbarui secara perlahan. Untuk tabel tersebut, offset Binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset Binlog maju, mencegahnya kedaluwarsa. Offset Binlog yang kedaluwarsa akan menyebabkan pekerjaan gagal dan memerlukan restart tanpa status.

scan.incremental.snapshot.chunk.key-column

Menentukan kolom yang akan digunakan sebagai kunci pemisahan untuk sharding selama fase snapshot.

Nomor.

STRING

Tidak ada

Hanya satu kolom dari primary key yang dapat dipilih.

rds.region-id

ID wilayah instans Alibaba Cloud RDS for MySQL.

Wajib saat menggunakan fitur membaca log arsip dari OSS.

STRING

Tidak ada

Untuk informasi lebih lanjut tentang ID wilayah, lihat Wilayah dan zona.

rds.access-key-id

ID AccessKey akun Alibaba Cloud RDS for MySQL.

Wajib saat menggunakan fitur membaca log arsip dari OSS.

STRING

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat Informasi AccessKey?

Penting

Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda mengelola rahasia untuk menentukan ID AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.

rds.access-key-secret

Rahasia AccessKey akun Alibaba Cloud RDS for MySQL.

Wajib saat menggunakan fitur membaca log arsip dari OSS.

STRING

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat Informasi AccessKey?

Penting

Untuk mencegah Informasi AccessKey Anda bocor, kami menyarankan agar Anda mengelola rahasia untuk menentukan Rahasia AccessKey. Untuk informasi selengkapnya, lihat Manajemen variabel.

rds.db-instance-id

ID instans Alibaba Cloud RDS for MySQL.

Wajib saat menggunakan fitur membaca log arsip dari OSS.

STRING

Tidak ada

Tidak ada.

rds.main-db-id

ID database utama instans Alibaba Cloud RDS for MySQL.

Tidak

STRING

Tidak ada

Untuk informasi selengkapnya tentang cara mendapatkan ID database utama, lihat Cadangan log RDS for MySQL.

rds.download.timeout

Periode timeout untuk mengunduh satu log arsip dari OSS.

Tidak

DURATION

60s

Tidak ada.

rds.endpoint

Titik akhir untuk mendapatkan informasi Binlog OSS.

Tidak

STRING

Tidak ada

Untuk informasi lebih lanjut tentang nilai valid, lihat Titik akhir.

rds.binlog-directory-prefix

Awalan direktori untuk menyimpan file Binlog.

Tidak

STRING

rds-binlog-

Tidak ada.

rds.use-intranet-link

Menentukan apakah menggunakan tautan jaringan internal untuk mengunduh file Binlog.

Tidak

BOOLEAN

true

Tidak ada.

rds.binlog-directories-parent-path

Jalur absolut direktori induk tempat file Binlog disimpan.

Tidak

STRING

Tidak ada

Tidak ada.

chunk-meta.group.size

Ukuran metadata chunk.

Tidak

INTEGER

1000

Jika metadata lebih besar dari nilai ini, metadata akan dibagi menjadi beberapa bagian untuk transmisi.

chunk-key.even-distribution.factor.lower-bound

Batas bawah faktor distribusi chunk untuk sharding merata.

Tidak

DOUBLE

0.05

Jika faktor distribusi kurang dari nilai ini, sharding tidak merata digunakan.

Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris data.

chunk-key.even-distribution.factor.upper-bound

Batas atas faktor distribusi chunk untuk sharding merata.

Tidak

DOUBLE

1000.0

Jika faktor distribusi lebih besar dari nilai ini, sharding tidak merata digunakan.

Faktor distribusi chunk = (MAX(chunk-key) - MIN(chunk-key) + 1) / Jumlah total baris data.

scan.incremental.close-idle-reader.enabled

Menentukan apakah akan menutup pembaca tidak aktif setelah fase Snapshot berakhir.

Tidak

BOOLEAN

false

Agar konfigurasi ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

scan.only.deserialize.captured.tables.changelog.enabled

Pada fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan dari tabel yang ditentukan.

Tidak

BOOLEAN

  • Nilai defaultnya adalah false di versi VVR 8.x.

  • Nilai default adalah true di VVR 11.1 dan versi yang lebih baru.

Nilai valid:

  • true: Hanya mendeserialisasi data perubahan dari tabel target, yang mempercepat pembacaan Binlog.

  • false (default): Mendeserialisasi data perubahan dari semua tabel.

scan.parallel-deserialize-changelog.enabled

Pada fase inkremental, menentukan apakah menggunakan beberapa thread untuk mengurai event perubahan.

Tidak

BOOLEAN

false

Nilai valid:

  • true: Menggunakan multithreading untuk mendeserialisasi event perubahan sambil mempertahankan urutan event Binlog, yang mempercepat pembacaan.

  • false (default): Menggunakan satu thread untuk mendeserialisasi event.

Catatan

Hanya didukung di mesin komputasi Flink VVR 8.0.11 dan versi yang lebih baru.

scan.parallel-deserialize-changelog.handler.size

Jumlah handler acara saat menggunakan beberapa thread untuk mengurai acara perubahan.

Tidak

INTEGER

2

Catatan

Hanya didukung di mesin komputasi Flink VVR 8.0.11 dan versi yang lebih baru.

metadata-column.include-list

Kolom metadata yang diteruskan ke sink hilir.

Tidak

STRING

Tidak ada

Metadata yang tersedia meliputi table_name, database_name, op_ts, es_ts, dan query_log. Anda dapat memisahkan beberapa kolom metadata dengan koma.

Catatan

Konektor YAML CDC MySQL tidak memerlukan atau mendukung penambahan kolom metadata op_type. Anda dapat langsung menggunakan __data_event_type__ dalam ekspresi Transform untuk mendapatkan tipe data perubahan.

Penting

Kolom metadata es_ts merepresentasikan waktu mulai transaksi. Kolom ini hanya dapat ditambahkan saat menggunakan MySQL versi 8.0.x. Jangan tambahkan kolom metadata ini saat menggunakan versi MySQL sebelumnya.

scan.newly-added-table.enabled

Saat memulai ulang dari checkpoint, menentukan apakah menyinkronkan tabel yang baru ditambahkan yang tidak cocok selama startup sebelumnya atau menghapus tabel yang saat ini tidak cocok yang disimpan dalam status.

Tidak

BOOLEAN

false

Berlaku saat memulai ulang dari checkpoint atau titik simpan.

scan.binlog.newly-added-table.enabled

Pada fase inkremental, menentukan apakah mengirim data dari tabel yang baru ditambahkan yang cocok.

Tidak

BOOLEAN

false

Tidak dapat diaktifkan secara bersamaan dengan scan.newly-added-table.enabled.

scan.incremental.snapshot.chunk.key-column

Menentukan kolom untuk tabel tertentu yang akan digunakan sebagai kunci pemisahan untuk sharding selama fase snapshot.

Tidak

STRING

Tidak ada

  • Hubungkan nama tabel dan nama bidang dengan titik dua (:) untuk menentukan aturan. Nama tabel dapat berupa ekspresi reguler. Anda dapat menentukan beberapa aturan, dipisahkan dengan titik koma (;). Misalnya: db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2.

  • Ini wajib untuk tabel tanpa primary key, dan kolom yang dipilih harus bertipe non-null (NOT NULL). Ini opsional untuk tabel dengan primary key, dan hanya satu kolom dari primary key yang dapat dipilih.

scan.parse.online.schema.changes.enabled

Pada fase inkremental, menentukan apakah mencoba mengurai event DDL perubahan tanpa lock RDS.

Tidak

BOOLEAN

false

Nilai valid:

  • true: Mengurai event DDL perubahan tanpa lock RDS.

  • false (default): Tidak mengurai event DDL perubahan tanpa lock RDS.

Ini adalah fitur eksperimen. Kami menyarankan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan online tanpa lock.

Catatan

Hanya didukung di mesin komputasi Flink VVR 11.0 dan versi yang lebih baru.

scan.incremental.snapshot.backfill.skip

Menentukan apakah melewati backfill selama fase pembacaan snapshot.

Tidak

BOOLEAN

false

Nilai valid:

  • true: Melewati backfill selama fase pembacaan snapshot.

  • false (default): Tidak melewati backfill selama fase pembacaan snapshot.

Jika backfill dilewati, perubahan pada tabel selama fase snapshot akan dibaca pada fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

Penting

Melewati backfill dapat menyebabkan ketidaksesuaian data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik setidaknya-sekali yang dijamin.

Catatan

Hanya didukung di mesin komputasi Flink VVR 11.1 dan versi yang lebih baru.

treat-tinyint1-as-boolean.enabled

Menentukan apakah memperlakukan tipe TINYINT(1) sebagai tipe Boolean.

Tidak

BOOLEAN

true

Nilai yang valid:

  • true (default): Memperlakukan tipe TINYINT(1) sebagai tipe Boolean.

  • false: Tidak memperlakukan tipe TINYINT(1) sebagai tipe Boolean.

treat-timestamp-as-datetime-enabled

Menentukan apakah memperlakukan tipe TIMESTAMP sebagai tipe DATETIME.

Tidak

BOOLEAN

false

Nilai yang valid:

  • true: Memperlakukan tipe TIMESTAMP MySQL sebagai tipe DATETIME, memetakannya ke tipe TIMESTAMP CDC.

  • false (default): Memetakan tipe TIMESTAMP MySQL ke tipe TIMESTAMP_LTZ CDC.

Tipe TIMESTAMP MySQL menyimpan waktu UTC dan dipengaruhi oleh zona waktu. Tipe DATETIME MySQL menyimpan waktu literal dan tidak dipengaruhi oleh zona waktu.

Saat diaktifkan, ini mengonversi data tipe TIMESTAMP MySQL ke tipe DATETIME berdasarkan server-time-zone.

include-comments.enabled

Menentukan apakah menyinkronkan komentar tabel dan bidang.

Tidak

BOOELEAN

false

Nilai valid:

  • true: Menyinkronkan komentar tabel dan bidang.

  • false (default): Tidak menyinkronkan komentar tabel dan bidang.

Mengaktifkan ini meningkatkan penggunaan memori pekerjaan.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Tidak

BOOELEAN

false

Parameter ini mendukung nilai-nilai berikut:

  • true: Mendistribusikan shard tak terbatas terlebih dahulu selama fase pembacaan snapshot.

  • false (default): Tidak mendistribusikan shard tak terbatas terlebih dahulu selama fase pembacaan snapshot.

Ini adalah fitur eksperimental. Mengaktifkan fitur ini mengurangi risiko error kehabisan memori (OOM) saat Pengelola Tugas menyinkronkan shard terakhir selama fase snapshot. Tambahkan parameter ini sebelum pekerjaan dimulai untuk pertama kalinya.

Catatan

Hanya didukung di Flink compute engine Ververica Runtime (VVR) 11.1 dan versi yang lebih baru.

Pemetaan tipe

Tabel berikut menunjukkan pemetaan tipe data untuk Ingesti Data.

Tipe bidang CDC MySQL

Tipe bidang CDC

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

WAKTU [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

Berdasarkan nilai parameter treat-timestamp-as-datetime-enabled, bidang dipetakan secara berbeda:

true:TIMESTAMP[(p)]

false:TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

STRING

Catatan

Dalam MySQL, tipe data desimal memiliki presisi hingga 65, tetapi dalam Flink, presisi tipe data desimal dibatasi hingga 38. Oleh karena itu, jika Anda mendefinisikan kolom desimal dengan presisi lebih dari 38, Anda harus memetakannya ke string untuk menghindari kehilangan presisi.

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

Catatan

Tipe data JSON akan dikonversi ke string berformat JSON dalam Flink.

GEOMETRY

STRING

Catatan

Tipe data spasial dalam MySQL akan dikonversi ke string dengan format JSON tetap. Untuk informasi selengkapnya, lihat Pemetaan Tipe Data Spasial MySQL.

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

Catatan

Untuk tipe data BLOB dalam MySQL, hanya blob dengan panjang tidak lebih dari 2.147.483.647 (2**31-1) yang didukung.

BLOB

MEDIUMBLOB

LONGBLOB

Contoh penggunaan

  • Tabel sumber CDC

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Tabel dimensi

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • Tabel sink

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • Sumber data Ingesti Data

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

Tentang tabel sumber CDC MySQL

  • Prinsip implementasi

    Saat tabel sumber CDC MySQL dimulai, tabel tersebut memindai seluruh tabel, membaginya menjadi beberapa chunk berdasarkan primary key, dan mencatat offset log biner saat ini. Kemudian, menggunakan algoritma snapshot inkremental untuk membaca data dari setiap chunk menggunakan pernyataan SELECT. Pekerjaan secara berkala melakukan checkpoint untuk mencatat chunk yang telah selesai. Jika terjadi failover, pekerjaan hanya perlu membaca chunk yang belum selesai. Setelah semua chunk dibaca, pekerjaan mulai membaca catatan perubahan inkremental dari offset log biner yang sebelumnya dicatat. Pekerjaan Flink terus melakukan checkpoint berkala dan mencatat offset log biner. Jika pekerjaan gagal, pekerjaan melanjutkan pemrosesan dari offset log biner terakhir yang dicatat. Proses ini mencapai semantik tepat-sekali.

    Untuk penjelasan lebih rinci tentang algoritma snapshot inkremental, lihat Konektor CDC MySQL.

  • Metadata

    Metadata berguna dalam skenario di mana database dan tabel yang di-shard digabung dan disinkronkan. Hal ini karena setelah penggabungan, bisnis sering ingin membedakan database dan tabel sumber untuk setiap data. Kolom metadata dapat digunakan untuk mengakses informasi nama database dan tabel dari tabel sumber. Oleh karena itu, Anda dapat dengan mudah menggabungkan beberapa tabel yang di-shard menjadi satu tabel tujuan menggunakan kolom metadata.

    Sumber CDC MySQL mendukung sintaks kolom metadata. Anda dapat mengakses metadata berikut menggunakan kolom metadata.

    Kunci metadata

    Tipe metadata

    Deskripsi

    database_name

    STRING NOT NULL

    Nama database yang berisi baris tersebut.

    table_name

    STRING NOT NULL

    Nama tabel yang berisi baris tersebut.

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    Waktu perubahan dilakukan di database. Jika catatan berasal dari data historis tabel dan bukan dari Binlog, nilai ini selalu 0.

    op_type

    STRING NOT NULL

    Jenis perubahan pada baris tersebut.

    • +I: Pesan INSERT

    • -D: Pesan DELETE

    • -U: Pesan UPDATE_BEFORE

    • +U: Pesan UPDATE_AFTER

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink VVR 8.0.7 dan versi yang lebih baru.

    query_log

    STRING NOT NULL

    Catatan log kueri MySQL yang sesuai dengan baris yang dibaca.

    Catatan

    MySQL perlu mengaktifkan parameter binlog_rows_query_log_events untuk mencatat log kueri.

    Contoh kode berikut menunjukkan cara menggabungkan dan menyinkronkan beberapa tabel pesanan dari beberapa database yang di-shard dalam instans MySQL ke tabel holo_orders di instans Hologres hilir.

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Membaca nama database.
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- Membaca nama tabel.
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Membaca timestamp perubahan.
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Membaca tipe perubahan.
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Ekspresi reguler untuk mencocokkan beberapa database yang di-shard.
      'table-name' = 'orders_.*'   -- Ekspresi reguler untuk mencocokkan beberapa tabel yang di-shard.
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    Berdasarkan kode di atas, jika parameter scan.read-changelog-as-append-only.enabled diatur ke true dalam klausa WITH, hasil output bervariasi berdasarkan pengaturan primary key tabel hilir:

    • Jika primary key tabel hilir adalah order_id, hasil output hanya berisi perubahan terakhir untuk setiap primary key di tabel hulu. Misalnya, untuk primary key yang perubahan terakhirnya adalah operasi hapus, Anda dapat melihat catatan di tabel hilir dengan primary key yang sama dan op_type -D.

    • Jika primary key tabel hilir adalah gabungan dari order_id, operation_ts, dan op_type, hasil output berisi riwayat perubahan lengkap untuk setiap primary key di tabel hulu.

  • Dukungan ekspresi reguler

    Tabel sumber CDC MySQL mendukung penggunaan ekspresi reguler dalam nama tabel atau nama database untuk mencocokkan beberapa tabel atau database. Contoh kode berikut menunjukkan cara menentukan beberapa tabel menggunakan ekspresi reguler.

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Ekspresi reguler untuk mencocokkan beberapa database.
      'table-name' = '(t[5-8]|tt)' -- Ekspresi reguler untuk mencocokkan beberapa tabel.
    );

    Tabel berikut menjelaskan ekspresi reguler dalam contoh di atas:

    • ^(test).* adalah contoh pencocokan awalan. Ekspresi ini dapat mencocokkan nama database yang dimulai dengan "test", seperti "test1" dan "test2".

    • .*[p$] adalah contoh pencocokan akhiran. Ekspresi ini dapat mencocokkan nama database yang diakhiri dengan "p", seperti "cdcp" dan "edcp".

    • txc adalah pencocokan spesifik. Ekspresi ini dapat mencocokkan nama database yang persis "txc".

    Saat mencocokkan nama tabel yang memenuhi syarat sepenuhnya, CDC MySQL menggunakan nama database dan nama tabel untuk mengidentifikasi tabel secara unik. Polanya adalah database-name.table-name. Misalnya, pola pencocokan (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) dapat mencocokkan tabel dalam database, seperti txc.tt dan test2.test5.

    Penting

    Dalam konfigurasi pekerjaan SQL, table-name dan database-name tidak mendukung penggunaan koma (,) untuk menentukan beberapa tabel atau database.

    • Untuk mencocokkan beberapa tabel atau menggunakan beberapa ekspresi reguler, Anda dapat menghubungkannya dengan bilah vertikal (|) dan membungkusnya dalam tanda kurung. Misalnya, untuk membaca tabel "user" dan "product", Anda dapat mengonfigurasi table-name sebagai (user|product).

    • Jika ekspresi reguler berisi koma, Anda harus menulis ulang menggunakan operator bilah vertikal (|). Misalnya, ekspresi reguler mytable_\d{1, 2} harus ditulis ulang sebagai (mytable_\d{1}|mytable_\d{2}) yang setara untuk menghindari penggunaan koma.

  • Kontrol konkurensi

    Konektor MySQL mendukung pembacaan data penuh dengan beberapa thread konkuren, yang dapat meningkatkan efisiensi pemuatan data. Bersamaan dengan fitur penyetelan otomatis Autopilot di konsol Realtime Compute for Apache Flink, konektor dapat secara otomatis melakukan skala-masuk setelah pembacaan multi-thread selesai dan selama fase inkremental untuk menghemat sumber daya komputasi.

    Di Konsol pengembangan Realtime Compute for Apache Flink, Anda dapat mengatur konkurensi pekerjaan dalam mode Dasar atau Ahli pada halaman Konfigurasi Sumber Daya. Perbedaannya adalah sebagai berikut:

    • Konkurensi yang diatur dalam mode Dasar adalah konkurensi global untuk seluruh pekerjaan.基础模式

    • Mode Ahli mendukung pengaturan konkurensi untuk VERTEX tertentu sesuai kebutuhan.vertex并发

    Untuk informasi selengkapnya tentang konfigurasi sumber daya, lihat Konfigurasi informasi penerapan untuk pekerjaan.

    Penting

    Terlepas dari apakah Anda menggunakan mode Dasar atau Ahli, saat Anda mengatur konkurensi, rentang server-id yang dideklarasikan dalam tabel harus lebih besar dari atau sama dengan konkurensi pekerjaan. Misalnya, jika rentang server-id adalah 5404-5412, terdapat 8 server-id unik. Dalam kasus ini, pekerjaan dapat memiliki maksimal 8 thread konkuren. Pekerjaan berbeda untuk instans MySQL yang sama tidak boleh memiliki rentang server-id yang tumpang tindih. Artinya, setiap pekerjaan harus dikonfigurasi secara eksplisit dengan server-id yang berbeda.

  • Skala-masuk otomatis Autopilot

    Fase data penuh mengakumulasi banyak data historis. Untuk meningkatkan efisiensi pembacaan, data historis biasanya dibaca secara konkuren. Dalam fase log biner inkremental, pembacaan konkuren tunggal biasanya cukup karena volume data log biner kecil dan urutan global harus dipastikan. Persyaratan sumber daya yang berbeda dari fase penuh dan inkremental dapat diseimbangkan secara otomatis oleh fitur penyetelan otomatis untuk mencapai kinerja tinggi dan efisiensi sumber daya.

    Penyetelan otomatis memantau trafik setiap tugas di Sumber CDC MySQL. Saat pekerjaan memasuki fase log biner, jika hanya satu tugas yang bertanggung jawab untuk membaca log biner dan tugas lainnya idle, penyetelan otomatis secara otomatis mengurangi jumlah CU dan konkurensi Sumber. Untuk mengaktifkan penyetelan otomatis, Anda dapat mengatur mode penyetelan otomatis ke Aktif di halaman O&M pekerjaan.

    Catatan

    Interval pemicu minimum default untuk mengurangi tingkat paralelisme adalah 24 jam. Untuk informasi selengkapnya tentang parameter dan detail penyetelan otomatis, lihat Konfigurasi penyetelan otomatis.

  • Mode startup

    Anda dapat menggunakan opsi konfigurasi scan.startup.mode untuk menentukan mode startup untuk tabel sumber CDC MySQL. Opsi yang tersedia adalah sebagai berikut:

    • initial (default): Saat startup pertama, konektor melakukan pembacaan penuh tabel database dan kemudian beralih ke mode inkremental untuk membaca log biner.

    • earliest-offset: Melewati fase snapshot dan mulai membaca dari offset log biner yang tersedia paling awal.

    • latest-offset: Melewati fase snapshot dan mulai membaca dari akhir log biner. Dalam mode ini, tabel sumber hanya dapat membaca perubahan data yang terjadi setelah pekerjaan dimulai.

    • specific-offset: Melewati fase snapshot dan mulai membaca dari offset log biner tertentu. Offset dapat ditentukan oleh nama file log biner dan posisi, atau oleh set GTID.

    • timestamp: Melewati fase snapshot dan mulai membaca event log biner dari timestamp tertentu.

    Contoh penggunaan:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- Mulai dari offset paling awal.
        'scan.startup.mode' = 'latest-offset', -- Mulai dari offset terbaru.
        'scan.startup.mode' = 'specific-offset', -- Mulai dari offset tertentu.
        'scan.startup.mode' = 'timestamp', -- Mulai dari timestamp tertentu.
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Tentukan nama file Binlog dalam mode specific-offset.
        'scan.startup.specific-offset.pos' = '4', -- Tentukan posisi Binlog dalam mode specific-offset.
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Tentukan set GTID dalam mode specific-offset.
        'scan.startup.timestamp-millis' = '1667232000000' -- Tentukan timestamp startup dalam mode timestamp.
        ...
    )
    Penting
    • Sumber MySQL mencetak offset saat ini ke log pada level INFO selama checkpoint. Awalan log adalah Binlog offset on checkpoint {checkpoint-id}. Log ini dapat membantu Anda memulai pekerjaan dari offset checkpoint tertentu.

    • Jika tabel yang dibaca telah mengalami perubahan skema, memulai dari offset paling awal (earliest-offset), offset tertentu (specific-offset), atau timestamp (timestamp) dapat menyebabkan error. Hal ini karena pembaca Debezium secara internal menyimpan skema tabel terbaru, dan data sebelumnya dengan skema yang tidak cocok tidak dapat diurai dengan benar.

  • Tentang tabel sumber CDC tanpa primary key

    • Jika Anda menggunakan tabel tanpa primary key, Anda harus mengatur scan.incremental.snapshot.chunk.key-column. Anda hanya dapat memilih bidang non-null.

    • Semantik pemrosesan untuk tabel sumber CDC tanpa primary key ditentukan oleh perilaku kolom yang ditentukan oleh scan.incremental.snapshot.chunk.key-column:

      • Jika kolom yang ditentukan tidak diperbarui, semantik tepat-sekali dapat dijamin.

      • Jika kolom yang ditentukan diperbarui, hanya semantik setidaknya-sekali yang dapat dijamin. Namun, Anda dapat memastikan kebenaran data dengan menggabungkan kolom tersebut dengan sink hilir yang memiliki primary key dan menggunakan operasi idempoten.

  • Membaca log cadangan Alibaba Cloud RDS for MySQL

    Tabel sumber CDC MySQL mendukung pembacaan log cadangan dari Alibaba Cloud RDS for MySQL. Ini berguna dalam skenario di mana fase data penuh memakan waktu lama dan file log biner lokal telah dibersihkan secara otomatis, tetapi file cadangan yang diunggah secara otomatis atau manual masih ada.

    Contoh penggunaan:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • Mengaktifkan penggunaan ulang Sumber CDC

    Pekerjaan yang menggunakan beberapa tabel sumber CDC MySQL dari instans yang sama akan menjalankan beberapa klien Binlog, sehingga meningkatkan beban pada database. Untuk informasi selengkapnya, lihat FAQ CDC MySQL.

    Solusi

    Realtime Compute for Apache Flink VVR 8.0.7 dan versi yang lebih baru mendukung penggunaan ulang Sumber CDC MySQL. Penggunaan ulang menggabungkan tabel sumber CDC MySQL yang dapat digabungkan. Penggabungan terjadi ketika tabel sumber memiliki item konfigurasi yang sama, kecuali nama database, nama tabel, dan server-id. Mesin secara otomatis menggabungkan sumber CDC MySQL dalam pekerjaan yang sama.

    Prosedur

    1. Gunakan perintah SET dalam pekerjaan SQL Anda:

      SET 'table.optimizer.source-merge.enabled' = 'true';
      
      # (Untuk VVR 8.0.8 dan 8.0.9) Atur juga item ini:
      SET 'sql-gateway.exec-plan.enabled' = 'false';
      Penggunaan ulang diaktifkan secara default di VVR 11.1 dan versi yang lebih baru.
    2. Mulai pekerjaan tanpa status. Mengubah konfigurasi penggunaan ulang Sumber mengubah topologi pekerjaan. Anda harus memulai pekerjaan tanpa status. Jika tidak, pekerjaan mungkin gagal dimulai atau data dapat hilang. Jika Sumber digabung, Anda dapat melihat node MergetableSourceScan.

    Penting
    • Setelah Anda mengaktifkan penggunaan ulang, kami tidak menyarankan agar Anda menonaktifkan operator chaining. Jika Anda mengatur pipeline.operator-chaining ke false, overhead serialisasi dan deserialisasi data meningkat. Semakin banyak Sumber yang digabung, semakin besar overhead-nya.

    • Di VVR 8.0.7, menonaktifkan operator chaining menyebabkan masalah serialisasi.

Mempercepat pembacaan Binlog

Saat konektor MySQL digunakan sebagai tabel sumber atau sumber data Ingesti Data, konektor tersebut mengurai file log biner untuk menghasilkan berbagai pesan perubahan selama fase inkremental. File log biner mencatat semua perubahan tabel dalam format biner. Anda dapat mempercepat penguraian file log biner dengan cara berikut.

  • Aktifkan konfigurasi filter penguraian

    • Gunakan item konfigurasi scan.only.deserialize.captured.tables.changelog.enabled: Hanya mengurai event perubahan dari tabel yang ditentukan.

  • Optimalkan parameter Debezium

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: Jumlah maksimum record yang dapat ditampung oleh antrian pemblokiran. Saat Debezium membaca aliran event dari database, ia menempatkan event dalam antrian pemblokiran sebelum menuliskannya ke downstream. Nilai default adalah 8192.

    • debezium.max.batch.size: Jumlah maksimum event yang diproses konektor dalam setiap iterasi. Nilai default adalah 2048.

    • debezium.poll.interval.ms: Jumlah milidetik yang harus ditunggu konektor sebelum meminta event perubahan baru. Nilai default adalah 1000 milidetik, atau 1 detik.

Contoh penggunaan:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Konfigurasi Debezium
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Aktifkan filter parsing
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- Hanya mengurai event perubahan dari tabel yang ditentukan.
    ...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Konfigurasi Debezium
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Aktifkan filter penguraian
  scan.only.deserialize.captured.tables.changelog.enabled: true

Kapasitas konsumsi log biner CDC MySQL Edisi Perusahaan adalah 85 MB/detik, sekitar dua kali lipat dari versi komunitas open source. Saat kecepatan pembuatan file log biner melebihi 85 MB/detik (yang berarti satu file 512 MB dihasilkan setiap 6 detik), latensi pekerjaan Flink terus meningkat. Latensi pemrosesan secara bertahap menurun setelah kecepatan pembuatan file log biner melambat. Saat file log biner berisi transaksi besar, hal ini dapat menyebabkan peningkatan sementara dalam latensi pemrosesan. Latensi menurun setelah log transaksi dibaca.

MySQL CDC DataStream API

Penting

Saat Anda membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi selengkapnya tentang cara menyiapkan konektor DataStream, lihat Cara menggunakan konektor DataStream.

Anda dapat membuat program API DataStream dan menggunakan MySqlSource. Bagian berikut memberikan contoh kode dan dependensi pom:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

Saat Anda membuat MySqlSource, Anda harus menentukan parameter berikut dalam kode:

Parameter

Deskripsi

hostname

Alamat IP atau hostname dari database MySQL.

port

Nomor port layanan database MySQL.

databaseList

Nama database MySQL.

Catatan

Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Anda dapat menggunakan .* untuk mencocokkan semua database.

username

Nama pengguna untuk layanan database MySQL.

password

Kata sandi untuk layanan database MySQL.

deserializer

Deserializer yang mendeserialisasi catatan tipe SourceRecord ke tipe yang ditentukan. Nilai yang valid:

  • RowDataDebeziumDeserializeSchema: Mengonversi SourceRecord ke struktur data internal RowData Flink Table atau SQL.

  • JsonDebeziumDeserializationSchema: Mengonversi SourceRecord ke String berformat JSON.

Dependensi pom harus menentukan parameter berikut:

${vvr.version}

Versi mesin Alibaba Cloud Realtime Compute for Apache Flink, misalnya: 1.17-vvr-8.0.4-3.

Catatan

Harap gunakan nomor versi yang ditampilkan di Maven, karena kami mungkin merilis versi Hotfix secara berkala, dan pembaruan ini mungkin tidak diumumkan melalui saluran lain.

${flink.version}

Versi Apache Flink, misalnya: 1.17.2.

Penting

Harap gunakan versi Apache Flink yang sesuai dengan versi mesin Alibaba Cloud Realtime Compute for Apache Flink untuk menghindari masalah ketidakcocokan selama runtime pekerjaan. Untuk korespondensi versi, lihat Mesin DPI.

FAQ

Untuk informasi tentang masalah yang mungkin Anda temui saat menggunakan tabel sumber CDC, lihat Masalah CDC.