全部產品
Search
文件中心

Realtime Compute for Apache Flink:即時資料入倉

更新時間:Aug 08, 2025

本文為您介紹使用資料攝入CDC YAML作業將即時資料寫入常用資料倉儲的最佳實務。

Flink CDC即時同步至Hologres數倉

使用資料攝入CDC YAML同步資料到Hologres搭建即時數倉,可以充分利用Flink強大的即時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴充的即時資料處理和分析。

MySQL即時同步至Hologres數倉

最基本的MySQL整庫同步Hologres的資料攝入CDC YAML作業如下所示:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true  

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
說明
  • 建議在作業最開始啟動時就在MySQL Source中設定include-comments.enabled以同步表注釋和欄位注釋。詳情請參見MySQL

  • 建議在作業最開始啟動時就在MySQL Source中設定scan.incremental.snapshot.unbounded-chunk-first.enabled以避免可能出現的TaskManager OutOfMemory問題。詳情請參見MySQL

使用更寬容的類型映射

Hologres連接器無法處理列類型變更事件,但支援了多種類型映射關係。為了更好地支援資料來源的變更,您可以通過將多個MySQL資料類型映射到更寬的Hologres類型,跳過不必要的類型變更事件,從而讓作業正常運行。您可以通過配置項sink.type-normalize-strategy變更,預設值為STANDARD,詳情請見資料攝入YAML作業Hologres連接器類型映射

例如,可以使用ONLY_BIGINT_OR_TEXT讓類型只對應到Hologres的int8和text類型。此時如果MySQL某個列的類型從INT改為BIGINT,Hologres將這兩種MySQL類型對應到int8類型,作業不會因為無法處理類型轉換而報錯。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  #(可選)同步表注釋和欄位注釋
  include-comments.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可選)開啟解析過濾,加速讀取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

分區表寫入

資料攝入YAML在使用Hologres連接器作為目標端時支援寫入分區表,詳情請參見分區表寫入

Kafka即時同步至Hologres數倉

實現即時分發為您提供了在Kafka中儲存MySQL資料的方案。進一步的,資料攝入YAML支援同步Kafka資料到Hologres搭建即時數倉。

假設Kafka中名為inventory的topic中存有debezium-json格式的兩張表customers和products的資料,下面的作業可以將兩張表的資料分別同步到Hologres對應的兩張表中。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
說明
  • Kafka資料來源讀取的格式支援json、canal-json和debezium-json(預設)。

  • 當資料格式為debezium-json時,需要通過transform規則手動為表添加主鍵:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 當單表的資料分布在多個分區中,或資料位元於不同分區中的表需要進行分庫分表合并時,需要將配置項debezium-json.distributed-tablescanal-json.distributed-tables設為true。

  • kafka資料來源支援多種Schema推導策略,可以通過配置項schema.inference.strategy設定,Schema推導和變更同步策略詳情請參見訊息佇列Kafka