全部產品
Search
文件中心

Realtime Compute for Apache Flink:資料庫資料即時入湖

更新時間:Feb 06, 2026

本文為您介紹使用資料攝入CDC YAML作業將即時資料寫入阿里雲資料湖構建的最佳實務。

阿里雲資料湖構建(Data Lake Formation,簡稱DLF)是一款全託管的統一中繼資料和資料存放區及管理平台,為客戶提供中繼資料管理、許可權管理和儲存最佳化等功能。詳情請參見什麼是資料湖構建

資料攝入作業支援使用DLF的Paimon Catalog作為目標端,您可以使用資料攝入作業完成大規模整庫資料入湖需求。

MySQL整庫同步資料湖DLF

MySQL整庫同步資料到DLF的CDC YAML作業如下所示:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可選)同步增量階段新建立的表的資料
  scan.binlog.newly-added-table.enabled: true
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  # Metastore類型,固定為rest
  catalog.properties.metastore: rest
  # Token提供方,固定為dlf
  catalog.properties.token.provider: dlf
  # 訪問DLF Rest Catalog Server的URI,格式為http://[region-id]-vpc.dlf.aliyuncs.com,如http://cn-hangzhou-vpc.dlf.aliyuncs.com
  catalog.properties.uri: dlf_uri
  # DLF Catalog名稱。
  catalog.properties.warehouse: your_warehouse
  #(可選)開啟刪除向量,提升讀取效能
  table.properties.deletion-vectors.enabled: true
說明
  • 【MySQL Source配置】更多參數詳情請參見MySQL

  • 在建表參數中添加deletion-vectors.enabled的配置,在不損失太大寫入更新效能的同時,獲得極大的讀取效能提升,達到近即時更新與極速查詢的效果

  • 在DLF中已經提供了自動進行檔案合并的功能,不建議在建表參數中添加檔案合并和bucket相關參數,例如bucket、num-sorted-run.compaction-trigger等。

寫入資料湖DLF分區表

資料攝入作業的源表通常不包含分區欄位資訊,如果希望寫入的下遊表為分區表,您需要通過Flink CDC資料攝入作業開發參考中的partition-keysFlink CDC資料攝入作業開發參考設定分區欄位,配置樣本如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可選)同步增量階段新建立的表的資料
  scan.binlog.newly-added-table.enabled: true
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可選)開啟刪除向量,提升讀取效能
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    #(可選)設定分區欄位  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

寫入資料湖DLF Append Only表

資料攝入作業的源表包含完整的變更類型,如果希望寫入的下遊表為將刪除操作轉化為插入操作實現邏輯刪除的功能,您可以通過Flink CDC資料攝入作業開發參考實現該需求,配置樣本如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可選)同步增量階段新建立的表的資料
  scan.binlog.newly-added-table.enabled: true
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可選)開啟刪除向量,提升讀取效能
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    #(可選)設定分區欄位
    partition-keys: id,pt
    #(可選)實現虛刪除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    #(可選)設定分區欄位
    partition-keys: id,pt
    #(可選)實現虛刪除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
說明
  • 通過在projection中添加__data_event_type,將變更類型作為新增欄位寫入到下遊表中。同時設定converter-after-transform為SOFT_DELETE,可以將刪除操作轉化為插入操作,使得下遊能夠完整記錄全部變更操作。詳見Flink CDC資料攝入作業開發參考

Kafka CDC資料即時同步到資料湖DLF

實現即時分發能夠將MySQL資料即時同步至Kafka,在此基礎上,您可以配置CDC YAML作業同步Kafka資料到DLF儲存。

假設Kafka的inventory Topic中儲存了兩張表(customers和products)的資料,且資料格式為Debezium JSON。以下樣本作業可將這兩張表的資料分別同步到DLF對應的目標表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可選)開啟刪除向量,提升讀取效能
  table.properties.deletion-vectors.enabled: true

# debezium-json不包含主鍵資訊,需要另外為表添加主鍵  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
說明
  • Kafka資料來源讀取的格式支援canal-json、debezium-json(預設)和json格式。

  • 支援 Kafka 訊息格式變更時,自動同步到 Paimon 表結構(如新增欄位),提升資料同步靈活性。

  • 當資料格式為debezium-json時,由於debezium-json訊息不記錄主鍵資訊,需要通過transform規則手動為表添加主鍵:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 當單表的資料分布在多個分區中,或資料位元於不同分區中的表需要進行分庫分表合并時,需要將配置項debezium-json.distributed-tablescanal-json.distributed-tables設為true。

  • kafka資料來源支援多種Schema推導策略,可以通過配置項schema.inference.strategy設定,Schema推導和變更同步策略詳情請參見訊息佇列Kafka