全部产品
Search
文档中心

Realtime Compute for Apache Flink:Distribusi data real-time menggunakan Flink CDC

更新时间:Nov 10, 2025

Topik ini menjelaskan praktik terbaik untuk menulis data real-time ke antrian pesan umum menggunakan pekerjaan YAML ingesti data.

Tulis data database lengkap dan inkremental ke Kafka secara real-time

Anda dapat mengimpor data dari MySQL ke Kafka menggunakan pekerjaan YAML ingesti data, lalu mendistribusikannya ke berbagai sistem downstream sesuai kebutuhan. Pendekatan ini menghindari koneksi langsung dari beberapa pekerjaan ke database bisnis dan mengurangi beban pemrosesan.

Sinkronkan data binary logging MySQL ke Kafka

Dalam beberapa skenario, Anda mungkin perlu menyimpan data binary logging mentah untuk tugas seperti audit atau replay data. Pekerjaan YAML ingesti data mendukung sinkronisasi data binary logging MySQL mentah ke Kafka, memungkinkan pembacaan data secara terdistribusi dan membantu mengatasi masalah hot spot data.

Misalnya, database kafka_test berisi dua tabel, customers dan products. Pekerjaan berikut menyinkronkan data dari kedua tabel tersebut ke dua topik dengan nama yang sama: customers dan products.

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data lengkap dan inkremental dari tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan mendistribusikan chunk tak terbatas untuk menghindari potensi kesalahan OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Kirim waktu perubahan data sebagai metadata.
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Alibaba Cloud Kafka tidak mendukung penulisan idempoten atau transaksional. Nonaktifkan fitur idempotensi.
  properties.enable.idempotence: false
  # (Opsional) Tetapkan pemetaan antara tabel hulu dan topik Kafka.
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products

Kode berikut menunjukkan format isi pesan Kafka yang dihasilkan oleh pernyataan UPDATE pada tabel customers:

// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": "kafka_test",
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": "kafka_test",
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000,
  "es": 0
}
Catatan
  • Log biner dapat ditulis dalam format json, canal-json, atau debezium-json (default). Untuk informasi selengkapnya, lihat Message Queue for Apache Kafka.

  • Jika Anda tidak menggunakan parameter sink.tableId-to-topic.mapping, topik dibuat di Kafka menggunakan format database.table. Misalnya, untuk tabel MySQL kafka_test.customers, nama topik yang sesuai di Kafka adalah kafka_test.customers. Anda dapat menggunakan sink.tableId-to-topic.mapping untuk mengonfigurasi pemetaan antara tabel hulu dan topik tujuan. Hal ini memungkinkan Anda mengubah nama topik tujuan sambil tetap mempertahankan nama tabel sumber dalam pesan Kafka. Untuk informasi selengkapnya, lihat Message Queue for Apache Kafka.

  • Secara default, semua data ditulis ke partisi 0 topik. Anda dapat menyesuaikan perilaku ini melalui konfigurasi partition.strategy. Untuk informasi selengkapnya, lihat Message Queue for Apache Kafka. Sebagai contoh, pengaturan partition.strategy: hash-by-key menulis data dari setiap tabel ke beberapa partisi berdasarkan nilai hash kunci primer, sehingga data dengan kunci primer yang sama dikirim ke partisi yang sama dan tetap terurut.

  • Alibaba Cloud Message Queue for Apache Kafka tidak mendukung penulisan idempoten atau transaksional. Saat menggunakannya sebagai tujuan ingesti data, Anda harus menambahkan item konfigurasi properties.enable.idempotence: false ke konfigurasi sink untuk menonaktifkan fitur idempotensi.

Ingesti real-time dari Kafka ke DLF

Menyinkronkan data binary logging MySQL ke Kafka memberikan solusi untuk menyimpan data MySQL di Kafka. Selanjutnya, Anda dapat mengonfigurasi pekerjaan YAML ingesti data untuk menyinkronkan data tersebut ke penyimpanan DLF.

Misalnya, topik Kafka bernama inventory berisi data dari dua tabel, customers dan products, dalam format debezium-json. Pekerjaan berikut menyinkronkan data dari kedua tabel tersebut ke tabel yang sesuai di DLF:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna unik untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true

# debezium-json tidak menyertakan informasi kunci primer. Anda harus menambahkan kunci primer ke tabel.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Catatan
  • Sumber data Kafka mendukung pembacaan data dalam format canal-json dan debezium-json (default).

  • Jika format datanya adalah debezium-json, Anda harus menambahkan kunci primer ke tabel secara manual menggunakan aturan transformasi:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • Jika data satu tabel didistribusikan ke beberapa partisi, atau jika tabel-tabel dari partisi berbeda perlu digabung setelah sharding, tetapkan item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables menjadi true.

  • Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat menetapkan kebijakan tersebut melalui item konfigurasi schema.inference.strategy. Untuk informasi selengkapnya tentang kebijakan inferensi skema dan sinkronisasi perubahan, lihat Message Queue for Apache Kafka.