全部產品
Search
文件中心

Realtime Compute for Apache Flink:基於Flink CDC實現即時分發

更新時間:Sep 11, 2025

本文為您介紹使用資料攝入YAML作業將即時資料寫入常用訊息佇列的最佳實務。

資料庫全增量即時寫入Kafka

通過資料攝入YAML作業將來自MySQL的資料匯入到Kafka中後,您可以按照需要再將資料分發到下遊的不同系統中,避免多個作業直接連接業務庫,降低業務庫處理壓力。

MySQL Binlog資料同步到Kafka

在某些情境下,您希望儲存原始的Binlog資料,方便後續的資料審計、資料重放等工作。資料攝入YAML支援同步MySQL原始Binlog資料到Kafka,方便您分布式讀取Binlog資料,解決資料熱點問題。

假設資料庫kafka_test中有兩張表customers和products,下面的作業可以分別將表資料同步到customers和products兩個topic中。

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  #(可選)同步新增表的全量和增量資料
  scan.newly-added-table.enabled: true
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # 將資料的變更時間作為中繼資料下發
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # 阿里雲Kafka不支援等冪和事務寫入,關閉等冪功能
  properties.enable.idempotence: false
  #(可選)設定上遊表與Kafka topic的映射關係
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products

customers表的一條Update語句產生Kafka訊息的訊息體格式如下:

// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": "kafka_test",
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": "kafka_test",
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000,
  "es": 0
}
說明
  • 寫入的Binlog格式支援json、canal-json和debezium-json(預設),詳情請參見訊息佇列Kafka

  • 如果不使用sink.tableId-to-topic.mapping參數,在Kafka中會使用database.table的格式建立topic。例如MySQL表kafka_test.customers在Kafka中對應的topic名稱為kafka_test.customers。使用sink.tableId-to-topic.mapping可以配置上遊表與寫入topic的映射關係,能夠在寫出Kafka訊息保留源表表名的同時修改寫出的topic。詳情請參見訊息佇列Kafka

  • 預設所有資料寫入Topic的0號分區,可以使用partition.strategy配置進行調整,詳情請參見訊息佇列Kafka。例如可使用如下配置:partition.strategy: hash-by-key,每個表的資料會根據主鍵的雜湊值將資料寫到多個分區,並保證同一個主鍵的資料在同一個分區並且有序。

  • 阿里雲訊息佇列Kafka版不支援等冪和事務寫入,作為資料攝入目標端時,需要在資料攝入目標端添加配置項properties.enable.idempotence: false以關閉等冪寫入功能。

Kafka即時攝入到資料湖DLF

MySQL Binlog資料同步到Kafka為您提供了在Kafka中儲存MySQL資料的方案,進一步的,您可以配置資料攝入YAML同步Kafka資料到DLF儲存。

假設Kafka中名為inventory的topic中存有debezium-json格式的兩張表customers和products的資料,下面的作業可以將兩張表的資料分別同步到DLF對應的兩張表中:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
  commit.user: your_job_name
  #(可選)開啟刪除向量,提升讀取效能
  table.properties.deletion-vectors.enabled: true

# debezium-json不包含主鍵資訊,需要另外為表添加主鍵  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
說明
  • Kafka資料來源讀取的格式支援canal-json和debezium-json(預設)。

  • 當資料格式為debezium-json時,需要通過transform規則手動為表添加主鍵:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 當單表的資料分布在多個分區中,或資料位元於不同分區中的表需要進行分庫分表合并時,需要將配置項debezium-json.distributed-tablescanal-json.distributed-tables設為true。

  • kafka資料來源支援多種Schema推導策略,可以通過配置項schema.inference.strategy設定,Schema推導和變更同步策略詳情請參見訊息佇列Kafka