Flink Change Data Capture (CDC) is a data ingestion tool offered by Realtime Compute for Apache Flink. It can synchronize entire databases from source systems to your data lakehouse. This topic guides you through using Flink CDC to ingest data into a DLF catalog in real time through the Paimon REST catalog.
Prerequisites
You have created a Realtime Compute for Apache Flink workspace. See Create a workspace.
Your Realtime Compute for Apache Flink workspace and DLF catalogs reside in the same region.
You have added the VPC of your Realtime Compute for Apache Flink workspace to DLF's VPC whitelist. See Configure a VPC whitelist.
Engine requirements
Your Realtime Compute for Apache Flink job runs Ververica Runtime (VVR) version 11.1.0 or later.
Create a DLF catalog
See Get started with DLF.
Create and configure a data ingestion job
Create a YAML draft to ingest data using Flink CDC. For more information, see Develop Flink CDC jobs for data ingestion (Beta).
Configure the sink module as follows:
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Replace the placeholder values of config options with your actual values:
| Config option | Description | Required? | Example value |
| --- | --- | --- | --- |
| `catalog.properties.metastore` | The metastore type. Set it to `rest`. | Yes | `rest` |
| `catalog.properties.token.provider` | The token provider. Set it to `dlf`. | Yes | `dlf` |
| `catalog.properties.uri` | The URI used to access the DLF REST catalog server. | Yes | `dlf_uri` |
| `catalog.properties.warehouse` | The name of the Paimon catalog. | Yes | `your_warehouse` |
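For reference, the following sketch shows a filled-in sink module. The URI, warehouse, and commit user values are hypothetical placeholders; replace them with the REST URI and name of your own DLF catalog and a job-specific commit user.
sink:
  type: paimon
  catalog.properties.metastore: rest
  # Hypothetical placeholder; use the REST URI of your DLF catalog.
  catalog.properties.uri: <your-dlf-rest-uri>
  # Hypothetical placeholder; use the name of your DLF catalog.
  catalog.properties.warehouse: my_dlf_catalog
  catalog.properties.token.provider: dlf
  # Hypothetical placeholder; use a name unique to this job.
  commit.user: mysql_to_dlf_sync
  table.properties.deletion-vectors.enabled: true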
Examples
This section offers examples for ingesting data to DLF.
Ingest data from an entire MySQL database to DLF
The following code syncs data from an entire MySQL database to DLF:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
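  # Sync all tables in the mysql_test database.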
tables: mysql_test.\.*
server-id: 8601-8604
# (Optional) Sync data from tables created in the incremental phase.
scan.binlog.newly-added-table.enabled: true
# (Optional) Sync table and field comments.
include-comments.enabled: true
# (Optional) Prioritize the distribution of unbounded shards to prevent potential TM OOM errors.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Optional) Enable deserialization filters to accelerate reads.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Recommended config options for the MySQL source module are as follows:
| Config option | Description |
| --- | --- |
| `scan.binlog.newly-added-table.enabled` | Syncs data from tables created during the incremental phase. |
| `include-comments.enabled` | Syncs table and field comments. |
| `scan.incremental.snapshot.unbounded-chunk-first.enabled` | Prioritizes the distribution of unbounded chunks to prevent potential TaskManager OOM errors. |
| `scan.only.deserialize.captured.tables.changelog.enabled` | Deserializes data only from matched tables to speed up reads. |
Config options for the Paimon sink module are as follows:
| Config options | Description |
| --- | --- |
| Connection (`catalog.properties.*`) | The options used to connect to the DLF REST catalog. See the config option table earlier in this topic. |
| Table properties (`table.properties.*`) | Enable `table.properties.deletion-vectors.enabled` to greatly speed up reads with minimal impact on writes. DLF provides automatic file compaction, so do not configure compaction or bucket options such as `bucket` and `num-sorted-run.compaction-trigger`. |
| Commit user (`commit.user`) | The commit user for data write jobs. Defaults to `admin`. Using the default `admin` user for concurrent write jobs on the same table can cause commit conflicts and data inconsistencies. Assign a unique commit user to each job, as shown in the sketch below. |
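For example, the following sketch shows the sink modules of two separate drafts that write to the same Paimon table; the commit user names are hypothetical and only illustrate that each job uses a distinct value.
# Sink module of the first draft (hypothetical commit user).
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  commit.user: orders_full_sync

# Sink module of the second draft (hypothetical commit user).
# A distinct commit user avoids commit conflicts with the first draft.
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  commit.user: orders_incremental_sync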
Ingest data to a partitioned table
To ingest data from a non-partitioned table to a partitioned table in DLF, configure the `partition-keys` option. For more information, see Data Ingestion with Flink CDC.
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Optional) Sync data from tables created in the incremental phase.
scan.binlog.newly-added-table.enabled: true
# (Optional) Sync table and field comments.
include-comments.enabled: true
# (Optional) Prioritize the distribution of unbounded shards to prevent potential TM OOM.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Optional) Enable deserialization filters to accelerate reads.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (Optional) Set partition fields.
partition-keys: id,pt
- source-table: mysql_test.tbl2
    partition-keys: id,pt
Ingest data to an append-only table
Configure `SOFT_DELETE` to implement logical deletes during data ingestion:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Optional) Sync data from tables that are newly created in the incremental phase.
scan.binlog.newly-added-table.enabled: true
# (Optional) Sync table and field comments.
include-comments.enabled: true
# (Optional) Prioritize the distribution of unbounded shards to prevent potential TaskManager OutOfMemory errors.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Optional) Enable deserialization filters to accelerate reads.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (Optional) Set partition fields.
partition-keys: id,pt
# (Optional) Implement soft delete.
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
# (Optional) Set partition fields.
partition-keys: id,pt
# (Optional) Implement soft delete.
projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
By adding `__data_event_type__` in the `projection`, you can write the change type as a new column to the downstream table. Setting `converter-after-transform` to `SOFT_DELETE` converts delete operations into insert operations, so the downstream table keeps a complete record of all change operations. For more information, see Data Ingestion with Flink CDC.
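For example, assuming the change-type metadata takes values such as `+I`, `+U`, `-U`, and `-D`, deleting a row with `id = 1` from `mysql_test.tbl1` does not remove that row from the downstream table; instead, the downstream table receives a new row for `id = 1` whose `op_type` column records the delete event.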
Sync data from Kafka to DLF in real time
Assume that the Kafka topic `inventory` contains data from two tables, `customers` and `products`, in the Debezium JSON format. The following code syncs data from these two tables to their corresponding destination tables in DLF:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
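  # Set to true when ingesting data from multiple Kafka partitions into a single table.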
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
commit.user: your_job_name
# (Optional) Enable deletion vectors to improve read performance.
table.properties.deletion-vectors.enabled: true
# Debezium JSON does not contain primary key info, so explicitly specify primary keys.
transform:
- source-table: \.*.\.*
projection: \*
    primary-keys: id
The Kafka source supports three data formats: `canal-json`, `debezium-json` (default), and `json`.
When the data format is `debezium-json`, you must explicitly specify the primary key in the transform module, because Debezium JSON messages do not contain primary key information:
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
When ingesting data from multiple partitions to a single table in DLF, set the `debezium-json.distributed-tables` or `canal-json.distributed-tables` config item to `true`.
The Kafka data source supports multiple schema inference policies. You can set the policy by using the `schema.inference.strategy` config option. For more information, see Message Queue for Apache Kafka.
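As a minimal sketch, the option is set in the Kafka source module; the policy value below is only a placeholder, because the supported values depend on your VVR version and are listed in the referenced topic.
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  value.format: debezium-json
  # Placeholder; replace with a schema inference policy supported by your VVR version.
  schema.inference.strategy: <policy>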
For more information, see Data Ingestion with Flink CDC.