全部产品
Search
文档中心

数据湖构建:Flink CDC以Iceberg REST访问DLF Catalog

更新时间:May 27, 2026

本文为您介绍如何在阿里云实时计算Flink版上实现Flink CDC以Iceberg REST与DLF Catalog对接。

前提条件

  • 已创建Flink全托管工作空间。如未创建,详情请参见开通实时计算Flink版

  • 请确保Flink工作空间与DLF位于同一地域下,且添加Flink所在VPC为白名单,请参见配置VPC白名单

使用限制

仅实时计算引擎VVR 11.6.0及以上版本支持以 Iceberg REST对接DLF Catalog。

创建Catalog

说明
  • 这里仅用于建立与DLF Catalog的映射连接,创建或删除Catalog仅影响映射关系,不会对DLF中的实际数据产生影响

  • 通过Iceberg REST 在DLF Catalog创建的表均为Iceberg表

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台,进入对应的工作空间。

  3. 在左侧导航栏,单击数据开发 > 数据查询

  4. 数据查询文本编辑区域,输入以下SQL,单击页面右下角的执行环境,选择对应版本的Session集群(VVR 11.2.0及以上引擎版本),执行SQL,即可以Iceberg REST创建DLF Catalog。

    CREATE CATALOG `catalog_name` 
     WITH (
        'type' = 'iceberg',
        'catalog-type' = 'rest',
        'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg',
        'warehouse' = 'iceberg_test',
        'rest.signing-region' = 'cn-hangzhou',
        'io-impl' = 'org.apache.iceberg.rest.DlfFileIO'
    );

    参数说明如下:

    参数

    描述

    是否必填

    示例

    type

    类型,固定为iceberg。从自定义Jar自动解析,请勿更改。

    iceberg

    catalog-type

    Catalog类型,固定为rest。

    rest

    token.provider

    Token提供方,固定为dlf。

    dlf

    uri

    访问DLF Rest Catalog的URI,详见Iceberg REST

    http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg

    warehouse

    DLF Catalog名称。

    iceberg_test

    rest.signing-region

    DLF的Region ID,详见服务接入点

    cn-hangzhou

    io-impl

    固定值:org.apache.iceberg.rest.DlfFileIO

    org.apache.iceberg.rest.DlfFileIO

Flink CDC对接Catalog配置参数

创建数据摄入作业的操作流程,请参见Flink CDC数据摄入作业开发

在预先创建好 Flink Catalog映射的前提下,参考复用已有Catalog获取连接信息中复用已有Catalog获取连接信息的功能,Flink中数据摄入作业的Sink使用以下配置:

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

配置示例

下面为您介绍几种典型的通过Flink CDC YAML作业将数据同步到数据湖DLF的配置方案:

MySQL整库同步数据湖DLF

MySQL整库同步数据到DLF的CDC YAML作业如下所示:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: iceberg
  using.built-in-catalog: catalog_name
说明

【MySQL Source配置】,建议设置下列配置项,详情请参见MySQL

  1. 参数:scan.binlog.newly-added-table.enabled

    作用:同步增量阶段新创建的表的数据。

  2. 参数:include-comments.enabled

    作用:同步表注释和字段注释。

  3. 参数:scan.incremental.snapshot.unbounded-chunk-first.enabled

    作用:避免可能出现的TaskManager OutOfMemory问题。

  4. 参数:scan.only.deserialize.captured.tables.changelog.enabled: true

    作用:仅对作业匹配的表的数据进行解析,加速读取。

写入数据湖DLF分区表

数据摄入作业的源表通常不包含分区字段信息,如果希望写入的下游表为分区表,您需要通过Flink CDC数据摄入作业开发参考中的partition-keys设置分区字段,配置示例如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

transform:
  - source-table: mysql_test.tbl1
    #(可选)设置分区字段  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

写入数据湖DLF Append Only表

数据摄入作业的源表包含完整的变更类型,如果希望写入的下游表为将删除操作转化为插入操作实现逻辑删除的功能,您可以通过Flink CDC数据摄入作业开发参考实现该需求,配置示例如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: iceberg
  using.built-in-catalog: catalog_name
  
transform:
  - source-table: mysql_test.tbl1
    #(可选)设置分区字段
    partition-keys: id,pt
    #(可选)实现软删除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    #(可选)设置分区字段
    partition-keys: id,pt
    #(可选)实现软删除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
说明
  • 通过在projection中添加__data_event_type,将变更类型作为新增字段写入到下游表中。同时设置converter-after-transform为SOFT_DELETE,可以将删除操作转化为插入操作,使得下游能够完整记录全部变更操作。详见Flink CDC数据摄入作业开发参考

Kafka CDC数据实时同步到数据湖DLF

假设Kafka的inventory Topic中存储了两张表(customers和products)的变更数据,且数据格式为Debezium JSON。以下示例作业可将这两张表的数据分别同步到DLF对应的目标表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

# debezium-json不包含主键信息,需要另外为表添加主键  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
说明
  • Kafka数据源读取的格式支持canal-json、debezium-json(默认)和json格式。

  • 当数据格式为debezium-json时,由于debezium-json消息不记录主键信息,需要通过transform规则手动为表添加主键:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tablescanal-json.distributed-tables设为true。

  • kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka

Kafka 日志数据实时同步到数据湖DLF

如果您的Kafka集群中存储的是自定义的JSON格式的数据,您可以配置CDC YAML作业同步Kafka数据到DLF存储,我们会为您提供自动数据类型推导、表结构推导和表结构演进的支持。

假设Kafka的inventory Topic中存储了一张日志表的数据,且数据格式为JSON。以下示例作业可将这张表的数据同步到DLF对应的目标表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: json
  # (可选)递归式地展开JSON中的嵌套列
  json.infer-schema.flatten-nested-columns.enable: true
  # (可选)跳过前 100 次出现的解析异常;若超过 100 次则作业失败。
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

# 为表添加主键信息 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# 将 inventory topic 中所有的数据都写入到 test_database.inventory 表中
route:
  - source-table: inventory
    sink-table: test_database.inventory
    
pipeline:
  # (可选)将会导致处理异常的脏数据记录到日志中
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

假设Kafka的inventory Topic中存储了多张日志表的数据,数据格式为JSON,且在JSON内容中的databaseName、tableName字段中提供了库名、表名信息。以下示例作业可将这个Topic中的多张表的数据同步到DLF对应的目标表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: json
  # (可选)递归式地展开JSON中的嵌套列
  json.infer-schema.flatten-nested-columns.enable: true
  # 使用 databaseName 字段中的值作为库名,使用 tableName 字段中的值作为表名
  json.decode.parser-table-id.fields: databaseName,tableName
  # (可选)跳过前 100 次出现的解析异常;若超过 100 次则作业失败。
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

# 为表添加主键信息 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# 将 ods.inventory、ods.customer,ods.user 中的数据分别写入到 test_database.inventory,test_database.customer,test_database.user 表中
route:
  - source-table: ods.inventory
    sink-table: test_database.inventory
  - source-table: ods.customer
    sink-table: test_database.customer
  - source-table: ods.user
    sink-table: test_database.user   
    
pipeline:
  # (可选)将会导致处理异常的脏数据记录到日志中
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

如果您希望了解更多JSON格式的Kafka源表的表结构推导与演进策略,可以参考表结构解析和变更同步策略说明。

如果您希望添加进行更精细的作业配置,可以查询Flink CDC数据摄入作业开发参考