本文為您介紹如何在阿里雲Realtime ComputeFlink版上實現Flink CDC以Iceberg REST與DLF Catalog對接。
前提條件
已建立Flink全託管工作空間。如未建立,詳情請參見開通Realtime ComputeFlink版。
請確保Flink工作空間與DLF位於同一地區下,且添加Flink所在VPC為白名單,請參見配置VPC白名單。
使用限制
僅Realtime Compute引擎VVR 11.6.0及以上版本支援以 Iceberg REST對接DLF Catalog。
建立Catalog
這裡僅用於建立與DLF Catalog的映射串連,建立或刪除Catalog僅影響映射關係,不會對DLF中的實際資料產生影響。
通過Iceberg REST 在DLF Catalog建立的表均為Iceberg表。
單擊目標工作空間操作列下的控制台,進入對應的工作空間。
在左側導覽列,單擊。
在資料查詢文本編輯地區,輸入以下SQL,單擊頁面右下角的執行環境,選擇對應版本的Session叢集(VVR 11.2.0及以上引擎版本),執行SQL,即可以Iceberg REST建立DLF Catalog。
CREATE CATALOG `catalog_name` WITH ( 'type' = 'iceberg', 'catalog-type' = 'rest', 'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg', 'warehouse' = 'iceberg_test', 'rest.signing-region' = 'cn-hangzhou', 'io-impl' = 'org.apache.iceberg.rest.DlfFileIO' );參數說明如下:
參數
描述
是否必填
樣本
type
類型,固定為iceberg。從自訂Jar自動解析,請勿更改。
是
iceberg
catalog-type
Catalog類型,固定為rest。
是
rest
token.provider
Token提供方,固定為dlf。
是
dlf
uri
訪問DLF Rest Catalog的URI,詳見Iceberg REST。
是
http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg
warehouse
DLF Catalog名稱。
是
iceberg_test
rest.signing-region
DLF的Region ID,詳見服務存取點。
是
cn-hangzhou
io-impl
固定值:org.apache.iceberg.rest.DlfFileIO
是
org.apache.iceberg.rest.DlfFileIO
Flink CDC對接Catalog配置參數
建立資料攝入作業的操作流程,請參見Flink CDC資料攝入作業開發。
在預先建立好 Flink Catalog映射的前提下,參考複用已有Catalog擷取串連資訊中複用已有Catalog擷取串連資訊的功能,Flink中資料攝入作業的Sink使用以下配置:
sink:
type: iceberg
using.built-in-catalog: catalog_name配置樣本
下面為您介紹幾種典型的通過Flink CDC YAML作業將資料同步到資料湖DLF的配置方案:
MySQL整庫同步資料湖DLF
MySQL整庫同步資料到DLF的CDC YAML作業如下所示:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可選)同步增量階段新建立的表的資料
scan.binlog.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: iceberg
using.built-in-catalog: catalog_name【MySQL Source配置】,建議設定下列配置項,詳情請參見MySQL。
參數:scan.binlog.newly-added-table.enabled
作用:同步增量階段新建立的表的資料。
參數:include-comments.enabled
作用:同步表注釋和欄位注釋。
參數:scan.incremental.snapshot.unbounded-chunk-first.enabled
作用:避免可能出現的TaskManager OutOfMemory問題。
參數:scan.only.deserialize.captured.tables.changelog.enabled: true
作用:僅對作業匹配的表的資料進行解析,加速讀取。
寫入資料湖DLF分區表
資料攝入作業的源表通常不包含分區欄位資訊,如果希望寫入的下遊表為分區表,您需要通過Flink CDC資料攝入作業開發參考中的partition-keys設定分區欄位,配置樣本如下:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可選)同步增量階段新建立的表的資料
scan.binlog.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: iceberg
using.built-in-catalog: catalog_name
transform:
- source-table: mysql_test.tbl1
#(可選)設定分區欄位
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,pt寫入資料湖DLF Append Only表
資料攝入作業的源表包含完整的變更類型,如果希望寫入的下遊表為將刪除操作轉化為插入操作實現邏輯刪除的功能,您可以通過Flink CDC資料攝入作業開發參考實現該需求,配置樣本如下:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可選)同步增量階段新建立的表的資料
scan.binlog.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: iceberg
using.built-in-catalog: catalog_name
transform:
- source-table: mysql_test.tbl1
#(可選)設定分區欄位
partition-keys: id,pt
#(可選)實現虛刪除
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
#(可選)設定分區欄位
partition-keys: id,pt
#(可選)實現虛刪除
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE通過在projection中添加__data_event_type,將變更類型作為新增欄位寫入到下遊表中。同時設定converter-after-transform為SOFT_DELETE,可以將刪除操作轉化為插入操作,使得下遊能夠完整記錄全部變更操作。詳見Flink CDC資料攝入作業開發參考。
Kafka CDC資料即時同步到資料湖DLF
假設Kafka的inventory Topic中儲存了兩張表(customers和products)的變更資料,且資料格式為Debezium JSON。以下樣本作業可將這兩張表的資料分別同步到DLF對應的目標表:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: iceberg
using.built-in-catalog: catalog_name
# debezium-json不包含主鍵資訊,需要另外為表添加主鍵
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka資料來源讀取的格式支援canal-json、debezium-json(預設)和json格式。
當資料格式為debezium-json時,由於debezium-json訊息不記錄主鍵資訊,需要通過transform規則手動為表添加主鍵:
transform: - source-table: \.*.\.* projection: \* primary-keys: id當單表的資料分布在多個分區中,或資料位元於不同分區中的表需要進行分庫分表合并時,需要將配置項debezium-json.distributed-tables或canal-json.distributed-tables設為true。
kafka資料來源支援多種Schema推導策略,可以通過配置項schema.inference.strategy設定,Schema推導和變更同步策略詳情請參見訊息佇列Kafka。
Kafka 日誌資料即時同步到資料湖DLF
如果您的Kafka叢集中儲存的是自訂的JSON格式的資料,您可以配置CDC YAML作業同步Kafka資料到DLF儲存,我們會為您提供自動資料類型推導、表結構推導和表結構演化的支援。
假設Kafka的inventory Topic中儲存了一張日誌表的資料,且資料格式為JSON。以下樣本作業可將這張表的資料同步到DLF對應的目標表:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: json
# (可選)遞迴式地展開JSON中的嵌套列
json.infer-schema.flatten-nested-columns.enable: true
# (可選)跳過前 100 次出現的解析異常;若超過 100 次則作業失敗。
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100
sink:
type: iceberg
using.built-in-catalog: catalog_name
# 為表添加主鍵資訊
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# 將 inventory topic 中所有的資料都寫入到 test_database.inventory 表中
route:
- source-table: inventory
sink-table: test_database.inventory
pipeline:
# (可選)將會導致處理異常的髒資料記錄到日誌中
dirty-data.collector:
name: Logger Dirty Data Collector
type: logger假設Kafka的inventory Topic中儲存了多張日誌表的資料,資料格式為JSON,且在JSON內容中的databaseName、tableName欄位中提供了庫名、表名資訊。以下樣本作業可將這個Topic中的多張表的資料同步到DLF對應的目標表:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: json
# (可選)遞迴式地展開JSON中的嵌套列
json.infer-schema.flatten-nested-columns.enable: true
# 使用 databaseName 欄位中的值作為庫名,使用 tableName 欄位中的值作為表名
json.decode.parser-table-id.fields: databaseName,tableName
# (可選)跳過前 100 次出現的解析異常;若超過 100 次則作業失敗。
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100
sink:
type: iceberg
using.built-in-catalog: catalog_name
# 為表添加主鍵資訊
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# 將 ods.inventory、ods.customer,ods.user 中的資料分別寫入到 test_database.inventory,test_database.customer,test_database.user 表中
route:
- source-table: ods.inventory
sink-table: test_database.inventory
- source-table: ods.customer
sink-table: test_database.customer
- source-table: ods.user
sink-table: test_database.user
pipeline:
# (可選)將會導致處理異常的髒資料記錄到日誌中
dirty-data.collector:
name: Logger Dirty Data Collector
type: logger如果您希望瞭解更多JSON格式的Kafka源表的表結構推導與演化策略,可以參考表結構解析和變更同步策略說明。
如果您希望添加進行更精細的作業配置,可以查詢Flink CDC資料攝入作業開發參考。