This guide shows how to write real-time data to a message queue with Flink CDC.
Stream data to Kafka in real time
Use Kafka as an intermediate buffer between your source and destination systems to offload processing and reduce the load on the source.
Synchronize MySQL binary logs to Kafka
To enable real-time data auditing and replay, create a data ingestion job that streams binary logs directly from MySQL to Kafka. This approach enables distributed reads of the binary log data, which prevents data hotspots and makes the pipeline more resilient.
Example scenario: Consider a MySQL database named kafka_test containing two tables: customers and products. The following job configuration synchronizes the data from these tables to their corresponding Kafka topics, customers and products, in real time.
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  # (Optional) Sync existing and incremental data from newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize distributing unbounded chunks to avoid potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize only the changelog of captured tables.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Send the data change time as metadata.
  metadata-column.include-list: op_ts
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Disable idempotence, because ApsaraMQ for Kafka does not support idempotent or transactional writes.
  properties.enable.idempotence: false
  # (Optional) Set the mapping between upstream tables and Kafka topics.
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products

An UPDATE statement on the customers table produces a Kafka message with the following body:
// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": "kafka_test",
    "table": "customers",
    "ts_ms": 1728528674000
  }
}
// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": "kafka_test",
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000,
  "es": 0
}

ApsaraMQ for Kafka supports the json, canal-json, and debezium-json (default) formats.

Configure the sink.tableId-to-topic.mapping option to define how source table identifiers (such as MySQL table names) map to destination Kafka topic names. This option lets you set custom Kafka topics for your source tables while preserving the original table name information for traceability. If this option is not configured, Kafka topics default to a database.table naming format, so data from kafka_test.customers is written to a topic named kafka_test.customers. For more information, see Kafka connector.

By default, all data is written to partition 0 of a topic. To customize this behavior, use the partition.strategy option. For example, setting partition.strategy: hash-by-key distributes the data of each table across multiple partitions based on the hash of its primary key. This strategy guarantees that all records with the same primary key are written to the same partition, preserving their original order.

ApsaraMQ for Kafka does not support idempotent or transactional writes. Therefore, you must add the properties.enable.idempotence: false option to the sink configuration to disable idempotence.
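Combining these recommendations, a sink block for ApsaraMQ for Kafka might look like the following sketch. The bootstrap-server variable is a placeholder carried over from the example above; adjust it to your environment.

```yaml
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Required for ApsaraMQ for Kafka, which does not support
  # idempotent or transactional writes.
  properties.enable.idempotence: false
  # Distribute each table's records across partitions by primary-key hash,
  # so that all changes for a given key land in one partition, in order.
  partition.strategy: hash-by-key
```

With hash-by-key, consumers still see the full change history of each row in order, because every change to a given primary key is read from a single partition.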
Ingest data from Kafka to DLF in real time
Create another data ingestion job to stream the data from Kafka to Data Lake Formation (DLF).
Consider a Kafka topic named inventory that contains debezium-json change log data for two tables, customers and products. The following configuration synchronizes the data to DLF.
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The username for commits. Set a unique commit user for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
# debezium-json does not include primary key information. You must add primary keys to the tables.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

The Kafka source connector supports
the canal-json and debezium-json (default) formats.

Because the Debezium JSON format omits primary key information, you must explicitly define primary keys in the transform module:

transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

Set the debezium-json.distributed-tables or canal-json.distributed-tables option to true if the data of a table is spread across multiple partitions, or if you need to merge distributed data.

Set a schema inference strategy for the Kafka source with the schema.inference.strategy option. For more information, see Kafka connector.
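As a sketch, a Kafka source with an explicit schema inference strategy might look like the following. The strategy value shown is an assumption for illustration only; check the Kafka connector reference for the values this option actually accepts.

```yaml
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  value.format: debezium-json
  # Assumed value for illustration; see the Kafka connector
  # documentation for the supported inference strategies.
  schema.inference.strategy: continuous
```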