全部产品
Search
文档中心

AnalyticDB:Gunakan Flink untuk berlangganan log biner

更新时间:Nov 19, 2025

Realtime Compute for Apache Flink dapat berlangganan ke AnalyticDB for MySQL untuk menangkap dan memproses data perubahan database secara real time, sehingga memungkinkan sinkronisasi data dan komputasi aliran yang efisien. Topik ini menjelaskan cara menggunakan Flink untuk berlangganan ke log biner AnalyticDB for MySQL.

Prasyarat

  • Kluster AnalyticDB for MySQL merupakan Enterprise Edition, Basic Edition, Data Lakehouse Edition, atau Data Warehouse Edition dalam mode elastis.

  • Versi minor kluster AnalyticDB for MySQL adalah 3.2.1.0 atau lebih baru.

    Catatan

    Untuk melihat dan memperbarui versi minor kluster AnalyticDB for MySQL, masuk ke Konsol AnalyticDB for MySQL dan buka bagian Configuration Information pada halaman Cluster Information.

  • Mesin komputasi real-time Flink adalah Ververica Runtime (VVR) 8.0.4 atau lebih baru.

  • AnalyticDB for MySQL kluster dan ruang kerja Flink yang sepenuhnya dikelola berada dalam VPC yang sama.

  • Anda telah menambahkan blok CIDR dari ruang kerja Flink ke daftar putih AnalyticDB for MySQL.

Batasan

  • Fitur binary logging tidak dapat diaktifkan untuk tabel XUANWU_V2. Oleh karena itu, Anda tidak dapat menggunakan langganan log biner untuk melakukan sinkronisasi data atau komputasi aliran pada tabel XUANWU_V2 di kluster AnalyticDB for MySQL.

  • Flink hanya dapat memproses log biner dari AnalyticDB for MySQL untuk tipe data dasar dan tipe data JSON kompleks.

  • Flink tidak memproses catatan dalam log biner AnalyticDB for MySQL yang terkait dengan operasi DDL atau operasi penghapusan partisi otomatis pada tabel partisi.

Langkah 1: Aktifkan fitur binary logging

  1. Aktifkan fitur binary logging untuk sebuah tabel di kluster AnalyticDB for MySQL sumber. Dalam contoh ini, digunakan tabel bernama source_table.

    Catatan

    Anda hanya dapat mengaktifkan fitur binary logging untuk tabel di AnalyticDB for MySQL.

    Aktifkan fitur binary logging saat membuat tabel

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )DISTRIBUTED BY HASH (id) BINLOG=true;

    Aktifkan fitur binary logging untuk tabel yang sudah ada

    ALTER TABLE source_table BINLOG=true;
  2. (Opsional) Ubah periode retensi log biner.

    Anda dapat memodifikasi parameter binlog_ttl untuk mengubah periode retensi log biner. Nilai default parameter ini adalah 6h. Jalankan pernyataan berikut untuk mengubah periode retensi log biner menjadi 1 hari untuk tabel source_table:

    ALTER TABLE source_table binlog_ttl='1d';

    Parameter binlog_ttl mendukung nilai dalam format berikut:

    • Milidetik: angka murni. Misalnya, 60 berarti 60 milidetik.

    • Detik: angka + s. Misalnya, 30s berarti 30 detik.

    • Jam: angka + h. Misalnya, 2h berarti 2 jam.

    • Hari: angka + d. Misalnya, 1d berarti 1 hari.

    Catatan
    • Untuk kluster dengan versi kernel 3.2.1.9 atau lebih baru (untuk versi 3.2.1), 3.2.2.14 atau lebih baru (untuk versi 3.2.2), 3.2.3.8 atau lebih baru (untuk versi 3.2.3), 3.2.4.4 atau lebih baru (untuk versi 3.2.4), atau 3.2.5.1 atau lebih baru (untuk versi 3.2.5), periode retensi maksimum log biner adalah 365 hari. Untuk kluster dengan versi kernel yang lebih lama dari versi tersebut, periode retensi maksimum log biner adalah 21 hari.

    • Kami menyarankan agar Anda mengatur periode retensi log biner ke nilai yang lebih besar dari atau sama dengan nilai default parameter binlog_ttl. Jika Anda mengatur periode retensi ke nilai yang kecil, log biner mungkin dihapus dan sinkronisasi data gagal.

    • Untuk menanyakan periode retensi log biner saat ini, jalankan pernyataan SHOW CREATE TABLE source_table;.

Langkah 2: Unggah konektor AnalyticDB for MySQL ke Flink

  1. Unduh konektor.

  2. Masuk ke Konsol Realtime Compute for Apache Flink.

  3. Pada tab Fully Managed Flink, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.

  4. Di panel navigasi sebelah kiri, klik Connectors.

  5. Pada halaman Connectors, klik Create Custom Connector.

  6. Unggah konektor yang telah Anda unduh dan klik Next.

  7. Klik Finish. Konektor kustom akan muncul dalam daftar konektor.

Langkah 3: Berlangganan log biner

  1. Masuk ke Konsol Realtime Compute for Apache Flink dan buat pekerjaan SQL.

  2. Buat tabel sumber untuk terhubung ke AnalyticDB for MySQL dan membaca data log biner dari tabel tertentu (source_table).

    Catatan
    • Kunci primer yang didefinisikan dalam pernyataan DDL Flink harus identik dengan kunci primer di tabel fisik kluster AnalyticDB for MySQL, termasuk nama kuncinya. Jika tidak identik, kebenaran data akan terpengaruh.

    • Tipe data di Flink harus kompatibel dengan tipe data di AnalyticDB for MySQL. Untuk informasi selengkapnya, lihat Pemetaan tipe.

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    Tabel berikut menjelaskan parameter dalam klausa WITH.

    Parameter

    Wajib

    Nilai default

    Tipe data

    Deskripsi

    connector

    Ya

    Tidak ada

    STRING

    Konektor yang digunakan.

    Parameter ini wajib. Tetapkan nilainya ke adb-mysql-cdc.

    hostname

    Ya

    Tidak ada

    STRING

    Titik akhir VPC AnalyticDB for MySQL.

    username

    Ya

    Tidak ada

    STRING

    Akun database AnalyticDB for MySQL.

    password

    Ya

    Tidak ada

    STRING

    Kata sandi akun database AnalyticDB for MySQL.

    database-name

    Ya

    Tidak ada

    STRING

    Nama database AnalyticDB for MySQL.

    Karena AnalyticDB for MySQL menerapkan binary logging tingkat tabel, Anda hanya dapat menentukan satu database.

    table-name

    Ya

    Tidak ada

    STRING

    Nama tabel di database AnalyticDB for MySQL.

    Karena AnalyticDB for MySQL menerapkan binary logging tingkat tabel, Anda hanya dapat menentukan satu tabel.

    port

    Tidak

    3306

    INTEGER

    Nomor port.

    scan.incremental.snapshot.enabled

    Tidak

    true

    BOOLEAN

    Snapshot inkremental.

    Fitur ini diaktifkan secara default. Snapshot inkremental adalah mekanisme baru untuk membaca snapshot tabel. Dibandingkan dengan mekanisme snapshot lama, mekanisme snapshot inkremental memberikan manfaat berikut:

    • Saat membaca snapshot, sumber mendukung pembacaan konkuren.

    • Saat membaca snapshot, sumber mendukung checkpoint pada granularitas chunk.

    • Sebelum membaca snapshot, sumber tidak perlu mendapatkan izin kunci database.

    scan.incremental.snapshot.chunk.size

    Tidak

    8096

    INTEGER

    Ukuran chunk snapshot tabel, yaitu jumlah baris yang dikandung oleh satu chunk.

    Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca.

    scan.snapshot.fetch.size

    Tidak

    1024

    INTEGER

    Jumlah maksimum baris yang dapat dibaca setiap kali membaca snapshot tabel.

    scan.startup.mode

    Tidak

    initial

    STRING

    Mode startup untuk konsumsi data.

    Nilai yang valid:

    • initial (default): Saat pekerjaan pertama kali dijalankan, sistem memindai semua data historis lalu membaca data log biner terbaru.

    • earliest-offset: Pekerjaan tidak memindai data historis dan mulai membaca data dari log biner paling awal yang tersedia.

    • specific-offset: Tidak memindai data historis lengkap dan dimulai dari offset log biner yang Anda tentukan. Anda dapat menentukan offset ini dengan mengonfigurasi kedua parameter scan.startup.specific-offset.file dan scan.startup.specific-offset.pos untuk menentukan file log biner awal dan offset-nya.

    • latest-offset: Saat pekerjaan pertama kali dijalankan, sistem tidak memindai data historis dan mulai membaca data dari akhir log biner (log biner terbaru). Artinya, pekerjaan hanya membaca perubahan terbaru yang terjadi setelah konektor dijalankan.

    • timestamp: Tidak memindai data historis lengkap. Konektor mulai membaca log biner dari timestamp tertentu. Timestamp ditentukan dalam milidetik (ms) melalui parameter scan.startup.timestamp-millis.

    Penting

    Saat menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel yang sesuai tetap tidak berubah dari posisi konsumsi log biner yang ditentukan hingga waktu pekerjaan dimulai. Hal ini mencegah kegagalan pekerjaan akibat evolusi skema.

    scan.startup.specific-offset.file

    Tidak

    Tidak ada

    STRING

    Dalam mode startup specific-offset, parameter ini menentukan nama file log biner pada offset awal.

    Untuk mendapatkan nama file log biner terbaru, jalankan pernyataan SHOW MASTER STATUS for table_name;.

    scan.startup.specific-offset.pos

    Tidak

    Tidak ada

    LONG

    Dalam mode startup specific-offset, parameter ini menentukan posisi dalam file log biner pada offset awal.

    Anda dapat menjalankan perintah SHOW MASTER STATUS for table_name; untuk mengambil posisi log biner terbaru.

    scan.startup.specific-offset.skip-events

    Tidak

    Tidak ada

    LONG

    Jumlah event yang dilewati setelah offset awal yang ditentukan.

    scan.startup.specific-offset.skip-rows

    Tidak

    Tidak ada

    LONG

    Jumlah baris data yang dilewati setelah offset awal yang ditentukan.

    scan.startup.timestamp-millis

    Tidak

    Tidak ada

    LONG

    Saat menggunakan mode waktu tertentu untuk memulai pekerjaan, parameter ini menentukan offset awal dalam milidetik.

    Saat menggunakan konfigurasi ini, scan.startup.mode harus diatur ke timestamp. Timestamp dalam satuan milidetik (ms).

    server-time-zone

    Tidak

    Tidak ada

    STRING

    Zona waktu sesi pada server database.

    Contoh: "Asia/Shanghai". Parameter ini mengontrol, di AnalyticDB for MySQL, bagaimana tipe TIMESTAMP dikonversi ke tipe STRING. Jika parameter ini tidak diatur, ZONELD.SYSTEMDEFAULT() digunakan untuk menentukan zona waktu server.

    debezium.min.row.count.to.stream.result

    Tidak

    1000

    INTEGER

    Jika jumlah baris dalam tabel lebih besar dari nilai ini, konektor akan melakukan streaming hasilnya.

    Jika Anda mengatur parameter ini ke 0, semua pemeriksaan ukuran tabel dilewati, dan semua hasil selalu di-streaming selama snapshot.

    connect.timeout

    Tidak

    30s

    DURATION

    Waktu maksimum menunggu koneksi ke server database hingga timeout sebelum sistem mencoba koneksi ulang.

    Unit default adalah detik (s).

    connect.max-retries

    Tidak

    3

    INTEGER

    Jumlah maksimum percobaan ulang setelah koneksi ke layanan database gagal.

  3. Buat tabel tujuan untuk menyimpan data yang telah diproses. Contoh ini menggunakan AnalyticDB for MySQL sebagai tujuan. Untuk informasi selengkapnya tentang konektor yang didukung oleh Flink, lihat Konektor yang didukung.

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )
  4. Buat tabel sink untuk terhubung ke tabel tujuan yang Anda buat pada langkah sebelumnya. Tabel sink menulis data yang telah diproses ke tabel tertentu di AnalyticDB for MySQL.

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    Untuk informasi selengkapnya tentang parameter WITH dan pemetaan tipe untuk tabel sink, lihat Konektor AnalyticDB for MySQL V3.0.

  5. Sinkronkan perubahan data sumber yang ditangkap ke tabel sink. Tabel sink kemudian menulis data ke tujuan.

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
  6. Klik Save.

  7. Klik Depth Check.

    Fitur validasi memeriksa semantik SQL pekerjaan, konektivitas jaringan, dan metadata tabel yang digunakan. Anda juga dapat mengklik SQL Optimization di area hasil untuk melihat peringatan risiko SQL dan saran optimasi.

  8. (Opsional) Klik Debug.

    Anda dapat menggunakan fitur debugging pekerjaan untuk mensimulasikan eksekusi pekerjaan, memeriksa hasil keluaran, dan memverifikasi logika bisnis pernyataan SELECT atau INSERT. Hal ini meningkatkan efisiensi pengembangan dan mengurangi risiko kualitas data.

  9. Klik Deploy.

    Setelah mengembangkan dan memvalidasi pekerjaan, Anda dapat mendeploy pekerjaan ke lingkungan produksi. Setelah pekerjaan dideploy, Anda dapat membuka halaman O&M untuk menjalankan pekerjaan.

  10. (Opsional) Lihat informasi tentang log biner.

    Catatan

    Setelah Anda menjalankan pernyataan SQL berikut untuk menanyakan informasi log biner, nilai 0 akan dikembalikan jika Anda hanya mengaktifkan fitur binary logging. Informasi log hanya ditampilkan setelah Anda berlangganan log biner.

    • Untuk menanyakan nama file dan lokasi log biner terbaru, jalankan pernyataan berikut:

      SHOW MASTER STATUS FOR source_table;
    • Untuk menanyakan semua log biner historis yang belum dihapus dan ukurannya, jalankan pernyataan berikut:

      SHOW BINARY LOGS FOR source_table;

Pemetaan tipe

Tabel berikut menjelaskan pemetaan tipe data antara AnalyticDB for MySQL dan Flink.

AnalyticDB for MySQL tipe bidang

Flink tipe bidang

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p,s) atau NUMERIC(p,s)

DECIMAL(p,s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

JSON

STRING