
Realtime Compute for Apache Flink: Distribute data in real time with Flink CDC and Kafka

Last Updated: Nov 14, 2025

This guide shows how to write real-time data to a message queue with Flink CDC.

Stream data to Kafka in real time

Use Kafka as an intermediate buffer between your source and destination systems to offload processing and reduce the load on the source database.

Synchronize MySQL binary logs to Kafka

To enable real-time data auditing and replay capabilities, create a data ingestion job that streams binary logs directly from MySQL to Kafka. This allows the binary log data to be read in a distributed manner, preventing data hotspots and making the pipeline more resilient.

Example scenario: Consider a MySQL database named kafka_test containing two tables: customers and products. The following job configuration synchronizes the data from these tables to their corresponding Kafka topics, customers and products, in real time.

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  # (Optional) Sync existing and incremental data from new tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize distributing unbounded chunks to avoid potential TM OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelog of captured tables.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Send data change time as metadata.
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Disable idempotence, because ApsaraMQ for Kafka does not support idempotent or transactional writes.
  properties.enable.idempotence: false
  # (Optional) Set the mapping between upstream tables and Kafka topics.
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products

An UPDATE statement on the customers table produces a Kafka message whose body depends on the configured output format. The following examples show the debezium-json and canal-json formats:

// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": "kafka_test",
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": "kafka_test",
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000,
  "es": 0
}
Note
  • The Kafka sink supports writing data in the json, canal-json, and debezium-json (default) formats.

  • Configure the sink.tableId-to-topic.mapping option to map source table identifiers (such as MySQL table names) to Kafka topic names. This lets you set custom topics for your source tables while preserving the original table name for traceability. If this option is not configured, topics default to a database.table naming format: data from kafka_test.customers is written to a topic named kafka_test.customers. For more information, see Kafka connector.

  • By default, all data is written to partition 0 of each topic. To customize this behavior, use the partition.strategy option. For example, setting partition.strategy: hash-by-key distributes the data of each table across partitions based on the hash of its primary key. This guarantees that all records with the same primary key are written to the same partition, preserving their original order.

  • ApsaraMQ for Kafka does not support idempotence or transactional writes. Therefore, you must add the properties.enable.idempotence: false option in the sink configuration to disable idempotence.
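Combining the notes above, a sink that maps tables to explicit topics and distributes records by primary-key hash might look like the following sketch. The placeholders are carried over from the job earlier in this guide:

```yaml
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Required for ApsaraMQ for Kafka, which does not support idempotent or transactional writes.
  properties.enable.idempotence: false
  # Route each table to an explicit topic instead of the default database.table name.
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products
  # Hash records by primary key so all changes to the same key land in the same
  # partition and retain their original order.
  partition.strategy: hash-by-key
```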

Ingest data from Kafka to DLF in real time

Create another data ingestion job to stream the data from Kafka to Data Lake Formation (DLF).

Consider a Kafka topic named inventory that contains debezium-json change data for two tables, customers and products. The following job synchronizes the data to DLF.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The username for commits. Set a unique commit user for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

# debezium-json does not include primary key information. You must add primary keys to the tables.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Note
  • The Kafka source connector supports reading data in canal-json and debezium-json (default) formats.

  • Since Debezium JSON format omits primary key information, you must explicitly define it within the transform module:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • Set the debezium-json.distributed-tables or canal-json.distributed-tables option to true if data is spread across multiple partitions, or if you need to merge distributed data.

  • Set a schema inference strategy for the Kafka source with the schema.inference.strategy option. For more information, see Kafka connector.
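As a sketch, the schema inference strategy can be set directly on the Kafka source from the job above. The value continuous below is an assumption for illustration only; consult the Kafka connector reference for the exact strategies supported by your version:

```yaml
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  # Controls how the source derives and evolves table schemas from incoming records.
  # The value `continuous` is assumed here for illustration; see Kafka connector.
  schema.inference.strategy: continuous
```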