This guide shows you how to build two Flink CDC pipelines that together move data from MySQL to a data lake in real time:
- Pipeline 1 — MySQL to Kafka: Capture MySQL binary logs and stream change events to ApsaraMQ for Kafka topics.
- Pipeline 2 — Kafka to DLF: Read those change events from Kafka and ingest them into Data Lake Formation (DLF) via a Paimon sink.
MySQL (binlog) → [Pipeline 1] → ApsaraMQ for Kafka → [Pipeline 2] → DLF (Paimon)
Kafka acts as a durable, replayable buffer between the two jobs, decoupling capture from ingestion and enabling independent scaling, auditing, and replay.
Prerequisites
- MySQL has binary logging enabled in `ROW` format.
- An ApsaraMQ for Kafka cluster is available and reachable from the Flink job.
Pipeline 1: Stream MySQL binary logs to Kafka
Overview
This pipeline connects to a MySQL instance, reads binary log change events from all tables in the kafka_test database, and writes them to ApsaraMQ for Kafka topics. Each source table maps to its own Kafka topic.
Minimal configuration
The following configuration is the minimum required to run the pipeline:
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test\.\*
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false
```
You must set properties.enable.idempotence: false in the sink configuration. ApsaraMQ for Kafka does not support idempotent or transactional writes.
Full configuration with optional parameters
The following configuration extends the minimal configuration above with optional parameters. Add only the parameters relevant to your use case.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test\.\*
  server-id: 8601-8604
  # (Optional) Sync existing and incremental data from newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and column comments.
  include-comments.enabled: true
  # (Optional) Prioritize distributing unbounded chunks during snapshot.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelog only for captured tables.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # (Optional) Include the data change timestamp as a metadata column.
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Required: ApsaraMQ for Kafka does not support idempotent or transactional writes.
  properties.enable.idempotence: false
  # (Optional) Set the mapping between upstream tables and Kafka topics.
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products
```
Topic naming and mapping
By default, each source table maps to a Kafka topic named using the database.table format. For example, data from kafka_test.customers writes to a topic named kafka_test.customers.
To use custom topic names, set sink.tableId-to-topic.mapping. This option maps source table identifiers to Kafka topic names while preserving the original table name in the event payload for traceability. The example above maps kafka_test.customers to a topic named customers and kafka_test.products to a topic named products.
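The mapping value is a semicolon-separated list of `tableId:topic` pairs. The following Python sketch illustrates how such a value resolves to topic names, including the fall-back to the default `database.table` name for unmapped tables; `parse_topic_mapping` and `resolve_topic` are hypothetical helpers for illustration, not connector code.

```python
def parse_topic_mapping(mapping: str) -> dict:
    """Parse a 'tableId:topic;tableId:topic' string into a lookup dict."""
    pairs = (entry.split(":", 1) for entry in mapping.split(";") if entry)
    return {table_id.strip(): topic.strip() for table_id, topic in pairs}

def resolve_topic(table_id: str, mapping: dict) -> str:
    """Unmapped tables fall back to the default database.table topic name."""
    return mapping.get(table_id, table_id)

mapping = parse_topic_mapping(
    "kafka_test.customers:customers;kafka_test.products:products"
)
print(resolve_topic("kafka_test.customers", mapping))  # customers
print(resolve_topic("kafka_test.orders", mapping))     # kafka_test.orders
```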
ApsaraMQ for Kafka supports json, canal-json, and debezium-json (default) output formats.
For all available configuration options, see Kafka connector.
Partition strategy
By default, all change events write to partition 0 of each topic (all-to-zero). This preserves global ordering within a topic but does not distribute load across partitions.
To distribute records across multiple partitions based on primary key hash, add the following to your sink configuration:
```yaml
sink:
  partition.strategy: hash-by-key
```
With hash-by-key, all change events for a given primary key value always go to the same partition, preserving per-key ordering while enabling parallel consumption.
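The per-key routing property can be illustrated with a small Python sketch. The connector's actual hash function is an internal detail (Kafka clients commonly use murmur2, not MD5 as below); the point is only that a stable hash of the primary key always yields the same partition, so per-key ordering survives parallel consumption.

```python
import hashlib

NUM_PARTITIONS = 4  # assumed partition count for the topic

def partition_for_key(primary_key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a primary key to a partition via a stable hash of its bytes."""
    digest = hashlib.md5(primary_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every event for primary key "4" lands on the same partition,
# so updates and deletes for that row are consumed in order.
for key, op in [("4", "u"), ("7", "c"), ("4", "d")]:
    print(key, op, partition_for_key(key))
```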
Expected output
When you execute an UPDATE statement on the customers table, the job produces a Kafka message in the configured format.
Debezium JSON format (debezium-json, default):
```json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": "kafka_test",
    "table": "customers",
    "ts_ms": 1728528674000
  }
}
```
Canal JSON format (canal-json):
```json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": "kafka_test",
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000,
  "es": 0
}
```
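A downstream consumer often wants a single change-event shape regardless of format. The following Python sketch normalizes both payloads above into a common dict; the field handling follows the example messages, and `normalize` is a hypothetical helper, not part of any connector API.

```python
import json

def normalize(raw: str) -> dict:
    """Normalize a debezium-json or canal-json change event into one shape."""
    msg = json.loads(raw)
    if "op" in msg:  # debezium-json: 'before'/'after' are objects
        return {
            "before": msg.get("before"),
            "after": msg.get("after"),
            "op": msg["op"],
            "table": f'{msg["source"]["db"]}.{msg["source"]["table"]}',
        }
    # canal-json: 'old'/'data' are lists of rows; take the first row
    op = {"INSERT": "c", "UPDATE": "u", "DELETE": "d"}[msg["type"]]
    return {
        "before": (msg.get("old") or [None])[0],
        "after": (msg.get("data") or [None])[0],
        "op": op,
        "table": f'{msg["database"]}.{msg["table"]}',
    }

debezium = ('{"before":{"id":4,"phone_number":"2222"},'
            '"after":{"id":4,"phone_number":"1234"},"op":"u",'
            '"source":{"db":"kafka_test","table":"customers"}}')
canal = ('{"old":[{"id":4,"phone_number":"2222"}],'
         '"data":[{"id":4,"phone_number":"1234"}],"type":"UPDATE",'
         '"database":"kafka_test","table":"customers","pkNames":["id"]}')
event = normalize(debezium)
print(event["op"], event["table"], event["after"]["phone_number"])  # u kafka_test.customers 1234
```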
The Debezium JSON format omits primary key information from the message payload. If a downstream consumer needs to identify primary keys, use the Canal JSON format, or define them explicitly in the `transform` section of a downstream pipeline (as shown in Pipeline 2 below).
Pipeline 2: Ingest Kafka change events into DLF
Overview
This pipeline reads change events from a Kafka topic and writes them to Data Lake Formation (DLF) using a Paimon sink. It is designed to consume the output produced by Pipeline 1.
Consider a Kafka topic inventory that contains debezium-json binary log data from two tables, customers and products. When a single Kafka topic carries change events from multiple source tables, you must set debezium-json.distributed-tables: true so the connector can correctly route each record to its destination table.
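The routing this flag enables can be sketched in Python: the destination table for each record is recovered from the `source.db` and `source.table` fields of the debezium-json payload. This is illustrative only, not the connector's implementation.

```python
import json
from collections import defaultdict

def route_by_table(messages):
    """Bucket debezium-json records by their source database.table identifier."""
    buckets = defaultdict(list)
    for raw in messages:
        msg = json.loads(raw)
        table_id = f'{msg["source"]["db"]}.{msg["source"]["table"]}'
        buckets[table_id].append(msg)
    return buckets

msgs = [
    '{"op":"c","after":{"id":1},"source":{"db":"kafka_test","table":"customers"}}',
    '{"op":"c","after":{"id":9},"source":{"db":"kafka_test","table":"products"}}',
]
buckets = route_by_table(msgs)
print(sorted(buckets))  # ['kafka_test.customers', 'kafka_test.products']
```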
Configuration
```yaml
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The username for commits. Set a unique commit user for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
```
Key configuration details
| Parameter | Notes |
|---|---|
| `debezium-json.distributed-tables` | Set to `true` when a single Kafka topic carries change events from multiple source tables. |
| `primary-keys` (in `transform`) | The Debezium JSON format omits primary key information from the message payload. You must define primary keys in the `transform` module. |
| `scan.startup.mode` | Reads from the beginning of each Kafka topic on startup. |
| — | Set a schema inference strategy for the Kafka source with the relevant connector option. |
Usage notes
- The Kafka source connector supports reading data in `canal-json` and `debezium-json` (default) formats.
- Set `debezium-json.distributed-tables: true` (or `canal-json.distributed-tables: true`) when data from multiple source tables is spread across partitions of the same topic, or when you need to merge distributed data.
- Because Debezium JSON omits primary key information, you must always define primary keys explicitly in the `transform` module when using this format with a Paimon sink.