
Data Lake Formation: Access DLF with Flink CDC

Last Updated: Mar 26, 2026

Flink Change Data Capture (CDC) is a data ingestion tool offered by Realtime Compute for Apache Flink. It can synchronize entire databases from a source to your data lakehouse. This topic walks you through using Flink CDC to ingest data into a Data Lake Formation (DLF) catalog in real time through the Paimon REST catalog.

By the end of this topic, you will have:

  • Created a DLF catalog as the ingestion destination

  • Configured a Flink CDC YAML job with the Paimon sink pointing to DLF

  • Run examples for common ingestion scenarios: full database sync, partitioned tables, append-only tables, and Kafka sources

Prerequisites

Before you begin, make sure you have:

  • A Realtime Compute for Apache Flink workspace. See Create a workspace.

  • The Realtime Compute for Apache Flink workspace and the destination DLF catalog in the same region.

  • The VPC of your Realtime Compute for Apache Flink workspace added to DLF's VPC whitelist. See Configure a VPC whitelist.

Engine requirements

Your Realtime Compute for Apache Flink job must run Ververica Runtime (VVR) version 11.1.0 or later.

Create a DLF catalog

See Get started with DLF.

Create and configure a data ingestion job

  1. Create a YAML draft to ingest data using Flink CDC. For more information, see Develop Flink CDC jobs for data ingestion (Beta).

  2. Configure the sink module:

    sink:
      type: paimon
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf
      # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
      commit.user: your_job_name
      # (Optional) Enable deletion vectors to improve read performance.
      table.properties.deletion-vectors.enabled: true

    Replace the placeholder values with your actual values:

    catalog.properties.metastore
      Description: The metastore type. Set it to rest.
      Required: Yes
      Example value: rest

    catalog.properties.token.provider
      Description: The token provider. Set it to dlf.
      Required: Yes
      Example value: dlf

    catalog.properties.uri
      Description: The URI to access the DLF REST catalog server. Format: http://[region-id]-vpc.dlf.aliyuncs.com. For region IDs, see Regions and endpoints.
      Required: Yes
      Example value: http://ap-southeast-1-vpc.dlf.aliyuncs.com

    catalog.properties.warehouse
      Description: The name of the Paimon catalog.
      Required: Yes
      Example value: dlf_test

    commit.user
      Description: The commit user for data writes. Assign unique commit users to distinct jobs to avoid conflicts.
      Required: No
      Default: admin
      Example value: your_job_name

    table.properties.deletion-vectors.enabled
      Description: Enables deletion vectors to improve read performance with minimal impact on writes.
      Required: No
      Example value: true
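
    For example, a fully configured sink module for a workspace in the Singapore region, using the example values from the table above, might look like the following sketch (the commit user mysql_to_dlf_sync is a hypothetical job name; dlf_test stands in for your own catalog name):

    sink:
      type: paimon
      catalog.properties.metastore: rest
      catalog.properties.uri: http://ap-southeast-1-vpc.dlf.aliyuncs.com
      catalog.properties.warehouse: dlf_test
      catalog.properties.token.provider: dlf
      # Hypothetical commit user; use a value unique to this job.
      commit.user: mysql_to_dlf_sync
      table.properties.deletion-vectors.enabled: true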

Usage notes

Before running a job, be aware of the following constraints:

  • Commit user conflicts: The default commit user is admin. Running concurrent data write jobs to the same table under the same commit user causes commit conflicts and data inconsistencies. Assign a unique commit.user to each job; see the sketch after this list.

  • No compaction or bucket options: DLF provides automatic file compaction. Do not configure compaction or bucket options such as bucket and num-sorted-run.compaction-trigger.

  • Deletion vectors: Set table.properties.deletion-vectors.enabled: true to greatly speed up reads with minimal impact on writes.
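
A minimal sketch of the commit.user rule, assuming two separate YAML drafts (the names job_a_backfill and job_b_streaming are hypothetical) whose sinks write to tables in the same catalog:

# Sink module of the first job's YAML draft.
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # Unique commit user for this job (hypothetical name).
  commit.user: job_a_backfill

# Sink module of the second job's YAML draft.
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # Must differ from job_a_backfill so that both jobs can write
  # to the same table without commit conflicts.
  commit.user: job_b_streaming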

Examples

Ingest data from an entire MySQL database to DLF

The following job syncs all tables in a MySQL database to DLF:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from tables created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize unbounded shards to prevent potential TaskManager out-of-memory (OOM) errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize data only from matched tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

The recommended MySQL source options are:

  • scan.binlog.newly-added-table.enabled: Syncs data from tables created during the incremental phase.

  • include-comments.enabled: Syncs table and field comments.

  • scan.incremental.snapshot.unbounded-chunk-first.enabled: Prevents potential TaskManager OOM errors by prioritizing unbounded shards.

  • scan.only.deserialize.captured.tables.changelog.enabled: Deserializes data only from matched tables to speed up reads.

Ingest data to a partitioned table

To ingest from a non-partitioned source table to a partitioned table in DLF, add the partition-keys option to the transform module. For more information, see Data ingestion with Flink CDC.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from tables created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize unbounded shards to prevent potential TaskManager out-of-memory (OOM) errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize data only from matched tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

Ingest data to an append-only table

To implement logical deletes (soft deletes) during ingestion, use converter-after-transform: SOFT_DELETE in the transform module. This converts delete operations into insert operations, so the downstream table keeps a complete record of all change operations. The __data_event_type__ metadata field in the projection writes the change type to the downstream table as a new column.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from tables created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize unbounded shards to prevent potential TaskManager out-of-memory (OOM) errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize data only from matched tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    partition-keys: id,pt
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    partition-keys: id,pt
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
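
To illustrate with a hypothetical row, and assuming __data_event_type__ produces changelog-style strings such as +I for inserts and -D for deletes: deleting the row (id=1, pt='20240101') from mysql_test.tbl1 does not remove it from the destination table. Instead, a new record (id=1, pt='20240101', op_type='-D') is appended, preserving the full change history.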

For more information, see Data ingestion with Flink CDC.

Sync data from Kafka to DLF in real time

The following job reads CDC data from the Kafka inventory topic (tables customers and products in Debezium JSON format) and syncs it to the corresponding destination tables in DLF. Because Debezium JSON messages do not contain primary key information, the primary key is explicitly specified in the transform module.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different commit users for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

# Debezium JSON does not contain primary key info, so explicitly specify primary keys.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
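
For reference, a simplified Debezium JSON change message on the inventory topic might look like the following (a hypothetical record with illustrative values). The envelope carries the before and after row images and the operation type, but no primary-key metadata, which is why primary-keys must be specified in the transform module:

{
  "before": null,
  "after": {
    "id": 1001,
    "name": "scooter",
    "weight": 3.14
  },
  "source": {
    "db": "inventory",
    "table": "products"
  },
  "op": "c"
}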

Additional notes for Kafka sources:

  • The Kafka source supports three data formats: canal-json, debezium-json (default), and json.

  • When ingesting from multiple Kafka partitions to a single table in DLF, set debezium-json.distributed-tables or canal-json.distributed-tables to true, as in the sketch after this list.

  • The Kafka source supports multiple schema inference policies via the schema.inference.strategy option. For more information, see Message Queue for Apache Kafka.
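
For example, a source module that reads Canal JSON data spread across multiple Kafka partitions might look like the following sketch (the topic name orders is hypothetical):

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: orders
  scan.startup.mode: earliest-offset
  value.format: canal-json
  # Required when change records for one table span multiple partitions.
  canal-json.distributed-tables: true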

For more information, see Data ingestion with Flink CDC.