Realtime Compute for Apache Flink: Distribute data in real time with Flink CDC and Kafka

Last Updated: Mar 17, 2026

This guide shows you how to build two Flink CDC pipelines that together move data from MySQL to a data lake in real time:

  • Pipeline 1 — MySQL to Kafka: Capture MySQL binary logs and stream change events to ApsaraMQ for Kafka topics.

  • Pipeline 2 — Kafka to DLF: Read those change events from Kafka and ingest them into Data Lake Formation (DLF) via a Paimon sink.

MySQL (binlog) → [Pipeline 1] → ApsaraMQ for Kafka → [Pipeline 2] → DLF (Paimon)

Kafka acts as a durable, replayable buffer between the two jobs, decoupling capture from ingestion and enabling independent scaling, auditing, and replay.

Prerequisites

  • MySQL has binary logging enabled in ROW format.

  • An ApsaraMQ for Kafka cluster is available and reachable from the Flink job.

Pipeline 1: Stream MySQL binary logs to Kafka

Overview

This pipeline connects to a MySQL instance, reads binary log change events from all tables in the kafka_test database, and writes them to ApsaraMQ for Kafka topics. Each source table maps to its own Kafka topic.

Minimal configuration

The following configuration is the minimum required to run the pipeline:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test\.\*
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false

Warning

You must set properties.enable.idempotence: false in the sink configuration. ApsaraMQ for Kafka does not support idempotent or transactional writes.

Full configuration with optional parameters

The following configuration extends the minimal configuration above with optional parameters. Add only the parameters relevant to your use case.

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test\.\*
  server-id: 8601-8604
  # (Optional) Sync existing and incremental data from newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and column comments.
  include-comments.enabled: true
  # (Optional) Prioritize distributing unbounded chunks during snapshot.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelog only for captured tables.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # (Optional) Include the data change timestamp as a metadata column.
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Required: ApsaraMQ for Kafka does not support idempotent or transactional writes.
  properties.enable.idempotence: false
  # (Optional) Set the mapping between upstream tables and Kafka topics.
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products

Topic naming and mapping

By default, each source table maps to a Kafka topic named in the database.table format. For example, data from kafka_test.customers is written to a topic named kafka_test.customers.

To use custom topic names, set sink.tableId-to-topic.mapping. This option maps source table identifiers to Kafka topic names while preserving the original table name in the event payload for traceability. The example above maps kafka_test.customers to a topic named customers and kafka_test.products to a topic named products.
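The mapping string format can be illustrated with a short Python sketch. This is only a model of how the semicolon-separated `table:topic` pairs read, not the connector's implementation. The helper names `parse_topic_mapping` and `resolve_topic` are hypothetical, and the fallback to the default database.table name for unmapped tables is an assumption.

```python
def parse_topic_mapping(mapping):
    """Parse 'db.table:topic;db.table:topic' into a dict of table ID -> topic."""
    result = {}
    for entry in mapping.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing semicolon
        table_id, topic = entry.split(":", 1)
        result[table_id.strip()] = topic.strip()
    return result


def resolve_topic(table_id, mapping):
    # Assumption: a table without a mapping entry keeps the default
    # database.table topic name.
    return mapping.get(table_id, table_id)


mapping = parse_topic_mapping(
    "kafka_test.customers:customers;kafka_test.products:products"
)
print(resolve_topic("kafka_test.customers", mapping))  # customers
print(resolve_topic("kafka_test.orders", mapping))     # kafka_test.orders
```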

The Kafka sink can write messages in the json, canal-json, or debezium-json (default) format.

For all available configuration options, see Kafka connector.

Partition strategy

By default, the sink uses the all-to-zero strategy: all change events are written to partition 0 of each topic. This preserves the total order of events within a topic but does not distribute load across partitions.

To distribute records across multiple partitions based on primary key hash, add the following to your sink configuration:

sink:
  partition.strategy: hash-by-key

With hash-by-key, all change events for a given primary key value always go to the same partition, preserving per-key ordering while enabling parallel consumption.
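The per-key ordering guarantee can be modeled in a few lines of Python. The sketch below is not the connector's hash function (the source does not specify it); CRC32 over the JSON-encoded key is a stand-in chosen for illustration, and `partition_for` is a hypothetical helper.

```python
import json
import zlib


def partition_for(primary_key, num_partitions):
    """Map a primary-key dict to a partition with a deterministic hash."""
    # Stand-in hash: the real connector may use a different function.
    encoded = json.dumps(primary_key, sort_keys=True).encode("utf-8")
    return zlib.crc32(encoded) % num_partitions


# Three change events; the first and last touch the same row (id = 4).
events = [
    {"id": 4, "phone_number": "2222"},
    {"id": 7, "phone_number": "5555"},
    {"id": 4, "phone_number": "1234"},
]

NUM_PARTITIONS = 6  # assumed partition count of the target topic
for event in events:
    partition = partition_for({"id": event["id"]}, NUM_PARTITIONS)
    print(f"id={event['id']} -> partition {partition}")
```

Both events for id 4 land in the same partition, so a consumer of that partition sees them in the order they were produced. Events for different keys may land in different partitions and carry no ordering guarantee relative to each other.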

Expected output

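When you execute an UPDATE statement on the customers table, the job produces a Kafka message in the configured format. The following examples show the message for an update that changes phone_number from 2222 to 1234 in the row with id 4.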

Debezium JSON format (debezium-json, default):

{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": "kafka_test",
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

Canal JSON format (canal-json):

{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": "kafka_test",
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000,
  "es": 0
}

Note

  • The Debezium JSON format omits primary key information from the message payload. If a downstream consumer needs to identify primary keys, use Canal JSON format, or define them explicitly in the transform section of a downstream pipeline (as shown in Pipeline 2 below).
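To make the difference between the two formats concrete, the following Python sketch parses the two sample messages shown above and extracts the changed column and the primary key information. It uses only the standard library; `changed_columns` is a hypothetical helper, not part of any connector.

```python
import json

DEBEZIUM_UPDATE = """
{"before": {"id": 4, "name": "John", "address": "New York", "phone_number": "2222", "age": 12},
 "after": {"id": 4, "name": "John", "address": "New York", "phone_number": "1234", "age": 12},
 "op": "u",
 "source": {"db": "kafka_test", "table": "customers", "ts_ms": 1728528674000}}
"""

CANAL_UPDATE = """
{"old": [{"id": 4, "name": "John", "address": "New York", "phone_number": "2222", "age": 12}],
 "data": [{"id": 4, "name": "John", "address": "New York", "phone_number": "1234", "age": 12}],
 "type": "UPDATE", "database": "kafka_test", "table": "customers",
 "pkNames": ["id"], "ts": 1728528674000, "es": 0}
"""


def changed_columns(before, after):
    """Return {column: (old_value, new_value)} for columns whose value changed."""
    return {
        col: (before.get(col), new)
        for col, new in after.items()
        if before.get(col) != new
    }


debezium = json.loads(DEBEZIUM_UPDATE)
canal = json.loads(CANAL_UPDATE)

print(changed_columns(debezium["before"], debezium["after"]))
# {'phone_number': ('2222', '1234')}
print(changed_columns(canal["old"][0], canal["data"][0]))
# {'phone_number': ('2222', '1234')}

# Canal JSON carries the primary key names; the Debezium sample does not.
print(canal["pkNames"])       # ['id']
print("pkNames" in debezium)  # False
```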

Pipeline 2: Ingest Kafka change events into DLF

Overview

This pipeline reads change events from a Kafka topic and writes them to Data Lake Formation (DLF) using a Paimon sink. It is designed to consume the output produced by Pipeline 1.

Consider a Kafka topic inventory that contains debezium-json binary log data from two tables, customers and products. When a single Kafka topic carries change events from multiple source tables, you must set debezium-json.distributed-tables: true so the connector can correctly route each record to its destination table.
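The routing problem can be pictured with a small Python model: each Debezium JSON record names its origin in the `source` block, so records from a shared topic can be grouped by `db` and `table`. This is a conceptual sketch only and does not describe how the connector routes records internally. The helper `route_by_table` and the sample rows (in particular the products row) are made up for illustration.

```python
import json
from collections import defaultdict


def route_by_table(raw_messages):
    """Group Debezium JSON records by the table named in their source block."""
    routed = defaultdict(list)
    for raw in raw_messages:
        event = json.loads(raw)
        source = event["source"]
        table_id = f'{source["db"]}.{source["table"]}'
        routed[table_id].append(event)
    return dict(routed)


# Hypothetical contents of the shared "inventory" topic.
inventory_topic = [
    json.dumps({"before": None, "after": {"id": 4, "name": "John"}, "op": "c",
                "source": {"db": "kafka_test", "table": "customers"}}),
    json.dumps({"before": None, "after": {"id": 1, "name": "scooter"}, "op": "c",
                "source": {"db": "kafka_test", "table": "products"}}),
    json.dumps({"before": {"id": 4, "name": "John"}, "after": {"id": 4, "name": "Jon"},
                "op": "u", "source": {"db": "kafka_test", "table": "customers"}}),
]

for table_id, events in route_by_table(inventory_topic).items():
    print(table_id, len(events))
# kafka_test.customers 2
# kafka_test.products 1
```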

Configuration

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The username for commits. Set a unique commit user for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Key configuration details

| Parameter | Notes |
| --- | --- |
| debezium-json.distributed-tables | Set to true when a Kafka topic contains records from more than one source table. Without it, the connector cannot determine which destination table each record belongs to. For Canal JSON format, use canal-json.distributed-tables: true instead. |
| transform / primary-keys | The Debezium JSON format omits primary key information from the message payload. You must define primary keys in the transform section so that Paimon can perform upserts correctly. The example applies id as the primary key to all tables matched by \.*.\.*. |
| scan.startup.mode: earliest-offset | Reads from the beginning of each Kafka topic on startup. |
| schema.inference.strategy | Set a schema inference strategy for the Kafka source with the schema.inference.strategy option. For more information, see Kafka connector. |
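Why the sink needs a primary key can be shown with a minimal upsert model in Python. The sketch applies Debezium-style events to an in-memory dict keyed by the configured primary key. It is an illustration of upsert semantics under stated assumptions, not Paimon's storage logic; `apply_event` is a hypothetical helper, the op codes c, u, and d are the standard Debezium codes for create, update, and delete, and the row with id 5 is made-up sample data.

```python
def apply_event(table, event, primary_keys):
    """Apply one Debezium-style change event to an in-memory table."""
    op = event["op"]
    # Deletes identify the row through "before"; other ops carry it in "after".
    row = event["before"] if op == "d" else event["after"]
    key = tuple(row[k] for k in primary_keys)
    if op == "d":
        table.pop(key, None)
    else:
        table[key] = row  # insert a new row or overwrite the existing one


customers = {}
events = [
    {"op": "c", "before": None, "after": {"id": 4, "phone_number": "2222"}},
    {"op": "u", "before": {"id": 4, "phone_number": "2222"},
     "after": {"id": 4, "phone_number": "1234"}},
    {"op": "c", "before": None, "after": {"id": 5, "phone_number": "9999"}},
    {"op": "d", "before": {"id": 5, "phone_number": "9999"}, "after": None},
]
for event in events:
    apply_event(customers, event, primary_keys=["id"])

print(customers)
# {(4,): {'id': 4, 'phone_number': '1234'}}
```

Without a key, the update to id 4 could only be appended as a second row, because nothing would identify which existing row it replaces.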

Usage notes

Note
  • The Kafka source connector supports reading data in canal-json and debezium-json (default) formats.

  • Set debezium-json.distributed-tables: true (or canal-json.distributed-tables: true) when data from multiple source tables is spread across partitions of the same topic, or when you need to merge distributed data.

  • Because Debezium JSON omits primary key information, you must always define primary keys explicitly in the transform section when using this format with a Paimon sink.