本文為您介紹在源端表結構變更、資料邏輯增強(注入中繼資料/計算資料行/邏輯刪除)、異構路由(分表合并、整庫同步)及精準管控(表過濾、時間戳記啟動)等複雜業務情境下使用Flink CDC資料攝入作業的最佳實務。
同步新增表
在Flink CDC資料攝入作業中,支援兩種情況下的新增表:
增量空表HotSync:新增表無歷史資料(僅包含後續變更),作業無需重啟即可動態捕獲。
帶歷史資料表同步:新增表已存在歷史資料,需全量+增量同步處理且重啟作業生效
HotSync新增空表,不存在歷史資料
Flink CDC作業通過開啟scan.binlog.newly-added-table.enabled參數,可在不重啟的情況下即時同步增量階段新建立的無歷史資料空表。建議配置,無需重啟作業。
當Flink CDC資料攝入作業正在同步MySQL dlf_test庫中的所有表時,源庫建立了無歷史資料的空表products,只需在作業中啟用scan.binlog.newly-added-table.enabled: true參數配置,即可實現不重啟作業同步新增表。請參考如下配置:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
#(可選)同步增量階段新建立的表的資料
scan.binlog.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true這樣配置的CDC YAML作業運行後會自動在目標端建立dlf_test資料庫下的全部新增表。
開啟scan.newly-added-table.enabled參數,僅在scan.startup.mode為initial(預設)時生效。
帶歷史資料表同步
假設MySQL資料庫中已存在表customers和products,但是啟動時只需要同步customers表,作業配置如下:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.customers
server-id: 8601-8604
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true作業運行一段時間後,如果需要額外同步該資料庫下全部的表和歷史資料,需要重啟作業,按照如下步驟操作:
保留Savepoint停止作業。
修改MySQL資料來源tables配置為需要匹配的表,同時MySQL資料來源去掉
scan.binlog.newly-added-table.enabled參數並開啟scan.newly-added-table.enabled。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
#(可選)同步新增表的全量和增量資料
scan.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true從保留的Savepoint重啟作業。
不支援同時開啟scan.binlog.newly-added-table.enabled和scan.newly-added-table.enabled。
排除指定表
在Flink CDC資料攝入作業中,支援對某些表進行過濾以避免在下遊對這些表進行建立和資料同步。
例如MySQL資料庫dlf_test中有customers和products等多張表,但是您希望排除名為products_tmp的表,作業需要如下配置:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
#(可選)排除某些不希望同步的表
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
#(可選)同步增量階段新建立的表的資料
scan.binlog.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true啟用此配置的Flink CDC資料攝入作業將自動在目標端動態建立dlf_test庫下所有表(排除products_tmp表),即時保持表結構與資料同步。
tables.exclude參數支援通過正則匹配多張表。如果 tables.exclude排除的表與tables需要同步的表存在重疊,這些重疊的表將被排除,最終不會被同步。請確保兩個參數配置的表集的交集被正確處理。
中繼資料列與計算資料行增強
添加中繼資料列
寫入資料時,可以使用transform模組添加中繼資料列到資料中。例如如下的作業配置,可以將表名、操作時間和類型寫入到下遊表資料中,詳情請見Transform模組。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
#(可選)同步新增表的全量和增量資料
scan.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
# 將操作時間作為中繼資料
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
#(可選)修改主鍵
primary-keys: id,identifier
description: add identifier, op_ts and op
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true使用MySQL作為Source時,需要添加metadata-column.include-list: op_ts才會將操作時間作為中繼資料發送到下遊。詳情請參見MySQL。
資料攝入作業的源表包含完整的變更類型,如果希望寫入的下遊表為將刪除操作轉化為插入操作實現邏輯刪除的功能,您可以在transform模組中添加converter-after-transform: SOFT_DELETE配置以實現該需求。
添加計算資料行
寫入資料時,可以使用transform模組添加計算資料行到資料中。例如如下的作業配置,通過對created_at欄位進行轉換產生dt欄位,並作為下遊表的分區欄位。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
#(可選)同步新增表的全量和增量資料
scan.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
# 將操作時間作為中繼資料
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
#(可選)設定分區欄位
partition-keys: dt
description: add dt
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true使用MySQL作為Source時,需要添加metadata-column.include-list: op_ts才會將操作時間作為中繼資料發送到下遊。詳情請參見MySQL。
表名映射
在將上遊表同步到下遊表的過程中,您可能還有使用route模組對錶名進行替換的需求,下面兩種列出了對錶名進行替換的典型情境和Flink CDC資料攝入作業配置案例。
分庫分表合并
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
#(可選)同步新增表的全量和增量資料
scan.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# 將dlf_test庫下所有以product_開頭和數字結尾作為表名的表合并到dlf.products中
- source-table: dlf_test.product_[0-9]+
sink-table: dlf.products
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true整庫同步
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
#(可選)同步新增表的全量和增量資料
scan.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# 統一修改表名,將dlf_test庫下所有表同步到dlf庫中以ods_加源表表名作為表名的表中
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true構建複雜業務綜合案例
下面的Flink CDC資料攝入作業展示了一個綜合使用上述功能構建複雜業務的案例,您可以基於此代碼進行適當調整實現業務需求。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
#(可選)排除某些不希望同步的表
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
#(可選)同步新增表的全量和增量資料
scan.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
# 將操作時間作為中繼資料
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
#(可選)修改主鍵
primary-keys: id,identifier
#(可選)設定分區欄位
partition-keys: dt
#(可選)將刪除的資料會轉化為插入
converter-after-transform: SOFT_DELETE
route:
# 將dlf_test庫下所有表同步到dlf庫中以ods_加源表表名作為表名的表中
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true指定時間戳記啟動
在無狀態啟動Flink CDC資料攝入作業時,支援指定資料來源的開始時間,協助您從指定的Binlog位置恢複資料的讀取。
營運頁面配置
在作業營運頁面,選擇作業無狀態啟動時可以指定源表開始時間。

作業參數配置
在作業草稿中,可以通過配置參數指定源表開始時間。
以MySQL Source為例,您可以在作業配置中設定scan.startup.mode: timestamp以指定源表開始時間,作業配置樣本如下:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
#(可選)以指定源表開始時間模式啟動
scan.startup.mode: timestamp
# 在時間戳記啟動模式下指定啟動時間戳記
scan.startup.timestamp-millis: 1667232000000
#(可選)同步增量階段新建立的表的資料
scan.binlog.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true指定源表啟動時間優先順序:營運介面配置 > 作業參數配置。