全部产品
Search
文档中心

Realtime Compute for Apache Flink:Sinkronkan seluruh database MySQL ke Kafka menggunakan Flink CDC

更新时间:Nov 10, 2025

Topik ini menjelaskan cara menyinkronkan seluruh database MySQL ke Kafka untuk mengurangi tekanan yang ditimbulkan oleh banyak tugas terhadap database MySQL.

Informasi latar belakang

MySQL Change Data Capture (CDC) digunakan untuk mengambil data dari MySQL dan menyinkronkan modifikasi tabel secara real time. Teknologi ini sering diterapkan dalam skenario komputasi kompleks, seperti menggabungkan tabel dimensi dengan tabel data lainnya. Dalam praktiknya, beberapa pekerjaan mungkin bergantung pada tabel MySQL yang sama. Ketika banyak tugas memproses tabel MySQL yang sama, database MySQL harus membuka banyak koneksi, sehingga memberikan tekanan signifikan pada server dan jaringan MySQL.

Arsitektur

Untuk mengurangi tekanan pada database MySQL hulu, Realtime Compute for Apache Flink menyediakan fitur untuk menyinkronkan seluruh database MySQL ke Kafka. Solusi ini memperkenalkan Kafka sebagai lapisan perantara dan menggunakan pekerjaan ingesti data Flink CDC untuk menyinkronkan data ke Kafka.

Sebuah pekerjaan tunggal menyinkronkan data dari database MySQL hulu ke Kafka secara real time. Setiap tabel MySQL ditulis ke topik Kafka yang sesuai dalam mode upsert. Selanjutnya, konektor Upsert Kafka dapat membaca data dari topik tersebut alih-alih mengakses tabel MySQL secara langsung. Pendekatan ini secara efektif mengurangi tekanan yang ditimbulkan oleh banyak tugas terhadap database MySQL.

Gambar 1

Batasan

  • Tabel MySQL yang akan disinkronkan harus memiliki kunci primer.

  • Anda dapat menggunakan cluster Kafka yang dikelola sendiri, cluster EMR Kafka, atau ApsaraMQ for Kafka. Saat menggunakan ApsaraMQ for Kafka, Anda hanya dapat menggunakan titik akhir default.

  • Ruang penyimpanan cluster Kafka harus lebih besar daripada ruang penyimpanan data tabel sumber. Jika tidak, data dapat hilang karena ruang penyimpanan yang tidak mencukupi. Hal ini karena topik yang dibuat untuk sinkronisasi database penuh merupakan topik compacted. Dalam topik compacted, hanya pesan terbaru untuk setiap kunci pesan yang dipertahankan, tetapi data tersebut tidak kedaluwarsa. Akibatnya, ukuran penyimpanan topik compacted setara dengan ukuran tabel sumber.

Skenario

Sebagai contoh, dalam skenario analisis real-time untuk komentar pesanan, asumsikan Anda memiliki tiga tabel: tabel pengguna, tabel pesanan, dan tabel umpan balik. Data di setiap tabel ditunjukkan pada gambar berikut.mysql database

Untuk menampilkan informasi pesanan pengguna dan komentar pengguna, Anda perlu menggabungkan tabel pengguna untuk mengambil nama pengguna (bidang name). Berikut adalah contoh Pernyataan SQL-nya.

-- Gabungkan tabel pesanan dengan tabel pengguna untuk menampilkan nama pengguna dan nama produk untuk setiap pesanan.
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;

-- Gabungkan tabel umpan balik dengan tabel pengguna untuk menampilkan isi komentar dan nama pengguna yang sesuai.
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;

Pada dua tugas SQL tersebut, tabel pengguna digunakan dalam kedua pekerjaan. Saat waktu proses, kedua pekerjaan membaca data lengkap dan data inkremental dari MySQL. Membaca data lengkap memerlukan koneksi MySQL, sedangkan membaca data inkremental memerlukan klien binary logging (Binlog). Seiring bertambahnya jumlah pekerjaan, jumlah koneksi MySQL dan klien Binlog juga meningkat, sehingga memberikan tekanan signifikan pada database hulu. Untuk mengurangi tekanan ini, Anda dapat menggunakan sintaks CDAS atau CTAS untuk menyinkronkan data MySQL hulu ke Kafka secara real time agar dapat dikonsumsi oleh banyak pekerjaan hilir.

Prasyarat

Persiapan

Buat instance ApsaraDB RDS for MySQL dan siapkan sumber data

  1. Buat database ApsaraDB RDS for MySQL. Untuk informasi selengkapnya, lihat Buat database.

    Buat database bernama order_dw untuk instance tujuan.

  2. Siapkan sumber data MySQL CDC.

    1. Pada halaman detail instance, klik Log On To Database di bagian atas halaman.

    2. Pada kotak dialog logon DMS, masukkan nama pengguna dan kata sandi untuk akun database yang telah Anda buat, lalu klik Log On.

    3. Setelah berhasil masuk, klik ganda database order_dw di sebelah kiri untuk beralih ke database tersebut.

    4. Pada bagian SQL Console, tulis pernyataan DDL untuk membuat ketiga tabel bisnis dan pernyataan untuk memasukkan data.

      CREATE TABLE `user` (
        id bigint not null primary key,
        name varchar(50) not null
      );
      
      CREATE TABLE `order` (
        id bigint not null primary key,
        product varchar(50) not null,
        user_id bigint not null
      );
      
      CREATE TABLE `feedback` (
        id bigint not null primary key,
        user_id bigint not null,
        comment varchar(50) not null
      );
      
      -- Siapkan data
      INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry');
      
      INSERT INTO `order` VALUES
      (1, 'Football', 2),
      (2, 'Basket', 1);
      
      INSERT INTO `feedback` VALUES
      (1, 1, 'Good.'),
      (2, 2, 'Very good');
  3. Klik Execute, lalu klik Execute Directly.

Prosedur

  1. Buat dan mulai tugas ingesti data Flink CDC untuk menyinkronkan data MySQL hulu ke Kafka secara real time. Hal ini memungkinkan banyak pekerjaan hilir mengonsumsi data tersebut. Pekerjaan sinkronisasi database penuh secara otomatis membuat topik. Anda dapat menentukan nama topik menggunakan modul route. Jumlah partisi dan replika untuk topik mengikuti konfigurasi default cluster Kafka, dan cleanup.policy diatur ke compact.

    Nama topik default

    Secara default, nama topik Kafka yang dibuat oleh tugas sinkronisasi database penuh menggunakan format `database_name.table_name`. Pekerjaan berikut membuat tiga topik: `order_dw.user`, `order_dw.order`, dan `order_dw.feedback`.

    1. Pada halaman Data Development > Data Ingestion, buat pekerjaan ingesti data Flink CDC dan salin kode berikut ke editor YAML.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Parameter berikut diperlukan untuk ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.

    3. Di panel navigasi, pilih Operation Center > Job O&M. Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.

    Tentukan nama topik

    Untuk tugas sinkronisasi database penuh, Anda dapat menggunakan modul route untuk menentukan nama topik untuk setiap tabel. Pekerjaan berikut membuat tiga topik: `user1`, `order2`, dan `feedback3`.

    1. Pada halaman Data Development > Data Ingestion, buat pekerjaan ingesti data Flink CDC dan salin kode berikut ke editor YAML.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        
      route:
        - source-table: order_dw.user
          sink-table: user1
        - source-table: order_dw.order
          sink-table: order2
        - source-table: order_dw.feedback
          sink-table: feedback3
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Parameter berikut diperlukan untuk ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.

    3. Di panel navigasi, pilih Operation Center > Job O&M. Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.

    Tentukan nama topik secara batch

    Untuk tugas sinkronisasi database penuh, Anda dapat menggunakan modul route untuk menentukan pola guna menghasilkan nama topik secara batch. Pekerjaan berikut membuat tiga topik: `topic_user`, `topic_order`, dan `topic_feedback`.

    1. Pada halaman Data Development > Data Ingestion, buat pekerjaan ingesti data Flink CDC dan salin kode berikut ke editor YAML.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        
      route:
        - source-table: order_dw.\.*
          sink-table: topic_<>
          replace-symbol: <>
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Parameter berikut diperlukan untuk ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.

    3. Di panel navigasi, pilih Operation Center > Job O&M. Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.

  1. Mengonsumsi data Kafka secara real time.

    Data dari database MySQL hulu ditulis ke Kafka dalam format JSON. Satu topik Kafka dapat dikonsumsi oleh banyak pekerjaan hilir. Pekerjaan hilir mengonsumsi data dari topik untuk mengambil data terbaru dari tabel database. Anda dapat mengonsumsi data dari tabel yang disinkronkan ke Kafka dengan salah satu dari dua cara berikut:

    Mengonsumsi data langsung melalui katalog

    Baca data dari topik Kafka sebagai tabel sumber.

    1. Pada halaman Data Development > ETL, buat pekerjaan stream SQL dan salin kode berikut ke editor SQL.

      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- Diperlukan saat menulis ke banyak sink.
      
      -- Gabungkan informasi pesanan dengan tabel pengguna dalam katalog JSON Kafka untuk menampilkan nama pengguna dan nama produk untuk setiap pesanan.
      INSERT INTO print_user_proudct
      SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as `order` --Tentukan grup dan mode startup
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --Tentukan grup dan mode startup
      ON `order`.value_user_id = `user`.key_id;
      
      -- Gabungkan tabel komentar dengan tabel pengguna untuk menampilkan isi setiap komentar dan nama pengguna yang sesuai.
      INSERT INTO print_user_feedback
      SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as feedback  --Tentukan grup dan mode startup
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --Tentukan grup dan mode startup
      ON feedback.value_user_id = `user`.key_id;
      
      END;      -- Diperlukan saat menulis ke banyak sink.

      Contoh ini menggunakan konektor Print untuk mencetak hasil secara langsung. Anda juga dapat mengeluarkan hasil ke tabel sink konektor untuk analisis dan komputasi lebih lanjut. Untuk informasi selengkapnya tentang sintaks penulisan ke banyak sink, lihat Pernyataan INSERT INTO.

      Catatan

      Saat Anda mengonsumsi data secara langsung, skema yang diurai oleh katalog JSON Kafka mungkin berbeda dari skema tabel MySQL yang sesuai karena perubahan skema. Misalnya, bidang yang dihapus mungkin masih muncul, atau beberapa bidang mungkin bernilai null.

      Skema yang dibaca oleh katalog terdiri dari bidang-bidang dari data yang dikonsumsi. Jika suatu bidang telah dihapus tetapi pesannya belum kedaluwarsa, bidang tersebut mungkin masih muncul dengan nilai null. Anda tidak perlu menangani situasi ini.

    2. Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.

    3. Di panel navigasi, pilih Operation Center > Job O&M. Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.

    Mengonsumsi data dengan membuat tabel sementara

    Tentukan skema kustom dan baca data dari tabel sementara.

    1. Pada halaman Data Development > ETL, buat pekerjaan stream SQL dan salin kode berikut ke editor SQL.

      CREATE TEMPORARY TABLE user_source (
        key_id BIGINT,
        value_name STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE order_source (
        key_id  BIGINT,
        value_product STRING,
        value_user_id BIGINT  
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE feedback_source (
        key_id  BIGINT,
        value_user_id BIGINT,
        value_comment STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'feedback',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- Diperlukan saat menulis ke banyak sink.
      -- Gabungkan informasi pesanan dengan tabel pengguna dalam katalog JSON Kafka untuk menampilkan nama pengguna dan nama produk untuk setiap pesanan.
      INSERT INTO print_user_proudct
      SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
      FROM order_source LEFT JOIN user_source
      ON order_source.value_user_id = user_source.key_id;
      
      
      -- Gabungkan tabel komentar dengan tabel pengguna untuk menampilkan isi setiap komentar dan nama pengguna yang sesuai.
      INSERT INTO print_user_feedback
      SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
      FROM feedback_source  LEFT JOIN user_source
      ON feedback_source.value_user_id = user_source.key_id;
      
      END;      -- Diperlukan saat menulis ke banyak sink.

      Contoh ini menggunakan konektor Print untuk mencetak hasil secara langsung. Anda juga dapat mengeluarkan hasil ke tabel sink konektor untuk analisis dan komputasi lebih lanjut. Untuk informasi selengkapnya tentang sintaks penulisan ke banyak sink, lihat Pernyataan INSERT INTO.

      Tabel berikut menjelaskan parameter untuk mengonfigurasi tabel sementara.

      Parameter

      Deskripsi

      Catatan

      connector

      Jenis konektor.

      Atur nilainya menjadi `kafka`.

      topic

      Nama topik.

      Konsisten dengan Katalog JSON Kafka.

      properties.bootstrap.servers

      Alamat broker Kafka.

      Formatnya adalah host:port,host:port,host:port. Pisahkan beberapa alamat dengan koma (,).

      scan.startup.mode

      Offset awal tempat membaca data di Kafka.

      Nilai yang valid:

      • earliest-offset: Membaca data dari offset paling awal partisi Kafka.

      • latest-offset: Membaca data dari offset terbaru.

      • group-offsets (default): Membaca data dari offset yang telah dikomit oleh properties.group.id yang ditentukan.

      • timestamp: Membaca data dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.

      • specific-offsets: Membaca data dari offset yang ditentukan oleh scan.startup.specific-offsets.

      Catatan

      Parameter ini hanya berlaku saat pekerjaan dimulai tanpa state. Jika pekerjaan dimulai ulang dari checkpoint atau state-nya dilanjutkan, pekerjaan akan memprioritaskan kemajuan yang disimpan dalam state untuk melanjutkan pembacaan.

      key.format

      Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi kunci pesan Kafka.

      Atur nilainya menjadi `json`.

      key.fields

      Bidang dalam tabel sumber atau sink yang sesuai dengan kunci pesan Kafka.

      Pisahkan beberapa nama bidang dengan titik koma (;). Contoh: field1;field2.

      key.fields-prefix

      Awalan kustom untuk semua kunci pesan Kafka guna menghindari konflik nama dengan bidang dalam nilai pesan atau metadata.

      Nilai ini harus sama dengan nilai parameter key.fields-prefix dalam katalog JSON Kafka.

      value.format

      Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi nilai pesan Kafka.

      Atur nilainya menjadi `json`.

      value.fields-prefix

      Awalan kustom untuk semua nilai pesan Kafka guna menghindari konflik nama dengan bidang dalam kunci pesan atau metadata.

      Nilai ini harus sama dengan nilai parameter value.fields-prefix dalam katalog JSON Kafka.

      value.fields-include

      Kebijakan untuk nilai pesan saat memproses bidang kunci pesan.

      Atur nilainya menjadi `EXCEPT_KEY`. Ini menunjukkan bahwa nilai pesan tidak mengandung bidang dari kunci pesan.

      value.json.infer-schema.flatten-nested-columns.enable

      Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam JSON nilai pesan Kafka.

      Nilai ini sesuai dengan nilai parameter infer-schema.flatten-nested-columns.enable dalam katalog.

      value.json.infer-schema.primitive-as-string

      Menentukan apakah akan menginferensi semua tipe data primitif sebagai tipe STRING dalam nilai pesan Kafka.

      Nilai ini sesuai dengan nilai parameter infer-schema.primitive-as-string dalam katalog.

    2. Di pojok kanan atas, klik Deploy untuk mendeploy pekerjaan.

    3. Di panel navigasi, pilih Operation Center > Job O&M. Temukan pekerjaan target dan klik Start pada kolom Actions. Pilih Stateless Start, lalu klik Start.

  2. Lihat hasil pekerjaan.

    1. Di panel navigasi, pilih Operation Center > Job O&M, lalu klik pekerjaan target.

    2. Pada tab Job Logs, klik tab Running Task Managers, lalu klik tugas di bawah Path, ID.

    3. Klik Logs dan cari informasi log terkait PrintSinkOutputWriter.

      1.png

Referensi