Realtime Compute for Apache Flink dapat berlangganan ke AnalyticDB for MySQL untuk menangkap dan memproses data perubahan database secara real time, sehingga memungkinkan sinkronisasi data dan komputasi aliran yang efisien. Topik ini menjelaskan cara menggunakan Flink untuk berlangganan ke log biner AnalyticDB for MySQL.
Prasyarat
Kluster AnalyticDB for MySQL merupakan Enterprise Edition, Basic Edition, Data Lakehouse Edition, atau Data Warehouse Edition dalam mode elastis.
Versi minor kluster AnalyticDB for MySQL adalah 3.2.1.0 atau lebih baru.
CatatanUntuk melihat dan memperbarui versi minor kluster AnalyticDB for MySQL, masuk ke Konsol AnalyticDB for MySQL dan buka bagian Configuration Information pada halaman Cluster Information.
Mesin komputasi real-time Flink adalah Ververica Runtime (VVR) 8.0.4 atau lebih baru.
AnalyticDB for MySQL kluster dan ruang kerja Flink yang sepenuhnya dikelola berada dalam VPC yang sama.
Anda telah menambahkan blok CIDR dari ruang kerja Flink ke daftar putih AnalyticDB for MySQL.
Batasan
Fitur binary logging tidak dapat diaktifkan untuk tabel XUANWU_V2. Oleh karena itu, Anda tidak dapat menggunakan langganan log biner untuk melakukan sinkronisasi data atau komputasi aliran pada tabel XUANWU_V2 di kluster AnalyticDB for MySQL.
Flink hanya dapat memproses log biner dari AnalyticDB for MySQL untuk tipe data dasar dan tipe data JSON kompleks.
Flink tidak memproses catatan dalam log biner AnalyticDB for MySQL yang terkait dengan operasi DDL atau operasi penghapusan partisi otomatis pada tabel partisi.
Langkah 1: Aktifkan fitur binary logging
Aktifkan fitur binary logging untuk sebuah tabel di kluster AnalyticDB for MySQL sumber. Dalam contoh ini, digunakan tabel bernama source_table.
CatatanAnda hanya dapat mengaktifkan fitur binary logging untuk tabel di AnalyticDB for MySQL.
Aktifkan fitur binary logging saat membuat tabel
CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;Aktifkan fitur binary logging untuk tabel yang sudah ada
ALTER TABLE source_table BINLOG=true;(Opsional) Ubah periode retensi log biner.
Anda dapat memodifikasi parameter
binlog_ttluntuk mengubah periode retensi log biner. Nilai default parameter ini adalah 6h. Jalankan pernyataan berikut untuk mengubah periode retensi log biner menjadi 1 hari untuk tabel source_table:ALTER TABLE source_table binlog_ttl='1d';Parameter
binlog_ttlmendukung nilai dalam format berikut:Milidetik: angka murni. Misalnya,
60berarti 60 milidetik.Detik: angka + s. Misalnya,
30sberarti 30 detik.Jam: angka + h. Misalnya,
2hberarti 2 jam.Hari: angka + d. Misalnya,
1dberarti 1 hari.
CatatanUntuk kluster dengan versi kernel 3.2.1.9 atau lebih baru (untuk versi 3.2.1), 3.2.2.14 atau lebih baru (untuk versi 3.2.2), 3.2.3.8 atau lebih baru (untuk versi 3.2.3), 3.2.4.4 atau lebih baru (untuk versi 3.2.4), atau 3.2.5.1 atau lebih baru (untuk versi 3.2.5), periode retensi maksimum log biner adalah 365 hari. Untuk kluster dengan versi kernel yang lebih lama dari versi tersebut, periode retensi maksimum log biner adalah 21 hari.
Kami menyarankan agar Anda mengatur periode retensi log biner ke nilai yang lebih besar dari atau sama dengan nilai default parameter
binlog_ttl. Jika Anda mengatur periode retensi ke nilai yang kecil, log biner mungkin dihapus dan sinkronisasi data gagal.Untuk menanyakan periode retensi log biner saat ini, jalankan pernyataan
SHOW CREATE TABLE source_table;.
Langkah 2: Unggah konektor AnalyticDB for MySQL ke Flink
Unduh konektor.
Masuk ke Konsol Realtime Compute for Apache Flink.
Pada tab Fully Managed Flink, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi sebelah kiri, klik Connectors.
Pada halaman Connectors, klik Create Custom Connector.
Unggah konektor yang telah Anda unduh dan klik Next.
Klik Finish. Konektor kustom akan muncul dalam daftar konektor.
Langkah 3: Berlangganan log biner
Masuk ke Konsol Realtime Compute for Apache Flink dan buat pekerjaan SQL.
Buat tabel sumber untuk terhubung ke AnalyticDB for MySQL dan membaca data log biner dari tabel tertentu (source_table).
CatatanKunci primer yang didefinisikan dalam pernyataan DDL Flink harus identik dengan kunci primer di tabel fisik kluster AnalyticDB for MySQL, termasuk nama kuncinya. Jika tidak identik, kebenaran data akan terpengaruh.
Tipe data di Flink harus kompatibel dengan tipe data di AnalyticDB for MySQL. Untuk informasi selengkapnya, lihat Pemetaan tipe.
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );Tabel berikut menjelaskan parameter dalam klausa WITH.
Parameter
Wajib
Nilai default
Tipe data
Deskripsi
connector
Ya
Tidak ada
STRING
Konektor yang digunakan.
Parameter ini wajib. Tetapkan nilainya ke
adb-mysql-cdc.hostname
Ya
Tidak ada
STRING
Titik akhir VPC AnalyticDB for MySQL.
username
Ya
Tidak ada
STRING
Akun database AnalyticDB for MySQL.
password
Ya
Tidak ada
STRING
Kata sandi akun database AnalyticDB for MySQL.
database-name
Ya
Tidak ada
STRING
Nama database AnalyticDB for MySQL.
Karena AnalyticDB for MySQL menerapkan binary logging tingkat tabel, Anda hanya dapat menentukan satu database.
table-name
Ya
Tidak ada
STRING
Nama tabel di database AnalyticDB for MySQL.
Karena AnalyticDB for MySQL menerapkan binary logging tingkat tabel, Anda hanya dapat menentukan satu tabel.
port
Tidak
3306
INTEGER
Nomor port.
scan.incremental.snapshot.enabled
Tidak
true
BOOLEAN
Snapshot inkremental.
Fitur ini diaktifkan secara default. Snapshot inkremental adalah mekanisme baru untuk membaca snapshot tabel. Dibandingkan dengan mekanisme snapshot lama, mekanisme snapshot inkremental memberikan manfaat berikut:
Saat membaca snapshot, sumber mendukung pembacaan konkuren.
Saat membaca snapshot, sumber mendukung checkpoint pada granularitas chunk.
Sebelum membaca snapshot, sumber tidak perlu mendapatkan izin kunci database.
scan.incremental.snapshot.chunk.size
Tidak
8096
INTEGER
Ukuran chunk snapshot tabel, yaitu jumlah baris yang dikandung oleh satu chunk.
Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca.
scan.snapshot.fetch.size
Tidak
1024
INTEGER
Jumlah maksimum baris yang dapat dibaca setiap kali membaca snapshot tabel.
scan.startup.mode
Tidak
initial
STRING
Mode startup untuk konsumsi data.
Nilai yang valid:
initial (default): Saat pekerjaan pertama kali dijalankan, sistem memindai semua data historis lalu membaca data log biner terbaru.
earliest-offset: Pekerjaan tidak memindai data historis dan mulai membaca data dari log biner paling awal yang tersedia.
specific-offset: Tidak memindai data historis lengkap dan dimulai dari offset log biner yang Anda tentukan. Anda dapat menentukan offset ini dengan mengonfigurasi kedua parameter
scan.startup.specific-offset.filedanscan.startup.specific-offset.posuntuk menentukan file log biner awal dan offset-nya.latest-offset: Saat pekerjaan pertama kali dijalankan, sistem tidak memindai data historis dan mulai membaca data dari akhir log biner (log biner terbaru). Artinya, pekerjaan hanya membaca perubahan terbaru yang terjadi setelah konektor dijalankan.
timestamp: Tidak memindai data historis lengkap. Konektor mulai membaca log biner dari timestamp tertentu. Timestamp ditentukan dalam milidetik (ms) melalui parameter
scan.startup.timestamp-millis.
PentingSaat menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel yang sesuai tetap tidak berubah dari posisi konsumsi log biner yang ditentukan hingga waktu pekerjaan dimulai. Hal ini mencegah kegagalan pekerjaan akibat evolusi skema.
scan.startup.specific-offset.file
Tidak
Tidak ada
STRING
Dalam mode startup specific-offset, parameter ini menentukan nama file log biner pada offset awal.
Untuk mendapatkan nama file log biner terbaru, jalankan pernyataan
SHOW MASTER STATUS for table_name;.scan.startup.specific-offset.pos
Tidak
Tidak ada
LONG
Dalam mode startup specific-offset, parameter ini menentukan posisi dalam file log biner pada offset awal.
Anda dapat menjalankan perintah
SHOW MASTER STATUS for table_name;untuk mengambil posisi log biner terbaru.scan.startup.specific-offset.skip-events
Tidak
Tidak ada
LONG
Jumlah event yang dilewati setelah offset awal yang ditentukan.
scan.startup.specific-offset.skip-rows
Tidak
Tidak ada
LONG
Jumlah baris data yang dilewati setelah offset awal yang ditentukan.
scan.startup.timestamp-millis
Tidak
Tidak ada
LONG
Saat menggunakan mode waktu tertentu untuk memulai pekerjaan, parameter ini menentukan offset awal dalam milidetik.
Saat menggunakan konfigurasi ini,
scan.startup.modeharus diatur ke timestamp. Timestamp dalam satuan milidetik (ms).server-time-zone
Tidak
Tidak ada
STRING
Zona waktu sesi pada server database.
Contoh: "Asia/Shanghai". Parameter ini mengontrol, di AnalyticDB for MySQL, bagaimana tipe TIMESTAMP dikonversi ke tipe STRING. Jika parameter ini tidak diatur,
ZONELD.SYSTEMDEFAULT()digunakan untuk menentukan zona waktu server.debezium.min.row.count.to.stream.result
Tidak
1000
INTEGER
Jika jumlah baris dalam tabel lebih besar dari nilai ini, konektor akan melakukan streaming hasilnya.
Jika Anda mengatur parameter ini ke
0, semua pemeriksaan ukuran tabel dilewati, dan semua hasil selalu di-streaming selama snapshot.connect.timeout
Tidak
30s
DURATION
Waktu maksimum menunggu koneksi ke server database hingga timeout sebelum sistem mencoba koneksi ulang.
Unit default adalah detik (s).
connect.max-retries
Tidak
3
INTEGER
Jumlah maksimum percobaan ulang setelah koneksi ke layanan database gagal.
Buat tabel tujuan untuk menyimpan data yang telah diproses. Contoh ini menggunakan AnalyticDB for MySQL sebagai tujuan. Untuk informasi selengkapnya tentang konektor yang didukung oleh Flink, lihat Konektor yang didukung.
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )Buat tabel sink untuk terhubung ke tabel tujuan yang Anda buat pada langkah sebelumnya. Tabel sink menulis data yang telah diproses ke tabel tertentu di AnalyticDB for MySQL.
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );Untuk informasi selengkapnya tentang parameter WITH dan pemetaan tipe untuk tabel sink, lihat Konektor AnalyticDB for MySQL V3.0.
Sinkronkan perubahan data sumber yang ditangkap ke tabel sink. Tabel sink kemudian menulis data ke tujuan.
INSERT INTO adb_sink SELECT * FROM adb_source;Klik Save.
Klik Depth Check.
Fitur validasi memeriksa semantik SQL pekerjaan, konektivitas jaringan, dan metadata tabel yang digunakan. Anda juga dapat mengklik SQL Optimization di area hasil untuk melihat peringatan risiko SQL dan saran optimasi.
(Opsional) Klik Debug.
Anda dapat menggunakan fitur debugging pekerjaan untuk mensimulasikan eksekusi pekerjaan, memeriksa hasil keluaran, dan memverifikasi logika bisnis pernyataan SELECT atau INSERT. Hal ini meningkatkan efisiensi pengembangan dan mengurangi risiko kualitas data.
Klik Deploy.
Setelah mengembangkan dan memvalidasi pekerjaan, Anda dapat mendeploy pekerjaan ke lingkungan produksi. Setelah pekerjaan dideploy, Anda dapat membuka halaman O&M untuk menjalankan pekerjaan.
(Opsional) Lihat informasi tentang log biner.
CatatanSetelah Anda menjalankan pernyataan SQL berikut untuk menanyakan informasi log biner, nilai 0 akan dikembalikan jika Anda hanya mengaktifkan fitur binary logging. Informasi log hanya ditampilkan setelah Anda berlangganan log biner.
Untuk menanyakan nama file dan lokasi log biner terbaru, jalankan pernyataan berikut:
SHOW MASTER STATUS FOR source_table;Untuk menanyakan semua log biner historis yang belum dihapus dan ukurannya, jalankan pernyataan berikut:
SHOW BINARY LOGS FOR source_table;
Pemetaan tipe
Tabel berikut menjelaskan pemetaan tipe data antara AnalyticDB for MySQL dan Flink.
AnalyticDB for MySQL tipe bidang | Flink tipe bidang |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p,s) atau NUMERIC(p,s) | DECIMAL(p,s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |