Use Flink CDC to write real-time change data from MySQL or Kafka into Data Lake Formation (DLF).
DLF is a fully managed platform that unifies metadata, data storage, and data management, with capabilities such as metadata management, permission management, and storage optimization. Data ingestion jobs write to DLF through its Paimon Catalog, which lets you ingest entire large-scale databases into the data lake in real time. For more information about DLF, see "What is Data Lake Formation?"
Prerequisites
Before you begin, ensure that you have:
- A DLF instance with the Paimon Catalog enabled
- The URI of your DLF Rest Catalog Server, in the format http://<region-id>-vpc.dlf.aliyuncs.com (for example, http://cn-hangzhou-vpc.dlf.aliyuncs.com)
- The name of your DLF Catalog warehouse
DLF sink configuration
All examples in this topic use the following Paimon sink parameters to connect to DLF.
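| Parameter | Required | Description |
| --- | --- | --- |
| catalog.properties.metastore | Yes | Metastore type. Set to rest. |
| catalog.properties.token.provider | Yes | Token provider. Set to dlf. |
| catalog.properties.uri | Yes | URI of the DLF Rest Catalog Server. Format: http://<region-id>-vpc.dlf.aliyuncs.com. |
| catalog.properties.warehouse | Yes | Name of the DLF Catalog. |
| table.properties.deletion-vectors.enabled | No | Set to true to enable deletion vectors, which improves read performance. |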
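Together, these parameters form a sink block like the following. This is a minimal sketch. The region cn-hangzhou and the warehouse name your_warehouse are placeholders; replace them with the values for your own DLF instance.

```yaml
sink:
  type: paimon
  # Required: connect to DLF through its REST catalog.
  catalog.properties.metastore: rest
  catalog.properties.token.provider: dlf
  # Required: format is http://<region-id>-vpc.dlf.aliyuncs.com.
  # cn-hangzhou is only an example region (placeholder).
  catalog.properties.uri: http://cn-hangzhou-vpc.dlf.aliyuncs.com
  # Required: name of the DLF Catalog (placeholder).
  catalog.properties.warehouse: your_warehouse
  # Optional: enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
```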
Usage notes
- Do not add file merging or bucket-related parameters (such as bucket and num-sorted-run.compaction-trigger) to the sink configuration. DLF manages file merging automatically, and adding these parameters causes conflicts.
- For MySQL source parameters, see MySQL.
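The sketch below illustrates the first note. The commented-out lines show the kind of Paimon table options to leave out of the sink configuration. Two details are assumptions for illustration only. First, the table.properties. prefix is assumed to be how the Paimon sink passes table options. Second, the values 4 and 5 are arbitrary.

```yaml
sink:
  type: paimon
  # (DLF catalog parameters omitted; see the sink configuration above.)
  # Options unrelated to file merging or bucketing are fine:
  table.properties.deletion-vectors.enabled: true
  # Do NOT add options like these when writing to DLF. DLF manages file
  # merging itself, and these settings conflict with it.
  # (Values below are arbitrary examples.)
  # table.properties.bucket: 4
  # table.properties.num-sorted-run.compaction-trigger: 5
```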
Synchronize an entire MySQL database to DLF
The following CDC YAML job synchronizes an entire MySQL database to DLF. The source section captures every table in the mysql_test database (the regular expression mysql_test.\.*), and the sink section writes to DLF through its Paimon Catalog.
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Assign unbounded chunks first to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize change events only for captured tables to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  # The Metastore type. Set to rest.
  catalog.properties.metastore: rest
  # The token provider. Set to dlf.
  catalog.properties.token.provider: dlf
  # The URI of the DLF Rest Catalog Server.
  catalog.properties.uri: dlf_uri
  # The name of the DLF Catalog.
  catalog.properties.warehouse: your_warehouse
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Write to partitioned tables in DLF
Source tables in data ingestion jobs carry no partition field information. To write to a partitioned table in DLF, add a transform block and specify partition-keys for each source table. For details on the partition-keys syntax, see Flink CDC data ingestion job development reference.
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Assign unbounded chunks first to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize change events only for captured tables to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
transform:
  - source-table: mysql_test.tbl1
    # Set partition fields.
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt
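If many tables share the same partition layout, a single rule can replace the per-table entries. This works because source-table accepts a regular expression, as the \.*.\.* pattern in the Kafka example later in this topic shows. The sketch below assumes that every table in mysql_test has both an id column and a pt column. If any table lacks these columns, keep the per-table rules instead.

```yaml
transform:
  # One rule for every table in mysql_test.
  # Assumption: every matched table has id and pt columns.
  - source-table: mysql_test.\.*
    partition-keys: id,pt
```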
Write to append-only tables in DLF
Source tables contain the full CDC change stream, including delete events. To write every change to an append-only table in DLF, convert delete events into insert events (soft delete). To do this, add __data_event_type__ to the projection and set converter-after-transform to SOFT_DELETE in the transform block. Every change event, including deletes, is then written to the destination table as an inserted row, and the op_type column in the following example records the original change type.
For more information, see Flink CDC data ingestion job development reference.
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Assign unbounded chunks first to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize change events only for captured tables to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
transform:
  - source-table: mysql_test.tbl1
    # Set partition fields.
    partition-keys: id,pt
    # Add the change type as a new field, and convert deletes to inserts.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    partition-keys: id,pt
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
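Soft delete does not depend on partitioning. If the destination tables do not need partitions, one rule with a regular expression can apply the conversion to every table in the database. The sketch below assumes that every table in mysql_test should be written as a soft-delete table. The column name op_type is only the alias used in this topic's example.

```yaml
transform:
  # Applies to every table in mysql_test. No partition-keys are set,
  # so the destination tables are not partitioned.
  - source-table: mysql_test.\.*
    # Keep all columns and record each event's change type in op_type.
    projection: \*, __data_event_type__ AS op_type
    # Write delete events as inserts so every change lands as a new row.
    converter-after-transform: SOFT_DELETE
```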
Synchronize Kafka CDC data to DLF in real time
If MySQL data already flows to Kafka (for example, through real-time distribution), configure a CDC YAML job that reads from Kafka and writes to DLF.
The following example reads from the inventory Kafka topic, which stores change data for the customers and products tables in Debezium JSON format, and synchronizes each table to its corresponding destination table in DLF. Because debezium-json messages don't carry primary key information, the transform block adds id as the primary key.
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
# debezium-json does not include primary key information. Add primary keys using a transform rule.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Kafka source notes:
- Supported formats: canal-json, debezium-json (default), and json.
- Schema evolution: If the schema of the Kafka messages changes (for example, a field is added), the change is automatically synchronized to the Paimon table schema.
- Primary keys with debezium-json: debezium-json messages don't record primary key information. Add primary keys manually with a transform rule:
    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
- Distributed tables: If one table's data is spread across multiple Kafka partitions, or if you want to merge sharded tables whose data sits in different partitions, set debezium-json.distributed-tables or canal-json.distributed-tables to true.
- Schema inference policies: The Kafka source supports multiple schema inference policies, configured with schema.inference.strategy. For details, see Message Queue for Kafka.
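For a topic that carries Canal JSON messages, only the source block of the Kafka job changes. The sketch below makes two assumptions: the inventory topic holds canal-json messages, and the data for each table is spread across partitions. The sink and transform blocks stay as they are in the Debezium example.

```yaml
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  # Assumption: messages in this topic are in Canal JSON format.
  value.format: canal-json
  # Merge data for the same table that is spread across Kafka partitions.
  canal-json.distributed-tables: true
```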