全部產品
Search
文件中心

Data Lake Formation:Flink CDC以Iceberg REST訪問DLF Catalog

更新時間:May 28, 2026

本文為您介紹如何在阿里雲Realtime ComputeFlink版上實現Flink CDC以Iceberg REST與DLF Catalog對接。

前提條件

使用限制

僅Realtime Compute引擎VVR 11.6.0及以上版本支援以 Iceberg REST對接DLF Catalog。

建立Catalog

說明
  • 這裡僅用於建立與DLF Catalog的映射串連,建立或刪除Catalog僅影響映射關係,不會對DLF中的實際資料產生影響

  • 通過Iceberg REST 在DLF Catalog建立的表均為Iceberg表

  1. 登入Realtime Compute控制台

  2. 單擊目標工作空間操作列下的控制台,進入對應的工作空間。

  3. 在左側導覽列,單擊資料開發 > 資料查詢

  4. 資料查詢文本編輯地區,輸入以下SQL,單擊頁面右下角的執行環境,選擇對應版本的Session叢集(VVR 11.2.0及以上引擎版本),執行SQL,即可以Iceberg REST建立DLF Catalog。

    CREATE CATALOG `catalog_name` 
     WITH (
        'type' = 'iceberg',
        'catalog-type' = 'rest',
        'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg',
        'warehouse' = 'iceberg_test',
        'rest.signing-region' = 'cn-hangzhou',
        'io-impl' = 'org.apache.iceberg.rest.DlfFileIO'
    );

    參數說明如下:

    參數

    描述

    是否必填

    樣本

    type

    類型,固定為iceberg。從自訂Jar自動解析,請勿更改。

    iceberg

    catalog-type

    Catalog類型,固定為rest。

    rest

    token.provider

    Token提供方,固定為dlf。

    dlf

    uri

    訪問DLF Rest Catalog的URI,詳見Iceberg REST

    http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg

    warehouse

    DLF Catalog名稱。

    iceberg_test

    rest.signing-region

    DLF的Region ID,詳見服務存取點

    cn-hangzhou

    io-impl

    固定值:org.apache.iceberg.rest.DlfFileIO

    org.apache.iceberg.rest.DlfFileIO

Flink CDC對接Catalog配置參數

建立資料攝入作業的操作流程,請參見Flink CDC資料攝入作業開發

在預先建立好 Flink Catalog映射的前提下,參考複用已有Catalog擷取串連資訊中複用已有Catalog擷取串連資訊的功能,Flink中資料攝入作業的Sink使用以下配置:

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

配置樣本

下面為您介紹幾種典型的通過Flink CDC YAML作業將資料同步到資料湖DLF的配置方案:

MySQL整庫同步資料湖DLF

MySQL整庫同步資料到DLF的CDC YAML作業如下所示:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可選)同步增量階段新建立的表的資料
  scan.binlog.newly-added-table.enabled: true
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: iceberg
  using.built-in-catalog: catalog_name
說明

【MySQL Source配置】,建議設定下列配置項,詳情請參見MySQL

  1. 參數:scan.binlog.newly-added-table.enabled

    作用:同步增量階段新建立的表的資料。

  2. 參數:include-comments.enabled

    作用:同步表注釋和欄位注釋。

  3. 參數:scan.incremental.snapshot.unbounded-chunk-first.enabled

    作用:避免可能出現的TaskManager OutOfMemory問題。

  4. 參數:scan.only.deserialize.captured.tables.changelog.enabled: true

    作用:僅對作業匹配的表的資料進行解析,加速讀取。

寫入資料湖DLF分區表

資料攝入作業的源表通常不包含分區欄位資訊,如果希望寫入的下遊表為分區表,您需要通過Flink CDC資料攝入作業開發參考中的partition-keys設定分區欄位,配置樣本如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可選)同步增量階段新建立的表的資料
  scan.binlog.newly-added-table.enabled: true
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

transform:
  - source-table: mysql_test.tbl1
    #(可選)設定分區欄位  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

寫入資料湖DLF Append Only表

資料攝入作業的源表包含完整的變更類型,如果希望寫入的下遊表為將刪除操作轉化為插入操作實現邏輯刪除的功能,您可以通過Flink CDC資料攝入作業開發參考實現該需求,配置樣本如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可選)同步增量階段新建立的表的資料
  scan.binlog.newly-added-table.enabled: true
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: iceberg
  using.built-in-catalog: catalog_name
  
transform:
  - source-table: mysql_test.tbl1
    #(可選)設定分區欄位
    partition-keys: id,pt
    #(可選)實現虛刪除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    #(可選)設定分區欄位
    partition-keys: id,pt
    #(可選)實現虛刪除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
說明
  • 通過在projection中添加__data_event_type,將變更類型作為新增欄位寫入到下遊表中。同時設定converter-after-transform為SOFT_DELETE,可以將刪除操作轉化為插入操作,使得下遊能夠完整記錄全部變更操作。詳見Flink CDC資料攝入作業開發參考

Kafka CDC資料即時同步到資料湖DLF

假設Kafka的inventory Topic中儲存了兩張表(customers和products)的變更資料,且資料格式為Debezium JSON。以下樣本作業可將這兩張表的資料分別同步到DLF對應的目標表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

# debezium-json不包含主鍵資訊,需要另外為表添加主鍵  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
說明
  • Kafka資料來源讀取的格式支援canal-json、debezium-json(預設)和json格式。

  • 當資料格式為debezium-json時,由於debezium-json訊息不記錄主鍵資訊,需要通過transform規則手動為表添加主鍵:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 當單表的資料分布在多個分區中,或資料位元於不同分區中的表需要進行分庫分表合并時,需要將配置項debezium-json.distributed-tablescanal-json.distributed-tables設為true。

  • kafka資料來源支援多種Schema推導策略,可以通過配置項schema.inference.strategy設定,Schema推導和變更同步策略詳情請參見訊息佇列Kafka

Kafka 日誌資料即時同步到資料湖DLF

如果您的Kafka叢集中儲存的是自訂的JSON格式的資料,您可以配置CDC YAML作業同步Kafka資料到DLF儲存,我們會為您提供自動資料類型推導、表結構推導和表結構演化的支援。

假設Kafka的inventory Topic中儲存了一張日誌表的資料,且資料格式為JSON。以下樣本作業可將這張表的資料同步到DLF對應的目標表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: json
  # (可選)遞迴式地展開JSON中的嵌套列
  json.infer-schema.flatten-nested-columns.enable: true
  # (可選)跳過前 100 次出現的解析異常;若超過 100 次則作業失敗。
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

# 為表添加主鍵資訊 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# 將 inventory topic 中所有的資料都寫入到 test_database.inventory 表中
route:
  - source-table: inventory
    sink-table: test_database.inventory
    
pipeline:
  # (可選)將會導致處理異常的髒資料記錄到日誌中
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

假設Kafka的inventory Topic中儲存了多張日誌表的資料,資料格式為JSON,且在JSON內容中的databaseName、tableName欄位中提供了庫名、表名資訊。以下樣本作業可將這個Topic中的多張表的資料同步到DLF對應的目標表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: json
  # (可選)遞迴式地展開JSON中的嵌套列
  json.infer-schema.flatten-nested-columns.enable: true
  # 使用 databaseName 欄位中的值作為庫名,使用 tableName 欄位中的值作為表名
  json.decode.parser-table-id.fields: databaseName,tableName
  # (可選)跳過前 100 次出現的解析異常;若超過 100 次則作業失敗。
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

sink:
  type: iceberg
  using.built-in-catalog: catalog_name

# 為表添加主鍵資訊 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# 將 ods.inventory、ods.customer,ods.user 中的資料分別寫入到 test_database.inventory,test_database.customer,test_database.user 表中
route:
  - source-table: ods.inventory
    sink-table: test_database.inventory
  - source-table: ods.customer
    sink-table: test_database.customer
  - source-table: ods.user
    sink-table: test_database.user   
    
pipeline:
  # (可選)將會導致處理異常的髒資料記錄到日誌中
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

如果您希望瞭解更多JSON格式的Kafka源表的表結構推導與演化策略,可以參考表結構解析和變更同步策略說明。

如果您希望添加進行更精細的作業配置,可以查詢Flink CDC資料攝入作業開發參考