Topik ini menjelaskan cara menyinkronkan seluruh database MySQL ke Kafka. Metode ini membantu mengurangi beban yang ditimbulkan oleh beberapa job terhadap database MySQL.
Informasi latar belakang
Tabel Change Data Capture (CDC) MySQL mengambil data dari MySQL dan menyinkronkan modifikasi tabel secara real time. Tabel-tabel ini sering digunakan untuk komputasi kompleks. Misalnya, Anda dapat menggunakan tabel CDC MySQL sebagai tabel dimensi dan menggabungkannya (join) dengan tabel data lainnya. Satu tabel MySQL dapat menjadi dependensi bagi beberapa job. Saat job-job tersebut memproses data dari tabel yang sama, database MySQL akan membuat beberapa koneksi, sehingga menciptakan beban berat pada server dan jaringan MySQL.
Arsitektur solusi
Untuk mengurangi beban pada database MySQL hulu, Realtime Compute for Apache Flink memungkinkan Anda menyinkronkan seluruh database MySQL ke Kafka. Solusi ini menggunakan Kafka sebagai lapisan perantara. Sebuah job ingesti data Flink CDC menyinkronkan data ke Kafka.
Satu Pekerjaan menyinkronkan data dari database MySQL upstream ke Kafka secara real-time. Setiap tabel MySQL dituliskan ke topik Kafka yang sesuai dalam mode upsert. Pekerjaan downstream kemudian menggunakan konektor Upsert Kafka untuk membaca data dari topik-topik tersebut, bukan dengan mengakses tabel MySQL secara langsung. Pendekatan ini secara signifikan mengurangi beban pada database MySQL.

Batasan
Tabel MySQL yang akan disinkronkan harus memiliki primary key.
Anda dapat menggunakan cluster Kafka yang dikelola sendiri, cluster EMR Kafka, atau ApsaraMQ for Kafka. Saat menggunakan ApsaraMQ for Kafka, Anda harus menggunakan endpoint default.
Cluster Kafka harus memiliki ruang penyimpanan yang lebih besar daripada tabel sumber. Jika tidak, kehilangan data dapat terjadi akibat ruang penyimpanan yang tidak mencukupi. Topik yang dibuat untuk sinkronisasi database dikompaksi. Pada topik yang dikompaksi, hanya pesan terbaru untuk setiap kunci yang dipertahankan, dan datanya tidak pernah kedaluwarsa. Artinya, topik tersebut menyimpan jumlah data yang kira-kira sama dengan ukuran tabel sumber.
Kasus penggunaan
Pertimbangkan analisis real-time terhadap ulasan pesanan. Anda memiliki tiga tabel: user, order, dan feedback. Gambar berikut menunjukkan data dalam tabel-tabel tersebut.
Untuk menampilkan detail pesanan dan ulasan pengguna, Anda harus melakukan join tabel `user` untuk mengambil username dari field `name`. Pernyataan SQL berikut menunjukkan cara melakukannya.
-- Gabungkan informasi pesanan dengan tabel user untuk menampilkan username dan nama produk untuk setiap pesanan.
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;
-- Gabungkan ulasan dengan tabel user untuk menampilkan isi setiap ulasan dan username yang sesuai.
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;Dalam kedua task SQL tersebut, tabel user digunakan oleh kedua job. Saat runtime, masing-masing job membaca data lengkap dan data inkremental dari MySQL. Pembacaan data lengkap memerlukan koneksi ke MySQL, sedangkan pembacaan data inkremental memerlukan client binary logging (Binlog). Seiring peningkatan jumlah job, kebutuhan akan koneksi MySQL dan sumber daya client Binlog juga meningkat, sehingga memberikan beban berat pada database hulu. Untuk mengurangi beban tersebut, Anda dapat menggunakan job ingest data Flink CDC untuk menyinkronkan data MySQL hulu ke Kafka secara real-time. Data tersebut kemudian tersedia untuk dikonsumsi oleh beberapa job hilir.
Prasyarat
Realtime Compute for Apache Flink telah diaktifkan. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
ApsaraMQ for Kafka telah diaktifkan. Untuk informasi selengkapnya, lihat Deploy instans ApsaraMQ for Kafka.
ApsaraDB RDS for MySQL telah diaktifkan. Untuk informasi selengkapnya, lihat Buat instans ApsaraDB RDS for MySQL.
Realtime Compute for Apache Flink, ApsaraDB RDS for MySQL, dan ApsaraMQ for Kafka harus berada dalam VPC yang sama. Jika berada di VPC berbeda, Anda harus mengaktifkan komunikasi jaringan cross-VPC atau mengaksesnya melalui Internet. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses layanan lain lintas VPC? dan Bagaimana cara mengakses Internet?.
Jika Anda menggunakan identitas seperti Pengguna Resource Access Management (RAM) atau Peran RAM, pastikan identitas tersebut memiliki izin yang diperlukan untuk mengakses resource.
Persiapan
Buat instans ApsaraDB RDS for MySQL dan siapkan sumber data
Buat database. Untuk informasi selengkapnya, lihat Buat database.
Buat database bernama
order_dwuntuk instans tujuan.Siapkan sumber data MySQL CDC.
Pada halaman produk instans, klik Log on to Database di bagian atas.
Pada halaman login DMS yang muncul, masukkan username dan password untuk akun database yang telah Anda buat, lalu klik Logon.
Setelah login, double-click database
order_dwdi sebelah kiri untuk beralih ke database tersebut.Pada SQL Console, tulis pernyataan DDL untuk membuat tiga tabel bisnis dan pernyataan untuk memasukkan data.
CREATE TABLE `user` ( id bigint not null primary key, name varchar(50) not null ); CREATE TABLE `order` ( id bigint not null primary key, product varchar(50) not null, user_id bigint not null ); CREATE TABLE `feedback` ( id bigint not null primary key, user_id bigint not null, comment varchar(50) not null ); -- Siapkan data INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry'); INSERT INTO `order` VALUES (1, 'Football', 2), (2, 'Basket', 1); INSERT INTO `feedback` VALUES (1, 1, 'Good.'), (2, 2, 'Very good');
Klik Execute, lalu klik Execute Directly.
Prosedur
Buat dan jalankan tugas ingesti data Flink CDC untuk menyinkronkan data dari database MySQL upstream ke Kafka secara real-time, sehingga data tersedia untuk berbagai pekerjaan downstream. Pekerjaan tersebut secara otomatis membuat topik. Anda dapat menentukan nama topik menggunakan modul `route`. Jumlah partisi dan replika topik mengikuti pengaturan default Kluster Kafka, sedangkan `cleanup.policy` diatur ke `compact`.
Nama topik default
Task sinkronisasi database penuh membuat topik Kafka menggunakan format penamaan default yang menggabungkan nama database MySQL dan nama tabel, dipisahkan oleh titik. Misalnya, sebuah job dapat membuat topik seperti order_dw.user, order_dw.order, dan order_dw.feedback.
Pada halaman , buat job ingesti data Flink CDC baru, lalu salin kode berikut ke editor YAML.
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental. scan.binlog.newly-added-table.enabled: true # (Opsional) Sinkronkan komentar tabel dan field. include-comments.enabled: true # (Opsional) Utamakan chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager. scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan. scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # Parameter berikut diperlukan untuk ApsaraMQ for Kafka aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}Di pojok kanan atas, klik Deploy untuk mendeploy job.
Pada panel navigasi kiri, pilih . Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.
Tentukan nama topik
Anda dapat menggunakan modul `route` dalam task sinkronisasi database untuk menentukan nama topik untuk setiap tabel. Job berikut membuat tiga topik: `user1`, `order2`, dan `feedback3`.
Pada halaman , buat job ingesti data Flink CDC baru, lalu salin kode berikut ke editor YAML.
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental. scan.binlog.newly-added-table.enabled: true # (Opsional) Sinkronkan komentar tabel dan field. include-comments.enabled: true # (Opsional) Utamakan chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager. scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan. scan.only.deserialize.captured.tables.changelog.enabled: true route: - source-table: order_dw.user sink-table: user1 - source-table: order_dw.order sink-table: order2 - source-table: order_dw.feedback sink-table: feedback3 sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # Parameter berikut diperlukan untuk ApsaraMQ for Kafka aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}Di pojok kanan atas, klik Deploy untuk mendeploy job.
Pada panel navigasi kiri, pilih . Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.
Tentukan nama topik secara batch
Anda dapat menggunakan modul `route` dalam task sinkronisasi database untuk menentukan pola nama topik yang dihasilkan secara batch. Job berikut membuat tiga topik: `topic_user`, `topic_order`, dan `topic_feedback`.
Pada halaman , buat job ingesti data Flink CDC baru, lalu salin kode berikut ke editor YAML.
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental. scan.binlog.newly-added-table.enabled: true # (Opsional) Sinkronkan komentar tabel dan field. include-comments.enabled: true # (Opsional) Utamakan chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager. scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan. scan.only.deserialize.captured.tables.changelog.enabled: true route: - source-table: order_dw.\.* sink-table: topic_<> replace-symbol: <> sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # Parameter berikut diperlukan untuk ApsaraMQ for Kafka aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}Di pojok kanan atas, klik Deploy untuk mendeploy job.
Pada panel navigasi kiri, pilih . Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.
Konsumsi data Kafka secara real time.
Data dari database MySQL sumber ditulis ke Kafka dalam format JSON. Satu topik Kafka dapat dikonsumsi oleh beberapa pekerjaan hilir yang membaca data dari topik tersebut untuk mendapatkan data tabel terbaru. Anda dapat mengonsumsi data dari tabel yang disinkronkan dengan dua cara berikut:
Konsumsi data langsung melalui katalog
Baca data dari topik Kafka sebagai tabel sumber.
Pada halaman , buat job stream SQL baru dan salin kode berikut ke editor SQL.
CREATE TEMPORARY TABLE print_user_proudct( order_id BIGINT, product STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); CREATE TEMPORARY TABLE print_user_feedback( feedback_id BIGINT, `comment` STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); BEGIN STATEMENT SET; -- Diperlukan saat menulis ke beberapa sink. -- Gabungkan informasi pesanan dengan tabel user dalam katalog JSON Kafka untuk menampilkan username dan nama produk untuk setiap pesanan. INSERT INTO print_user_proudct SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `order` -- Tentukan group dan startup mode LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- Tentukan group dan startup mode ON `order`.value_user_id = `user`.key_id; -- Gabungkan ulasan dengan tabel user untuk menampilkan isi setiap ulasan dan username yang sesuai. INSERT INTO print_user_feedback SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as feedback -- Tentukan group dan startup mode LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- Tentukan group dan startup mode ON feedback.value_user_id = `user`.key_id; END; -- Diperlukan saat menulis ke beberapa sink.Contoh ini menggunakan konektor Print untuk mencetak hasil secara langsung. Anda juga dapat mengeluarkan hasil ke tabel sink konektor untuk analisis dan komputasi lebih lanjut. Untuk informasi selengkapnya tentang sintaks menulis ke beberapa sink, lihat Pernyataan INSERT INTO.
CatatanJika skema tabel MySQL diubah selama sinkronisasi data, skema yang diurai oleh katalog JSON Kafka mungkin tidak sinkron dengan skema tabel aktual. Misalnya, jika field dihapus dari tabel MySQL tetapi masih muncul dalam skema yang diurai dari katalog, nilai untuk field tersebut mungkin null.
Skema yang dibaca dari Katalog terdiri dari field-field dari data yang dikonsumsi. Jika suatu field dihapus tetapi pesan yang memuatnya belum kedaluwarsa, skema mungkin mencakup field yang sudah tidak ada lagi. Nilai field tersebut adalah null. Situasi ini tidak memerlukan penanganan khusus.
Di pojok kanan atas, klik Deploy untuk mendeploy job.
Pada panel navigasi kiri, pilih . Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.
Konsumsi data dengan membuat tabel temporary
Definisikan skema kustom dan baca data dari tabel temporary.
Pada halaman , Anda dapat membuat job stream SQL baru dan salin kode berikut ke editor SQL.
CREATE TEMPORARY TABLE user_source ( key_id BIGINT, value_name STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE order_source ( key_id BIGINT, value_product STRING, value_user_id BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'order', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE feedback_source ( key_id BIGINT, value_user_id BIGINT, value_comment STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'feedback', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE print_user_proudct( order_id BIGINT, product STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); CREATE TEMPORARY TABLE print_user_feedback( feedback_id BIGINT, `comment` STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); BEGIN STATEMENT SET; -- Diperlukan saat menulis ke beberapa sink. -- Gabungkan informasi pesanan dengan tabel user dalam katalog JSON Kafka untuk menampilkan username dan nama produk untuk setiap pesanan. INSERT INTO print_user_proudct SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name FROM order_source LEFT JOIN user_source ON order_source.value_user_id = user_source.key_id; -- Gabungkan ulasan dengan tabel user untuk menampilkan isi setiap ulasan dan username yang sesuai. INSERT INTO print_user_feedback SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name FROM feedback_source LEFT JOIN user_source ON feedback_source.value_user_id = user_source.key_id; END; -- Diperlukan saat menulis ke beberapa sink.Contoh ini menggunakan konektor Print untuk mencetak hasil secara langsung. Anda juga dapat mengeluarkan hasil ke tabel sink konektor untuk analisis dan komputasi lebih lanjut. Untuk informasi selengkapnya tentang sintaks menulis ke beberapa sink, lihat Pernyataan INSERT INTO.
Tabel berikut menjelaskan parameter untuk mengonfigurasi tabel temporary.
Parameter
Deskripsi
Catatan
connector
Jenis konektor.
Nilainya harus `kafka`.
topic
Nama topik yang sesuai.
Harus sama dengan deskripsi dalam katalog JSON Kafka.
properties.bootstrap.servers
Alamat broker Kafka.
Formatnya adalah
host:port,host:port,host:port. Gunakan koma (,) untuk memisahkan alamat.scan.startup.mode
Offset awal untuk membaca data dari Kafka.
Nilai yang valid:
earliest-offset: Baca dari partisi paling awal di Kafka.
latest-offset: Baca dari offset terbaru di Kafka.
group-offsets (default): Baca dari offset yang telah dikomit oleh properties.group.id yang ditentukan.
timestamp: Baca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.
specific-offsets: Baca dari offset yang ditentukan oleh scan.startup.specific-offsets.
Catatan
Parameter ini hanya berlaku saat job dimulai tanpa state. Jika job dimulai ulang dari checkpoint atau memulihkan state-nya, job akan memprioritaskan menggunakan progres yang disimpan dalam state untuk melanjutkan pembacaan.
key.format
Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi kunci pesan Kafka.
Nilainya harus `json`.
key.fields
Field-field dalam tabel sumber atau sink yang sesuai dengan kunci pesan Kafka.
Gunakan titik koma (;) untuk memisahkan beberapa nama field. Contohnya,
field1;field2.key.fields-prefix
Awalan kustom untuk semua kunci pesan Kafka guna menghindari konflik nama dengan field dalam nilai pesan atau metadata.
Harus sama dengan nilai parameter key.fields-prefix dalam katalog JSON Kafka.
value.format
Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi nilai pesan Kafka.
Nilainya harus `json`.
value.fields-prefix
Awalan kustom untuk semua nilai pesan Kafka guna menghindari konflik nama dengan field dalam kunci pesan atau metadata.
Harus sama dengan nilai parameter value.fields-prefix dalam katalog JSON Kafka.
value.fields-include
Menentukan kebijakan penanganan field kunci pesan dalam isi pesan.
Nilai tetap EXCEPT_KEY menunjukkan bahwa isi pesan tidak mengandung field kunci pesan.
value.json.infer-schema.flatten-nested-columns.enable
Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam JSON nilai pesan Kafka.
Sesuai dengan nilai parameter infer-schema.flatten-nested-columns.enable dalam katalog.
value.json.infer-schema.primitive-as-string
Menentukan apakah akan menginferensi semua tipe data primitif sebagai tipe String dalam nilai pesan Kafka.
Sesuai dengan nilai parameter infer-schema.primitive-as-string dalam katalog.
Di pojok kanan atas, klik Deploy untuk mendeploy job.
Pada panel navigasi kiri, pilih . Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.
Lihat hasil pekerjaan.
Pada panel navigasi kiri, pilih , lalu klik job target.
Pada tab Job Logs, klik task di bawah Path, ID pada tab Running Task Managers.
Klik Logs dan cari informasi log terkait
PrintSinkOutputWriterdi halaman tersebut.