Realtime Compute for Apache Flink: Ingest data into a DLF data lake in real time using Flink CDC

Last Updated: Mar 26, 2026

Use Flink CDC to write real-time change data from MySQL or Kafka into Data Lake Formation (DLF).

DLF is a fully managed platform for unified metadata, data storage, and data management, including permission management and storage optimization. Data ingestion jobs write to DLF through its Paimon Catalog, which lets you ingest entire large-scale databases into the data lake in real time. For more information about DLF, see What is Data Lake Formation?

Prerequisites

Before you begin, ensure that you have:

  • A DLF instance with the Paimon Catalog enabled

  • The URI of your DLF Rest Catalog Server (format: http://<region-id>-vpc.dlf.aliyuncs.com, for example, http://cn-hangzhou-vpc.dlf.aliyuncs.com)

  • The name of your DLF Catalog warehouse

DLF sink configuration

All examples in this topic use the following Paimon sink parameters to connect to DLF.

  • catalog.properties.metastore (required): Metastore type. Set to rest.

  • catalog.properties.token.provider (required): Token provider. Set to dlf.

  • catalog.properties.uri (required): URI of the DLF Rest Catalog Server. Format: http://<region-id>-vpc.dlf.aliyuncs.com.

  • catalog.properties.warehouse (required): Name of the DLF Catalog.

  • table.properties.deletion-vectors.enabled (optional): Set to true to enable deletion vectors, which significantly improve read performance with minimal impact on write and update performance, enabling near-real-time updates and fast queries.
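
As a minimal sketch, the parameters above map onto a sink block as follows. The cn-hangzhou region comes from the URI example in Prerequisites; the warehouse name my_dlf_catalog is a hypothetical placeholder, so substitute the values for your own DLF instance.

```yaml
sink:
  type: paimon
  # Required: use the REST metastore.
  catalog.properties.metastore: rest
  # Required: obtain access tokens from DLF.
  catalog.properties.token.provider: dlf
  # Required: DLF Rest Catalog Server URI (cn-hangzhou is only an example region).
  catalog.properties.uri: http://cn-hangzhou-vpc.dlf.aliyuncs.com
  # Required: hypothetical catalog name; replace with your own.
  catalog.properties.warehouse: my_dlf_catalog
  # Optional: enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
```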

Usage notes

  • Do not add compaction (file merging) or bucket-related parameters, such as bucket and num-sorted-run.compaction-trigger, to the sink configuration. DLF manages compaction automatically, and setting these parameters causes conflicts.

  • For MySQL source parameters, see MySQL.

Synchronize an entire MySQL database to DLF

The following CDC YAML job synchronizes an entire MySQL database to DLF. The source section captures every table in the mysql_test database (tables: mysql_test.\.*), and the sink section uses DLF's Paimon Catalog as the destination.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded chunks to prevent potential TaskManager OutOfMemory issues.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize binlog events only for captured tables to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  # The Metastore type. Set to rest.
  catalog.properties.metastore: rest
  # The token provider. Set to dlf.
  catalog.properties.token.provider: dlf
  # The URI to access the DLF Rest Catalog Server.
  catalog.properties.uri: dlf_uri
  # The name of the DLF Catalog.
  catalog.properties.warehouse: your_warehouse
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
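
To capture only some tables rather than the whole database, the tables option of the MySQL source also accepts a comma-separated list of table names or regular expressions. A minimal sketch, assuming two hypothetical tables named orders and customers in mysql_test; only the tables line differs from the source block above, and the optional settings are left out for brevity.

```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  # Capture only these two tables (hypothetical names) instead of mysql_test.\.*
  tables: mysql_test.orders,mysql_test.customers
  server-id: 8601-8604
```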

Write to partitioned tables in DLF

Source tables in data ingestion jobs don't include partition field information. To write to a partitioned table in DLF, add a transform block and specify partition-keys for each source table. For details on partition-keys syntax, see Flink CDC Data Ingestion Job Development Reference.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded chunks to prevent potential TaskManager OutOfMemory issues.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize binlog events only for captured tables to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # Set partition fields.
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt
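
If every table in the database shares the same partition column, a single rule whose source-table is a regular expression (the same pattern syntax used by the tables option) could replace the per-table rules. A sketch, assuming every table in mysql_test has a column named pt and that partitioning by pt alone is what you want; keep per-table rules when the tables' schemas differ.

```yaml
transform:
  # Assumes every table matched by mysql_test.\.* has a pt column.
  - source-table: mysql_test.\.*
    partition-keys: pt
```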

Write to append-only tables in DLF

Source tables contain full CDC change data, including delete operations. To write all changes to an append-only table in DLF, add __data_event_type__ to the projection and set converter-after-transform to SOFT_DELETE in the transform block. The converter turns delete operations into insert operations (soft delete), so every change event, including deletes, is recorded in the destination table as an inserted row, and the projected column (op_type in the example below) records the original change type.

For more information, see Flink CDC Data Ingestion Job Development Reference.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created in the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded chunks to prevent potential TaskManager OutOfMemory issues.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize binlog events only for captured tables to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # Set partition fields.
    partition-keys: id,pt
    # Add the change type as a new field, and convert deletes to inserts.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    partition-keys: id,pt
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE

Synchronize Kafka CDC data to DLF in real time

If your MySQL data is already being written to Kafka, for example by a real-time distribution job, configure a CDC YAML job that reads from Kafka and writes to DLF.

The following example reads from the inventory Kafka topic, which stores change data for the customers and products tables in Debezium JSON format, and synchronizes each table to its corresponding destination table in DLF. Because debezium-json messages don't carry primary key information, the transform block manually adds id as the primary key.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

# debezium-json does not include primary key information. Add primary keys using a transform rule.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

Kafka source notes:

  • Supported formats: canal-json, debezium-json (default), and json.

  • Schema evolution: If a Kafka message schema changes (for example, a new field is added), the change is automatically synchronized to the Paimon table schema.

  • Primary keys with debezium-json: debezium-json messages don't record primary key information. Add primary keys manually with a transform rule that sets primary-keys, as in the example above.

  • Distributed tables: If a single table's data is spread across multiple Kafka partitions, or if you want to merge sharded tables whose data is stored in different partitions, set debezium-json.distributed-tables or canal-json.distributed-tables to true.

  • Schema inference policies: The Kafka source supports multiple schema inference policies, configured with schema.inference.strategy. For details, see Message Queue for Kafka.
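
For topics that carry Canal JSON instead of Debezium JSON, the source block changes only in its format options. A minimal sketch, assuming the inventory topic holds canal-json messages and that one table's data may span multiple partitions; the sink block stays the same as in the Debezium example.

```yaml
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  # Read Canal JSON change messages instead of Debezium JSON.
  value.format: canal-json
  # Needed when one table's data is spread across multiple partitions.
  canal-json.distributed-tables: true
```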