This topic describes best practices for ingesting real-time data into Alibaba Cloud Data Lake Formation (DLF) using Flink CDC.
DLF is a fully managed platform that provides unified management and storage for data and metadata. It also features permission management and storage optimization.
You can create a data ingestion task that uses Flink CDC to load data from an entire database to a Paimon catalog hosted in DLF.
Ingest an entire database from MySQL to DLF
Write the following YAML to ingest data from MySQL to DLF:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from new tables created during incremental reading.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Assign unbounded chunks first to prevent TaskManager OOM errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize only the change events of the captured tables to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  # The metastore type, fixed to rest.
  catalog.properties.metastore: rest
  # The token provider, fixed to dlf.
  catalog.properties.token.provider: dlf
  # The URI used to access the DLF REST catalog server. Format: http://[region-id]-vpc.dlf.aliyuncs.com. Example: http://ap-southeast-1-vpc.dlf.aliyuncs.com.
  catalog.properties.uri: dlf_uri
  # The catalog name.
  catalog.properties.warehouse: your_warehouse
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

For more information about MySQL source configurations, see MySQL.

To achieve both near-real-time writes and rapid reads, enable the table.properties.deletion-vectors.enabled option. It greatly improves read performance with minimal impact on writes.

DLF features automatic compaction, so do not add table properties related to file compaction (num-sorted-run.compaction-trigger) or buckets (bucket).
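For reference, a sink block that follows these recommendations might look like the following sketch. The commented-out lines are hypothetical examples of the compaction- and bucket-related properties to leave out; the remaining settings mirror the example above.

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.token.provider: dlf
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  # Deletion vectors: fast reads with minimal impact on writes.
  table.properties.deletion-vectors.enabled: true
  # Leave file compaction and bucketing to DLF's automatic optimization.
  # Do not set properties such as the following (shown only as examples):
  # table.properties.num-sorted-run.compaction-trigger: 5
  # table.properties.bucket: 4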
Ingest data to a DLF partitioned table
To load data from a non-partitioned table to a DLF partitioned table, use the partition-keys option in the transform module. Example code:
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from new tables created during incremental reading.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Assign unbounded chunks first to prevent TaskManager OOM errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize only the change events of the captured tables to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (Optional) Set partition fields.
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

Ingest data to a DLF append-only table
The following example ingests MySQL data to DLF and implements soft deletes for data removed from the source:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from new tables created during incremental reading.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Assign unbounded chunks first to prevent TaskManager OOM errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize only the change events of the captured tables to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (Optional) Set partition fields.
    partition-keys: id,pt
    # (Optional) Implement soft deletes.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    # (Optional) Set partition fields.
    partition-keys: id,pt
    # (Optional) Implement soft deletes.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE

Specifying __data_event_type__ in projection adds the event type to the sink table as a new column. Setting converter-after-transform to SOFT_DELETE converts deletions into insertions. Together, these settings ensure that all change operations are completely recorded in the downstream system.
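If every captured table should record deleted rows in the same way, you can replace the per-table rules with a single transform rule that uses a regular expression, as the Kafka example later in this topic does. A minimal sketch, assuming the same mysql_test database as above:

transform:
  # One rule for every table matched by the source pattern (sketch; adjust partition keys per table if needed).
  - source-table: mysql_test.\.*
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE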
Ingest data from Kafka to DLF in real time
After syncing data from MySQL to Kafka in real time using Flink CDC, you can then load the data from Kafka into DLF for storage.
The following example ingests data from the Kafka inventory topic, which contains change data for two tables, customers and products:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

# Debezium JSON does not contain primary key information, so you must add primary keys to the sink tables.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

Valid values for the Kafka source's value.format option are canal-json, debezium-json (default), and json.

Schema evolution is supported from Kafka to Paimon, providing greater flexibility for data replication.

When the value format is debezium-json, you must explicitly specify the primary key in the transform module:

transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

To ingest a table whose data is distributed across multiple shards, set debezium-json.distributed-tables or canal-json.distributed-tables to true.

The Kafka source supports multiple schema inference policies. You can set a policy using the schema.inference.strategy option. For more information, see Kafka connector.
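As an illustration of these options, the following sketch shows a Kafka source block that reads Canal JSON instead of the default Debezium JSON; the topic and bootstrap servers are reused from the example above for illustration only.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  # Read Canal JSON change records instead of the default Debezium JSON.
  value.format: canal-json
  # Required when a logical table's data is distributed across multiple shards.
  canal-json.distributed-tables: true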