Realtime Compute for Apache Flink: Ingest data into a DLF data lake in real time using Flink CDC

Last Updated: Nov 20, 2025

This topic describes best practices for ingesting data into Alibaba Cloud Data Lake Formation (DLF) in real time using Flink CDC.

DLF is a fully managed platform that provides unified storage and management for both data and metadata, along with permission management and storage optimization.

You can create a data ingestion task that uses Flink CDC to load an entire database into a Paimon catalog hosted in DLF.
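
Such a task is defined in a single YAML file whose top-level blocks describe the source, optional per-table transforms, and the sink. The outline below is only a structural sketch of that anatomy; the complete examples in the following sections fill in the actual options.

source:        # Where change data is read from, for example a MySQL database.
  type: mysql
  # Connection options such as hostname, port, username, and password go here.

transform:     # Optional per-table rules such as partition keys or projections.
  - source-table: mysql_test.\.*
    # Per-table transform options go here.

sink:          # Where data is written, here a Paimon catalog hosted in DLF.
  type: paimon
  # Catalog and table properties go here.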

Ingest an entire database from MySQL to DLF

The following YAML job ingests data from an entire MySQL database into DLF:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from new tables created during incremental reading.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded shards to prevent TM OOM errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize only change events to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  # The metastore type, fixed to rest.
  catalog.properties.metastore: rest
  # The token provider, fixed to dlf.
  catalog.properties.token.provider: dlf
  # URI used to access the DLF Rest Catalog Server. Format: http://[region-id]-vpc.dlf.aliyuncs.com. Example: http://ap-southeast-1-vpc.dlf.aliyuncs.com.
  catalog.properties.uri: dlf_uri
  # The catalog name.
  catalog.properties.warehouse: your_warehouse
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Note
  • For more information about MySQL source configurations, see MySQL.

  • To achieve both near-real-time writes and rapid reads, enable the table.properties.deletion-vectors.enabled option. It greatly improves read performance with minimal impact on writes.

  • DLF compacts files automatically, so do not add table properties related to file compaction (such as num-sorted-run.compaction-trigger) or bucketing (bucket).
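
By default, each source table is written to a Paimon table with the same database and table name. If you want the data to land in a differently named database in DLF, the YAML job can also include a route block. The snippet below is a minimal sketch; the target database name dlf_db is a placeholder, and the <> symbol is replaced with the matched source table name.

route:
  - source-table: mysql_test.\.*
    sink-table: dlf_db.<>
    replace-symbol: <>
    description: Route all tables in mysql_test to the dlf_db database in DLF.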

Ingest data to a DLF partitioned table

To load data from a non-partitioned table to a DLF partitioned table, use the partition-keys option in the transform module. Example code:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from new tables created during incremental reading.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded shards to prevent TM OOM.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize only change events to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (Optional) Set partition fields.  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt
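
The example above assumes that each source table already contains a column (pt here) that can serve as a partition key. If it does not, you can derive one in the projection of the same transform rule. The sketch below assumes the source table has a create_time timestamp column and uses the DATE_FORMAT transform function to derive a daily partition column from it.

transform:
  - source-table: mysql_test.tbl1
    # Keep all original columns and derive a daily partition column pt from create_time.
    projection: \*, DATE_FORMAT(create_time, 'yyyyMMdd') AS pt
    partition-keys: id,pt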

Ingest data to a DLF append-only table

Ingest MySQL data into DLF and implement soft deletes for data removed from the source:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from new tables created during incremental reading.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded shards to prevent TM OOM.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize only change events to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    # (Optional) Set partition fields.
    partition-keys: id,pt
    # (Optional) Implement soft deletes.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    # (Optional) Set partition fields.
    partition-keys: id,pt
    # (Optional) Implement soft deletes.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
Note
  • Specifying __data_event_type__ in the projection adds the event type as a new column to the sink table, and setting converter-after-transform to SOFT_DELETE converts deletions into insertions, so every change operation is fully recorded downstream. A variant that applies the same rule to all tables at once is sketched after this note.
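
Because source-table accepts a regular expression, a single transform entry can apply the soft-delete projection to every table in the database instead of listing each table separately. A minimal sketch:

transform:
  - source-table: mysql_test.\.*
    # Record the change type and convert deletions into insertions for all matched tables.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE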

Ingest data from Kafka to DLF in real time

After syncing data from MySQL to Kafka in real time with Flink CDC, you can then load the data from Kafka into DLF for storage.
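
The first leg, from MySQL to Kafka, can itself be written as a Flink CDC YAML job with a Kafka sink. The snippet below is a minimal sketch that reuses the MySQL source options from the earlier examples and writes Debezium JSON changelogs to a single inventory topic; adjust the topic and format to match your setup.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  # Write change events as Debezium JSON so that the downstream Kafka-to-DLF job can read them.
  value.format: debezium-json
  # (Optional) Send all events to a single topic instead of one topic per table.
  topic: inventory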

The following job then ingests data from the Kafka inventory topic, which carries the changelogs of two tables, customers and products:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

# Debezium JSON does not contain primary key information. You need to add primary keys to the table.  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Note
  • Valid values of the Kafka source's value.format option are canal-json, debezium-json (default), and json. A canal-json source is sketched after these notes.

  • Schema evolution is supported from Kafka to Paimon, providing greater flexibility for data replication.

  • When the value format is debezium-json, you must explicitly specify the primary key in the transform module:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • To ingest a table with data distributed across shards, set debezium-json.distributed-tables or canal-json.distributed-tables to true.

  • The Kafka source supports multiple schema inference policies. You can set a policy using the schema.inference.strategy option. For more information, see Kafka connector.
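
For a Kafka topic that carries Canal-formatted changelogs, only the source block changes; the sink and transform blocks stay the same as in the example above. A minimal sketch of such a source, still reading the inventory topic:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  # Read Canal JSON changelogs instead of Debezium JSON.
  value.format: canal-json
  # Merge changelogs of the same logical table that is split across database shards.
  canal-json.distributed-tables: true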