Topik ini menjelaskan cara menyinkronkan seluruh database MySQL ke Kafka untuk mengurangi tekanan yang ditimbulkan oleh banyak tugas terhadap database MySQL.
Informasi latar belakang
MySQL Change Data Capture (CDC) digunakan untuk mengambil data dari MySQL dan menyinkronkan modifikasi tabel secara real time. Teknologi ini sering diterapkan dalam skenario komputasi kompleks, seperti menggabungkan tabel dimensi dengan tabel data lainnya. Dalam praktiknya, beberapa pekerjaan mungkin bergantung pada tabel MySQL yang sama. Ketika banyak tugas memproses tabel MySQL yang sama, database MySQL harus membuka banyak koneksi, sehingga memberikan tekanan signifikan pada server dan jaringan MySQL.
Arsitektur
Untuk mengurangi tekanan pada database MySQL hulu, Realtime Compute for Apache Flink menyediakan fitur untuk menyinkronkan seluruh database MySQL ke Kafka. Solusi ini memperkenalkan Kafka sebagai lapisan perantara dan menggunakan pekerjaan ingesti data Flink CDC untuk menyinkronkan data ke Kafka.
Sebuah pekerjaan tunggal menyinkronkan data dari database MySQL hulu ke Kafka secara real time. Setiap tabel MySQL ditulis ke topik Kafka yang sesuai dalam mode upsert. Selanjutnya, konektor Upsert Kafka dapat membaca data dari topik tersebut alih-alih mengakses tabel MySQL secara langsung. Pendekatan ini secara efektif mengurangi tekanan yang ditimbulkan oleh banyak tugas terhadap database MySQL.

Batasan
Tabel MySQL yang akan disinkronkan harus memiliki kunci primer.
Anda dapat menggunakan cluster Kafka yang dikelola sendiri, cluster EMR Kafka, atau ApsaraMQ for Kafka. Saat menggunakan ApsaraMQ for Kafka, Anda hanya dapat menggunakan titik akhir default.
Ruang penyimpanan cluster Kafka harus lebih besar daripada ruang penyimpanan data tabel sumber. Jika tidak, data dapat hilang karena ruang penyimpanan yang tidak mencukupi. Hal ini karena topik yang dibuat untuk sinkronisasi database penuh merupakan topik compacted. Dalam topik compacted, hanya pesan terbaru untuk setiap kunci pesan yang dipertahankan, tetapi data tersebut tidak kedaluwarsa. Akibatnya, ukuran penyimpanan topik compacted setara dengan ukuran tabel sumber.
Skenario
Sebagai contoh, dalam skenario analisis real-time untuk komentar pesanan, asumsikan Anda memiliki tiga tabel: tabel pengguna, tabel pesanan, dan tabel umpan balik. Data di setiap tabel ditunjukkan pada gambar berikut.
Untuk menampilkan informasi pesanan pengguna dan komentar pengguna, Anda perlu menggabungkan tabel pengguna untuk mengambil nama pengguna (bidang name). Berikut adalah contoh Pernyataan SQL-nya.
-- Gabungkan tabel pesanan dengan tabel pengguna untuk menampilkan nama pengguna dan nama produk untuk setiap pesanan.
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;
-- Gabungkan tabel umpan balik dengan tabel pengguna untuk menampilkan isi komentar dan nama pengguna yang sesuai.
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;Pada dua tugas SQL tersebut, tabel pengguna digunakan dalam kedua pekerjaan. Saat waktu proses, kedua pekerjaan membaca data lengkap dan data inkremental dari MySQL. Membaca data lengkap memerlukan koneksi MySQL, sedangkan membaca data inkremental memerlukan klien binary logging (Binlog). Seiring bertambahnya jumlah pekerjaan, jumlah koneksi MySQL dan klien Binlog juga meningkat, sehingga memberikan tekanan signifikan pada database hulu. Untuk mengurangi tekanan ini, Anda dapat menggunakan sintaks CDAS atau CTAS untuk menyinkronkan data MySQL hulu ke Kafka secara real time agar dapat dikonsumsi oleh banyak pekerjaan hilir.
Prasyarat
Anda telah mengaktifkan Realtime Compute for Apache Flink. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
Anda telah mengaktifkan ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Deploy instance ApsaraMQ for Kafka.
Anda telah mengaktifkan ApsaraDB RDS for MySQL. Untuk informasi selengkapnya, lihat Buat instance ApsaraDB RDS for MySQL.
Realtime Compute for Apache Flink, ApsaraDB RDS for MySQL, dan ApsaraMQ for Kafka harus berada dalam VPC yang sama. Jika tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan lintas VPC atau menggunakan Internet untuk akses. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses layanan lintas VPC? dan Bagaimana cara mengakses Internet?.
Saat menggunakan Pengguna Resource Access Management (RAM) atau Peran RAM untuk mengakses sumber daya, pastikan Pengguna RAM atau Peran RAM tersebut memiliki izin yang diperlukan.
Persiapan
Buat instance ApsaraDB RDS for MySQL dan siapkan sumber data
Buat database ApsaraDB RDS for MySQL. Untuk informasi selengkapnya, lihat Buat database.
Buat database bernama
order_dwuntuk instance tujuan.Siapkan sumber data MySQL CDC.
Pada halaman detail instance, klik Log On To Database di bagian atas halaman.
Pada kotak dialog logon DMS, masukkan nama pengguna dan kata sandi untuk akun database yang telah Anda buat, lalu klik Log On.
Setelah berhasil masuk, klik ganda database
order_dwdi sebelah kiri untuk beralih ke database tersebut.Pada bagian SQL Console, tulis pernyataan DDL untuk membuat ketiga tabel bisnis dan pernyataan untuk memasukkan data.
CREATE TABLE `user` ( id bigint not null primary key, name varchar(50) not null ); CREATE TABLE `order` ( id bigint not null primary key, product varchar(50) not null, user_id bigint not null ); CREATE TABLE `feedback` ( id bigint not null primary key, user_id bigint not null, comment varchar(50) not null ); -- Siapkan data INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry'); INSERT INTO `order` VALUES (1, 'Football', 2), (2, 'Basket', 1); INSERT INTO `feedback` VALUES (1, 1, 'Good.'), (2, 2, 'Very good');
Klik Execute, lalu klik Execute Directly.
Prosedur
Buat dan mulai tugas ingesti data Flink CDC untuk menyinkronkan data MySQL hulu ke Kafka secara real time. Hal ini memungkinkan banyak pekerjaan hilir mengonsumsi data tersebut. Pekerjaan sinkronisasi database penuh secara otomatis membuat topik. Anda dapat menentukan nama topik menggunakan modul route. Jumlah partisi dan replika untuk topik mengikuti konfigurasi default cluster Kafka, dan cleanup.policy diatur ke compact.
Nama topik default
Secara default, nama topik Kafka yang dibuat oleh tugas sinkronisasi database penuh menggunakan format `database_name.table_name`. Pekerjaan berikut membuat tiga topik: `order_dw.user`, `order_dw.order`, dan `order_dw.feedback`.
Pada halaman , buat pekerjaan ingesti data Flink CDC dan salin kode berikut ke editor YAML.
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # Parameter berikut diperlukan untuk ApsaraMQ for Kafka aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.
Di panel navigasi, pilih . Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.
Tentukan nama topik
Untuk tugas sinkronisasi database penuh, Anda dapat menggunakan modul route untuk menentukan nama topik untuk setiap tabel. Pekerjaan berikut membuat tiga topik: `user1`, `order2`, dan `feedback3`.
Pada halaman , buat pekerjaan ingesti data Flink CDC dan salin kode berikut ke editor YAML.
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 route: - source-table: order_dw.user sink-table: user1 - source-table: order_dw.order sink-table: order2 - source-table: order_dw.feedback sink-table: feedback3 sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # Parameter berikut diperlukan untuk ApsaraMQ for Kafka aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.
Di panel navigasi, pilih . Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.
Tentukan nama topik secara batch
Untuk tugas sinkronisasi database penuh, Anda dapat menggunakan modul route untuk menentukan pola guna menghasilkan nama topik secara batch. Pekerjaan berikut membuat tiga topik: `topic_user`, `topic_order`, dan `topic_feedback`.
Pada halaman , buat pekerjaan ingesti data Flink CDC dan salin kode berikut ke editor YAML.
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 route: - source-table: order_dw.\.* sink-table: topic_<> replace-symbol: <> sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # Parameter berikut diperlukan untuk ApsaraMQ for Kafka aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.
Di panel navigasi, pilih . Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.
Mengonsumsi data Kafka secara real time.
Data dari database MySQL hulu ditulis ke Kafka dalam format JSON. Satu topik Kafka dapat dikonsumsi oleh banyak pekerjaan hilir. Pekerjaan hilir mengonsumsi data dari topik untuk mengambil data terbaru dari tabel database. Anda dapat mengonsumsi data dari tabel yang disinkronkan ke Kafka dengan salah satu dari dua cara berikut:
Mengonsumsi data langsung melalui katalog
Baca data dari topik Kafka sebagai tabel sumber.
Pada halaman , buat pekerjaan stream SQL dan salin kode berikut ke editor SQL.
CREATE TEMPORARY TABLE print_user_proudct( order_id BIGINT, product STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); CREATE TEMPORARY TABLE print_user_feedback( feedback_id BIGINT, `comment` STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); BEGIN STATEMENT SET; -- Diperlukan saat menulis ke banyak sink. -- Gabungkan informasi pesanan dengan tabel pengguna dalam katalog JSON Kafka untuk menampilkan nama pengguna dan nama produk untuk setiap pesanan. INSERT INTO print_user_proudct SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `order` --Tentukan grup dan mode startup LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --Tentukan grup dan mode startup ON `order`.value_user_id = `user`.key_id; -- Gabungkan tabel komentar dengan tabel pengguna untuk menampilkan isi setiap komentar dan nama pengguna yang sesuai. INSERT INTO print_user_feedback SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as feedback --Tentukan grup dan mode startup LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --Tentukan grup dan mode startup ON feedback.value_user_id = `user`.key_id; END; -- Diperlukan saat menulis ke banyak sink.Contoh ini menggunakan konektor Print untuk mencetak hasil secara langsung. Anda juga dapat mengeluarkan hasil ke tabel sink konektor untuk analisis dan komputasi lebih lanjut. Untuk informasi selengkapnya tentang sintaks penulisan ke banyak sink, lihat Pernyataan INSERT INTO.
CatatanSaat Anda mengonsumsi data secara langsung, skema yang diurai oleh katalog JSON Kafka mungkin berbeda dari skema tabel MySQL yang sesuai karena perubahan skema. Misalnya, bidang yang dihapus mungkin masih muncul, atau beberapa bidang mungkin bernilai null.
Skema yang dibaca oleh katalog terdiri dari bidang-bidang dari data yang dikonsumsi. Jika suatu bidang telah dihapus tetapi pesannya belum kedaluwarsa, bidang tersebut mungkin masih muncul dengan nilai null. Anda tidak perlu menangani situasi ini.
Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.
Di panel navigasi, pilih . Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.
Mengonsumsi data dengan membuat tabel sementara
Tentukan skema kustom dan baca data dari tabel sementara.
Pada halaman , buat pekerjaan stream SQL dan salin kode berikut ke editor SQL.
CREATE TEMPORARY TABLE user_source ( key_id BIGINT, value_name STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE order_source ( key_id BIGINT, value_product STRING, value_user_id BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'order', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE feedback_source ( key_id BIGINT, value_user_id BIGINT, value_comment STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'feedback', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE print_user_proudct( order_id BIGINT, product STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); CREATE TEMPORARY TABLE print_user_feedback( feedback_id BIGINT, `comment` STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); BEGIN STATEMENT SET; -- Diperlukan saat menulis ke banyak sink. -- Gabungkan informasi pesanan dengan tabel pengguna dalam katalog JSON Kafka untuk menampilkan nama pengguna dan nama produk untuk setiap pesanan. INSERT INTO print_user_proudct SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name FROM order_source LEFT JOIN user_source ON order_source.value_user_id = user_source.key_id; -- Gabungkan tabel komentar dengan tabel pengguna untuk menampilkan isi setiap komentar dan nama pengguna yang sesuai. INSERT INTO print_user_feedback SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name FROM feedback_source LEFT JOIN user_source ON feedback_source.value_user_id = user_source.key_id; END; -- Diperlukan saat menulis ke banyak sink.Contoh ini menggunakan konektor Print untuk mencetak hasil secara langsung. Anda juga dapat mengeluarkan hasil ke tabel sink konektor untuk analisis dan komputasi lebih lanjut. Untuk informasi selengkapnya tentang sintaks penulisan ke banyak sink, lihat Pernyataan INSERT INTO.
Tabel berikut menjelaskan parameter untuk mengonfigurasi tabel sementara.
Parameter
Deskripsi
Catatan
connector
Jenis konektor.
Atur nilainya menjadi `kafka`.
topic
Nama topik.
Konsisten dengan Katalog JSON Kafka.
properties.bootstrap.servers
Alamat broker Kafka.
Formatnya adalah
host:port,host:port,host:port. Pisahkan beberapa alamat dengan koma (,).scan.startup.mode
Offset awal tempat membaca data di Kafka.
Nilai yang valid:
earliest-offset: Membaca data dari offset paling awal partisi Kafka.
latest-offset: Membaca data dari offset terbaru.
group-offsets (default): Membaca data dari offset yang telah dikomit oleh properties.group.id yang ditentukan.
timestamp: Membaca data dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.
specific-offsets: Membaca data dari offset yang ditentukan oleh scan.startup.specific-offsets.
Catatan
Parameter ini hanya berlaku saat pekerjaan dimulai tanpa state. Jika pekerjaan dimulai ulang dari checkpoint atau state-nya dilanjutkan, pekerjaan akan memprioritaskan kemajuan yang disimpan dalam state untuk melanjutkan pembacaan.
key.format
Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi kunci pesan Kafka.
Atur nilainya menjadi `json`.
key.fields
Bidang dalam tabel sumber atau sink yang sesuai dengan kunci pesan Kafka.
Pisahkan beberapa nama bidang dengan titik koma (;). Contoh:
field1;field2.key.fields-prefix
Awalan kustom untuk semua kunci pesan Kafka guna menghindari konflik nama dengan bidang dalam nilai pesan atau metadata.
Nilai ini harus sama dengan nilai parameter key.fields-prefix dalam katalog JSON Kafka.
value.format
Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi nilai pesan Kafka.
Atur nilainya menjadi `json`.
value.fields-prefix
Awalan kustom untuk semua nilai pesan Kafka guna menghindari konflik nama dengan bidang dalam kunci pesan atau metadata.
Nilai ini harus sama dengan nilai parameter value.fields-prefix dalam katalog JSON Kafka.
value.fields-include
Kebijakan untuk nilai pesan saat memproses bidang kunci pesan.
Atur nilainya menjadi `EXCEPT_KEY`. Ini menunjukkan bahwa nilai pesan tidak mengandung bidang dari kunci pesan.
value.json.infer-schema.flatten-nested-columns.enable
Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam JSON nilai pesan Kafka.
Nilai ini sesuai dengan nilai parameter infer-schema.flatten-nested-columns.enable dalam katalog.
value.json.infer-schema.primitive-as-string
Menentukan apakah akan menginferensi semua tipe data primitif sebagai tipe STRING dalam nilai pesan Kafka.
Nilai ini sesuai dengan nilai parameter infer-schema.primitive-as-string dalam katalog.
Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.
Di panel navigasi, pilih . Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.
Lihat hasil pekerjaan.
Di panel navigasi, pilih , lalu klik pekerjaan target.
Pada tab Job Logs, klik tab Running Task Managers, lalu klik tugas di bawah Path, ID.
Klik Logs dan cari informasi log terkait
PrintSinkOutputWriter.