全部产品
Search
文档中心

Realtime Compute for Apache Flink:Sinkronisasi seluruh database MySQL ke Kafka menggunakan Flink CDC

更新时间:Jan 30, 2026

Topik ini menjelaskan cara menyinkronkan seluruh database MySQL ke Kafka. Metode ini membantu mengurangi beban yang ditimbulkan oleh beberapa job terhadap database MySQL.

Informasi latar belakang

Tabel Change Data Capture (CDC) MySQL mengambil data dari MySQL dan menyinkronkan modifikasi tabel secara real time. Tabel-tabel ini sering digunakan untuk komputasi kompleks. Misalnya, Anda dapat menggunakan tabel CDC MySQL sebagai tabel dimensi dan menggabungkannya (join) dengan tabel data lainnya. Satu tabel MySQL dapat menjadi dependensi bagi beberapa job. Saat job-job tersebut memproses data dari tabel yang sama, database MySQL akan membuat beberapa koneksi, sehingga menciptakan beban berat pada server dan jaringan MySQL.

Arsitektur solusi

Untuk mengurangi beban pada database MySQL hulu, Realtime Compute for Apache Flink memungkinkan Anda menyinkronkan seluruh database MySQL ke Kafka. Solusi ini menggunakan Kafka sebagai lapisan perantara. Sebuah job ingesti data Flink CDC menyinkronkan data ke Kafka.

Satu Pekerjaan menyinkronkan data dari database MySQL upstream ke Kafka secara real-time. Setiap tabel MySQL dituliskan ke topik Kafka yang sesuai dalam mode upsert. Pekerjaan downstream kemudian menggunakan konektor Upsert Kafka untuk membaca data dari topik-topik tersebut, bukan dengan mengakses tabel MySQL secara langsung. Pendekatan ini secara signifikan mengurangi beban pada database MySQL.

Gambar 1

Batasan

  • Tabel MySQL yang akan disinkronkan harus memiliki primary key.

  • Anda dapat menggunakan cluster Kafka yang dikelola sendiri, cluster EMR Kafka, atau ApsaraMQ for Kafka. Saat menggunakan ApsaraMQ for Kafka, Anda harus menggunakan endpoint default.

  • Cluster Kafka harus memiliki ruang penyimpanan yang lebih besar daripada tabel sumber. Jika tidak, kehilangan data dapat terjadi akibat ruang penyimpanan yang tidak mencukupi. Topik yang dibuat untuk sinkronisasi database dikompaksi. Pada topik yang dikompaksi, hanya pesan terbaru untuk setiap kunci yang dipertahankan, dan datanya tidak pernah kedaluwarsa. Artinya, topik tersebut menyimpan jumlah data yang kira-kira sama dengan ukuran tabel sumber.

Kasus penggunaan

Pertimbangkan analisis real-time terhadap ulasan pesanan. Anda memiliki tiga tabel: user, order, dan feedback. Gambar berikut menunjukkan data dalam tabel-tabel tersebut.mysql database

Untuk menampilkan detail pesanan dan ulasan pengguna, Anda harus melakukan join tabel `user` untuk mengambil username dari field `name`. Pernyataan SQL berikut menunjukkan cara melakukannya.

-- Gabungkan informasi pesanan dengan tabel user untuk menampilkan username dan nama produk untuk setiap pesanan.
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;

-- Gabungkan ulasan dengan tabel user untuk menampilkan isi setiap ulasan dan username yang sesuai.
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;

Dalam kedua task SQL tersebut, tabel user digunakan oleh kedua job. Saat runtime, masing-masing job membaca data lengkap dan data inkremental dari MySQL. Pembacaan data lengkap memerlukan koneksi ke MySQL, sedangkan pembacaan data inkremental memerlukan client binary logging (Binlog). Seiring peningkatan jumlah job, kebutuhan akan koneksi MySQL dan sumber daya client Binlog juga meningkat, sehingga memberikan beban berat pada database hulu. Untuk mengurangi beban tersebut, Anda dapat menggunakan job ingest data Flink CDC untuk menyinkronkan data MySQL hulu ke Kafka secara real-time. Data tersebut kemudian tersedia untuk dikonsumsi oleh beberapa job hilir.

Prasyarat

Persiapan

Buat instans ApsaraDB RDS for MySQL dan siapkan sumber data

  1. Buat database. Untuk informasi selengkapnya, lihat Buat database.

    Buat database bernama order_dw untuk instans tujuan.

  2. Siapkan sumber data MySQL CDC.

    1. Pada halaman produk instans, klik Log on to Database di bagian atas.

    2. Pada halaman login DMS yang muncul, masukkan username dan password untuk akun database yang telah Anda buat, lalu klik Logon.

    3. Setelah login, double-click database order_dw di sebelah kiri untuk beralih ke database tersebut.

    4. Pada SQL Console, tulis pernyataan DDL untuk membuat tiga tabel bisnis dan pernyataan untuk memasukkan data.

      CREATE TABLE `user` (
        id bigint not null primary key,
        name varchar(50) not null
      );
      
      CREATE TABLE `order` (
        id bigint not null primary key,
        product varchar(50) not null,
        user_id bigint not null
      );
      
      CREATE TABLE `feedback` (
        id bigint not null primary key,
        user_id bigint not null,
        comment varchar(50) not null
      );
      
      -- Siapkan data
      INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry');
      
      INSERT INTO `order` VALUES
      (1, 'Football', 2),
      (2, 'Basket', 1);
      
      INSERT INTO `feedback` VALUES
      (1, 1, 'Good.'),
      (2, 2, 'Very good');
  3. Klik Execute, lalu klik Execute Directly.

Prosedur

  1. Buat dan jalankan tugas ingesti data Flink CDC untuk menyinkronkan data dari database MySQL upstream ke Kafka secara real-time, sehingga data tersedia untuk berbagai pekerjaan downstream. Pekerjaan tersebut secara otomatis membuat topik. Anda dapat menentukan nama topik menggunakan modul `route`. Jumlah partisi dan replika topik mengikuti pengaturan default Kluster Kafka, sedangkan `cleanup.policy` diatur ke `compact`.

    Nama topik default

    Task sinkronisasi database penuh membuat topik Kafka menggunakan format penamaan default yang menggabungkan nama database MySQL dan nama tabel, dipisahkan oleh titik. Misalnya, sebuah job dapat membuat topik seperti order_dw.user, order_dw.order, dan order_dw.feedback.

    1. Pada halaman Data Studio > Data Ingestion, buat job ingesti data Flink CDC baru, lalu salin kode berikut ke editor YAML.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
        scan.binlog.newly-added-table.enabled: true
        # (Opsional) Sinkronkan komentar tabel dan field.
        include-comments.enabled: true
        # (Opsional) Utamakan chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager.
        scan.incremental.snapshot.unbounded-chunk-first.enabled: true
        # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
        scan.only.deserialize.captured.tables.changelog.enabled: true
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Parameter berikut diperlukan untuk ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Di pojok kanan atas, klik Deploy untuk mendeploy job.

    3. Pada panel navigasi kiri, pilih Operation Center > Job O&M. Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.

    Tentukan nama topik

    Anda dapat menggunakan modul `route` dalam task sinkronisasi database untuk menentukan nama topik untuk setiap tabel. Job berikut membuat tiga topik: `user1`, `order2`, dan `feedback3`.

    1. Pada halaman Data Studio > Data Ingestion, buat job ingesti data Flink CDC baru, lalu salin kode berikut ke editor YAML.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
        scan.binlog.newly-added-table.enabled: true
        # (Opsional) Sinkronkan komentar tabel dan field.
        include-comments.enabled: true
        # (Opsional) Utamakan chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager.
        scan.incremental.snapshot.unbounded-chunk-first.enabled: true
        # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
        scan.only.deserialize.captured.tables.changelog.enabled: true
        
      route:
        - source-table: order_dw.user
          sink-table: user1
        - source-table: order_dw.order
          sink-table: order2
        - source-table: order_dw.feedback
          sink-table: feedback3
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Parameter berikut diperlukan untuk ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Di pojok kanan atas, klik Deploy untuk mendeploy job.

    3. Pada panel navigasi kiri, pilih Operation Center > Job O&M. Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.

    Tentukan nama topik secara batch

    Anda dapat menggunakan modul `route` dalam task sinkronisasi database untuk menentukan pola nama topik yang dihasilkan secara batch. Job berikut membuat tiga topik: `topic_user`, `topic_order`, dan `topic_feedback`.

    1. Pada halaman Data Studio > Data Ingestion, buat job ingesti data Flink CDC baru, lalu salin kode berikut ke editor YAML.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
        scan.binlog.newly-added-table.enabled: true
        # (Opsional) Sinkronkan komentar tabel dan field.
        include-comments.enabled: true
        # (Opsional) Utamakan chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager.
        scan.incremental.snapshot.unbounded-chunk-first.enabled: true
        # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
        scan.only.deserialize.captured.tables.changelog.enabled: true
        
      route:
        - source-table: order_dw.\.*
          sink-table: topic_<>
          replace-symbol: <>
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Parameter berikut diperlukan untuk ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Di pojok kanan atas, klik Deploy untuk mendeploy job.

    3. Pada panel navigasi kiri, pilih Operation Center > Job O&M. Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.

  1. Konsumsi data Kafka secara real time.

    Data dari database MySQL sumber ditulis ke Kafka dalam format JSON. Satu topik Kafka dapat dikonsumsi oleh beberapa pekerjaan hilir yang membaca data dari topik tersebut untuk mendapatkan data tabel terbaru. Anda dapat mengonsumsi data dari tabel yang disinkronkan dengan dua cara berikut:

    Konsumsi data langsung melalui katalog

    Baca data dari topik Kafka sebagai tabel sumber.

    1. Pada halaman Data Development > ETL, buat job stream SQL baru dan salin kode berikut ke editor SQL.

      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- Diperlukan saat menulis ke beberapa sink.
      
      -- Gabungkan informasi pesanan dengan tabel user dalam katalog JSON Kafka untuk menampilkan username dan nama produk untuk setiap pesanan.
      INSERT INTO print_user_proudct
      SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as `order` -- Tentukan group dan startup mode
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- Tentukan group dan startup mode
      ON `order`.value_user_id = `user`.key_id;
      
      -- Gabungkan ulasan dengan tabel user untuk menampilkan isi setiap ulasan dan username yang sesuai.
      INSERT INTO print_user_feedback
      SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as feedback  -- Tentukan group dan startup mode
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- Tentukan group dan startup mode
      ON feedback.value_user_id = `user`.key_id;
      
      END;      -- Diperlukan saat menulis ke beberapa sink.

      Contoh ini menggunakan konektor Print untuk mencetak hasil secara langsung. Anda juga dapat mengeluarkan hasil ke tabel sink konektor untuk analisis dan komputasi lebih lanjut. Untuk informasi selengkapnya tentang sintaks menulis ke beberapa sink, lihat Pernyataan INSERT INTO.

      Catatan

      Jika skema tabel MySQL diubah selama sinkronisasi data, skema yang diurai oleh katalog JSON Kafka mungkin tidak sinkron dengan skema tabel aktual. Misalnya, jika field dihapus dari tabel MySQL tetapi masih muncul dalam skema yang diurai dari katalog, nilai untuk field tersebut mungkin null.

      Skema yang dibaca dari Katalog terdiri dari field-field dari data yang dikonsumsi. Jika suatu field dihapus tetapi pesan yang memuatnya belum kedaluwarsa, skema mungkin mencakup field yang sudah tidak ada lagi. Nilai field tersebut adalah null. Situasi ini tidak memerlukan penanganan khusus.

    2. Di pojok kanan atas, klik Deploy untuk mendeploy job.

    3. Pada panel navigasi kiri, pilih Operation Center > Job O&M. Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.

    Konsumsi data dengan membuat tabel temporary

    Definisikan skema kustom dan baca data dari tabel temporary.

    1. Pada halaman Data Development > ETL, Anda dapat membuat job stream SQL baru dan salin kode berikut ke editor SQL.

      CREATE TEMPORARY TABLE user_source (
        key_id BIGINT,
        value_name STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE order_source (
        key_id  BIGINT,
        value_product STRING,
        value_user_id BIGINT  
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE feedback_source (
        key_id  BIGINT,
        value_user_id BIGINT,
        value_comment STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'feedback',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- Diperlukan saat menulis ke beberapa sink.
      -- Gabungkan informasi pesanan dengan tabel user dalam katalog JSON Kafka untuk menampilkan username dan nama produk untuk setiap pesanan.
      INSERT INTO print_user_proudct
      SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
      FROM order_source LEFT JOIN user_source
      ON order_source.value_user_id = user_source.key_id;
      
      
      -- Gabungkan ulasan dengan tabel user untuk menampilkan isi setiap ulasan dan username yang sesuai.
      INSERT INTO print_user_feedback
      SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
      FROM feedback_source  LEFT JOIN user_source
      ON feedback_source.value_user_id = user_source.key_id;
      
      END;      -- Diperlukan saat menulis ke beberapa sink.

      Contoh ini menggunakan konektor Print untuk mencetak hasil secara langsung. Anda juga dapat mengeluarkan hasil ke tabel sink konektor untuk analisis dan komputasi lebih lanjut. Untuk informasi selengkapnya tentang sintaks menulis ke beberapa sink, lihat Pernyataan INSERT INTO.

      Tabel berikut menjelaskan parameter untuk mengonfigurasi tabel temporary.

      Parameter

      Deskripsi

      Catatan

      connector

      Jenis konektor.

      Nilainya harus `kafka`.

      topic

      Nama topik yang sesuai.

      Harus sama dengan deskripsi dalam katalog JSON Kafka.

      properties.bootstrap.servers

      Alamat broker Kafka.

      Formatnya adalah host:port,host:port,host:port. Gunakan koma (,) untuk memisahkan alamat.

      scan.startup.mode

      Offset awal untuk membaca data dari Kafka.

      Nilai yang valid:

      • earliest-offset: Baca dari partisi paling awal di Kafka.

      • latest-offset: Baca dari offset terbaru di Kafka.

      • group-offsets (default): Baca dari offset yang telah dikomit oleh properties.group.id yang ditentukan.

      • timestamp: Baca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.

      • specific-offsets: Baca dari offset yang ditentukan oleh scan.startup.specific-offsets.

      Catatan

      Parameter ini hanya berlaku saat job dimulai tanpa state. Jika job dimulai ulang dari checkpoint atau memulihkan state-nya, job akan memprioritaskan menggunakan progres yang disimpan dalam state untuk melanjutkan pembacaan.

      key.format

      Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi kunci pesan Kafka.

      Nilainya harus `json`.

      key.fields

      Field-field dalam tabel sumber atau sink yang sesuai dengan kunci pesan Kafka.

      Gunakan titik koma (;) untuk memisahkan beberapa nama field. Contohnya, field1;field2.

      key.fields-prefix

      Awalan kustom untuk semua kunci pesan Kafka guna menghindari konflik nama dengan field dalam nilai pesan atau metadata.

      Harus sama dengan nilai parameter key.fields-prefix dalam katalog JSON Kafka.

      value.format

      Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi nilai pesan Kafka.

      Nilainya harus `json`.

      value.fields-prefix

      Awalan kustom untuk semua nilai pesan Kafka guna menghindari konflik nama dengan field dalam kunci pesan atau metadata.

      Harus sama dengan nilai parameter value.fields-prefix dalam katalog JSON Kafka.

      value.fields-include

      Menentukan kebijakan penanganan field kunci pesan dalam isi pesan.

      Nilai tetap EXCEPT_KEY menunjukkan bahwa isi pesan tidak mengandung field kunci pesan.

      value.json.infer-schema.flatten-nested-columns.enable

      Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam JSON nilai pesan Kafka.

      Sesuai dengan nilai parameter infer-schema.flatten-nested-columns.enable dalam katalog.

      value.json.infer-schema.primitive-as-string

      Menentukan apakah akan menginferensi semua tipe data primitif sebagai tipe String dalam nilai pesan Kafka.

      Sesuai dengan nilai parameter infer-schema.primitive-as-string dalam katalog.

    2. Di pojok kanan atas, klik Deploy untuk mendeploy job.

    3. Pada panel navigasi kiri, pilih Operation Center > Job O&M. Pada kolom Actions job target, klik Start. Pilih Stateless Start, lalu klik Start.

  2. Lihat hasil pekerjaan.

    1. Pada panel navigasi kiri, pilih Operation Center > Job O&M, lalu klik job target.

    2. Pada tab Job Logs, klik task di bawah Path, ID pada tab Running Task Managers.

    3. Klik Logs dan cari informasi log terkait PrintSinkOutputWriter di halaman tersebut.

      1.png

Referensi