Flink Change Data Capture (CDC) is a data ingestion tool offered by Realtime Compute for Apache Flink. It supports syncing entire databases from sources to your data lakehouse. This topic guides you through using Flink CDC to ingest data to a DLF catalog in real time via Paimon REST.
By the end of this topic, you will have:
- Created a DLF catalog as the ingestion destination
- Configured a Flink CDC YAML job with the Paimon sink pointing to DLF
- Run examples for common ingestion scenarios: full database sync, partitioned tables, append-only tables, and Kafka sources
Prerequisites
Before you begin, make sure you have:
- A Realtime Compute for Apache Flink workspace. See Create a workspace.
- The Realtime Compute for Apache Flink workspace and DLF catalog in the same region.
- The VPC of your Realtime Compute for Apache Flink workspace added to DLF's VPC whitelist. See Configure a VPC whitelist.
Engine requirements
Your Realtime Compute for Apache Flink job must run Ververica Runtime (VVR) version 11.1.0 or later.
Create a DLF catalog
See Get started with DLF.
Create and configure a data ingestion job
- Create a YAML draft to ingest data using Flink CDC. For more information, see Develop Flink CDC jobs for data ingestion (Beta).
- Configure the sink module:

  sink:
    type: paimon
    catalog.properties.metastore: rest
    catalog.properties.uri: dlf_uri
    catalog.properties.warehouse: your_warehouse
    catalog.properties.token.provider: dlf
    # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
    commit.user: your_job_name
    # (Optional) Enable deletion vectors to improve read performance.
    table.properties.deletion-vectors.enabled: true

  Replace the placeholder values with your actual values:
| Config option | Description | Required | Default | Example value |
|---|---|---|---|---|
| catalog.properties.metastore | The metastore type. Set it to rest. | Yes | — | rest |
| catalog.properties.token.provider | The token provider. Set it to dlf. | Yes | — | dlf |
| catalog.properties.uri | The URI to access the DLF REST catalog server. Format: http://[region-id]-vpc.dlf.aliyuncs.com. For region IDs, see Regions and endpoints. | Yes | — | http://ap-southeast-1-vpc.dlf.aliyuncs.com |
| catalog.properties.warehouse | The name of the Paimon catalog. | Yes | — | dlf_test |
| commit.user | The commit user for data writes. Assign unique commit users to distinct jobs to avoid conflicts. | No | admin | your_job_name |
| table.properties.deletion-vectors.enabled | Enables deletion vectors to improve read performance with minimal impact on writes. | No | — | true |
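For reference, here is the same sink module with the example values from the table filled in. This sketch assumes the ap-southeast-1 (Singapore) region and a catalog named dlf_test; substitute the endpoint and catalog name for your own environment.

sink:
  type: paimon
  catalog.properties.metastore: rest
  # Example region endpoint; replace ap-southeast-1 with your region ID.
  catalog.properties.uri: http://ap-southeast-1-vpc.dlf.aliyuncs.com
  # Example catalog name; replace with the name of your Paimon catalog.
  catalog.properties.warehouse: dlf_test
  catalog.properties.token.provider: dlf
  commit.user: your_job_name
  table.properties.deletion-vectors.enabled: true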
Usage notes
Before running a job, be aware of the following constraints:
- Commit user conflicts: The default commit user is admin. Running concurrent data write jobs to the same table under the same commit user causes commit conflicts and data inconsistencies. Assign a unique commit.user to each job, as shown in the sketch after this list.
- No compaction or bucket options: DLF provides automatic file compaction. Do not configure compaction or bucket options such as bucket and num-sorted-run.compaction-trigger.
- Deletion vectors: Set table.properties.deletion-vectors.enabled: true to greatly speed up reads with minimal impact on writes.
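As a minimal sketch of the first note, assume two ingestion jobs write to tables in the same DLF catalog; the job names order_sync_job and user_sync_job are hypothetical placeholders. Each job's sink carries its own commit.user so that their commits do not conflict:

# Sink module of the first job (hypothetical name: order_sync_job).
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  commit.user: order_sync_job

# Sink module of the second job (hypothetical name: user_sync_job); a different commit user avoids commit conflicts.
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  commit.user: user_sync_job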
Examples
Ingest data from an entire MySQL database to DLF
The following job syncs all tables in a MySQL database to DLF:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Optional) Sync data from tables created in the incremental phase.
scan.binlog.newly-added-table.enabled: true
# (Optional) Sync table and field comments.
include-comments.enabled: true
# (Optional) Prioritize unbounded shards to prevent potential TaskManager out-of-memory (OOM) errors.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Optional) Deserialize data only from matched tables to speed up reads.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
table.properties.deletion-vectors.enabled: true
The recommended MySQL source options are:
| Option | Description |
|---|---|
| scan.binlog.newly-added-table.enabled | Syncs data from tables created during the incremental phase. |
| include-comments.enabled | Syncs table and field comments. |
| scan.incremental.snapshot.unbounded-chunk-first.enabled | Prevents potential TaskManager OOM errors by prioritizing unbounded shards. |
| scan.only.deserialize.captured.tables.changelog.enabled | Deserializes data only from matched tables to speed up reads. |
Ingest data to a partitioned table
To ingest from a non-partitioned source table to a partitioned table in DLF, add the partition-keys option to the transform module. For more information, see Data ingestion with Flink CDC.
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Optional) Sync data from tables created in the incremental phase.
scan.binlog.newly-added-table.enabled: true
# (Optional) Sync table and field comments.
include-comments.enabled: true
# (Optional) Prioritize unbounded shards to prevent potential TaskManager out-of-memory (OOM) errors.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Optional) Deserialize data only from matched tables to speed up reads.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,pt
Ingest data to an append-only table
To implement logical deletes (soft deletes) during ingestion, use converter-after-transform: SOFT_DELETE in the transform module. This converts delete operations into insert operations, so the downstream table keeps a complete record of all change operations. The __data_event_type__ metadata field in the projection writes the change type to a new column in the downstream table.
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Optional) Sync data from tables created in the incremental phase.
scan.binlog.newly-added-table.enabled: true
# (Optional) Sync table and field comments.
include-comments.enabled: true
# (Optional) Prioritize unbounded shards to prevent potential TaskManager out-of-memory (OOM) errors.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Optional) Deserialize data only from matched tables to speed up reads.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
partition-keys: id,pt
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
partition-keys: id,pt
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
For more information, see Data ingestion with Flink CDC.
Sync data from Kafka to DLF in real time
The following job reads CDC data from the Kafka inventory topic (tables customers and products in Debezium JSON format) and syncs it to the corresponding destination tables in DLF. Because Debezium JSON messages do not contain primary key information, the primary key is explicitly specified in the transform module.
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
table.properties.deletion-vectors.enabled: true
# Debezium JSON does not contain primary key info, so explicitly specify primary keys.
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
Additional notes for Kafka sources:
- The Kafka source supports three data formats: canal-json, debezium-json (default), and json.
- When ingesting from multiple Kafka partitions to a single table in DLF, set debezium-json.distributed-tables or canal-json.distributed-tables to true, as shown in the sketch after this list.
- The Kafka source supports multiple schema inference policies through the schema.inference.strategy option. For more information, see Message Queue for Apache Kafka.
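As a sketch of the distributed-tables note, the source module below assumes Canal JSON change records spread across multiple partitions of a hypothetical inventory_canal topic; the sink and transform modules would stay the same as in the Debezium JSON example above.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  # Hypothetical topic carrying Canal JSON change records.
  topic: inventory_canal
  scan.startup.mode: earliest-offset
  value.format: canal-json
  # Required when one table's data is distributed across multiple Kafka partitions.
  canal-json.distributed-tables: true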
For more information, see Data ingestion with Flink CDC.