Data Lake Formation: Ingest data to DLF using Flink CDC

Last Updated: Nov 25, 2025

Flink Change Data Capture (CDC) is a data ingestion tool offered by Realtime Compute for Apache Flink. It can synchronize entire databases from source systems to your data lakehouse. This topic describes how to use Flink CDC to ingest data into a DLF catalog in real time through the Paimon REST catalog.

Prerequisites

  • You have created a Realtime Compute for Apache Flink workspace. See Create a workspace.

  • Your Realtime Compute for Apache Flink workspace and DLF catalogs reside in the same region.

  • You have added the VPC of your Realtime Compute for Apache Flink workspace to DLF's VPC whitelist. See Configure a VPC whitelist.

Engine requirements

Your Realtime Compute for Apache Flink job must run Ververica Runtime (VVR) 11.1.0 or later.

Create a DLF catalog

See Get started with DLF.

Create and configure a data ingestion job

  1. Create a YAML draft to ingest data using Flink CDC. For more information, see Develop Flink CDC jobs for data ingestion (Beta).

  2. Configure the sink module as follows:

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Replace the placeholder values of the config options with your actual values:

  • catalog.properties.metastore (required): The metastore type. Set it to rest.

  • catalog.properties.token.provider (required): The token provider. Set it to dlf.

  • catalog.properties.uri (required): The URI used to access the DLF REST catalog server, in the format http://[region-id]-vpc.dlf.aliyuncs.com, for example http://ap-southeast-1-vpc.dlf.aliyuncs.com. For more information about region IDs, see Regions and endpoints.

  • catalog.properties.warehouse (required): The name of the Paimon catalog, for example dlf_test.
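
For reference, a sink module with the placeholders filled in using the example values above might look as follows. The region ap-southeast-1 and the catalog name dlf_test are illustrative; substitute your own region and catalog name.

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: http://ap-southeast-1-vpc.dlf.aliyuncs.com
  catalog.properties.warehouse: dlf_test
  catalog.properties.token.provider: dlf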

Examples

This section offers examples for ingesting data to DLF.

Ingest data from an entire MySQL database to DLF

The following code syncs data from an entire MySQL database to DLF:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from tables created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded chunks to prevent potential TaskManager OOM errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable deserialization filters to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Note

Recommended config options for the MySQL source module are as follows:

  1. scan.binlog.newly-added-table.enabled

    Syncs data from tables created during the incremental phase.

  2. include-comments.enabled

    Syncs table and field comments.

  3. scan.incremental.snapshot.unbounded-chunk-first.enabled

    Prevents potential TaskManager OOM errors.

  4. scan.only.deserialize.captured.tables.changelog.enabled

    Deserializes data only from the captured tables to speed up reads.

Note

Config options for the Paimon sink module are as follows:

  1. Connection: catalog.properties.*

  2. Table properties: table.properties.*

  • Enable table.properties.deletion-vectors.enabled to greatly speed up reads with minimal impact on writes.

  • DLF provides automatic file compaction. Therefore, do not configure compaction or bucket options, such as bucket and num-sorted-run.compaction-trigger.

  3. Commit user (commit.user)

  • The commit user of a data write job. Defaults to admin. If multiple jobs concurrently write to the same table with the default admin user, commit conflicts and data inconsistencies can occur. Assign a unique commit user to each job, as shown in the example after this note.
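
For example, if two ingestion jobs write to the same DLF table, each sink module can set its own commit user. The following sketch uses an illustrative job name as the commit user; replace it with a value unique to your job:

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # A value unique to this job, for example the job name.
  # A second job writing to the same table should use a different value, such as job_b_sync.
  commit.user: job_a_sync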

Ingest data to a partitioned table

To ingest data from a non-partitioned table to a partitioned table in DLF, configure the partition-keys option. For more information, see Data Ingestion with Flink CDC.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from tables created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded chunks to prevent potential TaskManager OOM errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable deserialization filters to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (Optional) Set partition fields.  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

Ingest data to an append-only table

Configure the SOFT_DELETE converter to implement logical deletes during data ingestion:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from tables that are newly created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded chunks to prevent potential TaskManager OOM errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable deserialization filters to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    # (Optional) Set partition fields.
    partition-keys: id,pt
    # (Optional) Implement soft delete.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    # (Optional) Set partition fields.
    partition-keys: id,pt
    # (Optional) Implement soft delete.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
Note
  • By adding `__data_event_type__` in the projection, you can write the change type as a new column to the downstream table. Setting converter-after-transform to SOFT_DELETE converts delete operations to insert operations. This allows the downstream table to completely record all change operations. For more information, see Data Ingestion with Flink CDC.

Sync data from Kafka to DLF in real time

Assume that the Kafka topic inventory contains change data from two tables, customers and products, in Debezium JSON format. The following code syncs data from these two tables to their corresponding destination tables in DLF:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

# Debezium JSON does not contain primary key info, so explicitly specify primary keys.  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Note
  • The Kafka source supports three data formats: canal-json, debezium-json (default), and json.

  • When the data format is debezium-json, you must explicitly specify the primary key in the transform module. This is because Debezium JSON messages do not contain primary key information.

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • When ingesting data from multiple partitions into a single table in DLF, set the debezium-json.distributed-tables or canal-json.distributed-tables config option to true.

  • The Kafka source supports multiple schema inference policies. You can set the policy using the schema.inference.strategy config option, as shown in the example after this list. For more information, see Message Queue for Apache Kafka.
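
For example, a Kafka source that reads Canal JSON data from multiple partitions into a single table and explicitly sets a schema inference policy might be configured as follows. This is a sketch: your_strategy is a placeholder, and the supported values are described in Message Queue for Apache Kafka.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: canal-json
  # Required when data from multiple partitions is ingested into a single table.
  canal-json.distributed-tables: true
  # (Optional) Schema inference policy; replace the placeholder with a supported value.
  schema.inference.strategy: your_strategy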

For more information, see Data Ingestion with Flink CDC.