本文向您介紹了在Flink CDC資料攝入作業中與表結構變更(Schema Evolution)相關的配置。
表結構變更同步配置
Flink CDC資料攝入作業支援將資料來源的Schema變更同步到下遊表,例如建立表、添加列、重新命名列、更改列類型、刪除列和刪除表等。
下遊表可能不支援全部的Schema變更,您可以通過在Pipeline模組添加schema.change.behavior配置來修改Schema變更發生時下遊表的處理方式。
pipeline:
schema.change.behavior: EVOLVE目前,架構只支援同步以下表結構變更類型。未列出的表結構變更可能導致作業異常,需要無狀態重啟恢複。
Create Table(建表事件,
CREATE TABLE ...)Add Column(加列事件,
ALTER TABLE ... ADD COLUMN ...)Alter Column Type(修改列類型事件,
ALTER TABLE ... MODIFY COLUMN ...)Drop Column(刪除列事件,
ALTER TABLE ... DROP COLUMN ...)Rename Column(重新命名列事件,
ALTER TABLE ... RENAME COLUMN ...)Truncate Table(清空表事件,
TRUNCATE TABLE ...)Drop Table(刪除表事件,
DROP TABLE ...)
Schema變更模式
模式 | 說明 |
LENIENT(預設) | Flink CDC資料攝入作業會對錶結構變更進行寬容轉換,並發送給下遊結果表,轉換遵循以下規則:
當您期望資料攝入作業儘可能自動化地同步表結構變更,推薦使用此模式。 |
IGNORE | Flink CDC資料攝入作業會忽略所有表結構變更,即上遊表結構修改都不會應用到下遊結果表。 當您的下遊表不支援或不希望發生表結構變更,想要繼續從未修改的列中繼續接收資料時,推薦使用此模式。 |
EVOLVE | Flink CDC資料攝入作業會將所有Schema更改應用於下遊表。 如果Schema變更在下遊表應用失敗,作業會拋出異常並觸發故障重啟。 當您期望資料攝入作業裡的表結構變更同步儘可能嚴格,推薦使用此模式。 重要 在此模式下,若下遊無法支援應用所有的表結構變更事件,可能導致作業Failover且無法自愈。 |
TRY_EVOLVE | Flink CDC資料攝入作業會嘗試將Schema變更應用到下遊表,如果下遊表不支援處理髮送的Schema變更,作業不會失敗重啟,而是嘗試通過轉換後續資料方式進行處理。 當您期望資料攝入作業裡的表結構變更同步儘可能嚴格,同時有一定的容錯性,推薦使用此模式。 重要 TRY_EVOLVE模式下,如果發生Schema變更應用失敗,可能導致上遊後續到來的資料出現部分列丟失、被截斷以適配下遊表結構。 |
EXCEPTION | Flink CDC資料攝入作業不允許任何Schema變更行為,收到Schema變更事件時,作業會拋出異常。 當您需要確保資料攝入作業不會發生表結構變更,只允許同步資料時,推薦使用此模式。 |
下遊Schema變更接收控制
在資料同步情境中,Flink CDC 提供精細化 Schema 變更管理策略,允許您通過規則配置控制下遊接收的表結構變更類型,避免意外變更導致的資料丟失或服務中斷。
您可以通過在sink模組中設定include.schema.changes和exclude.schema.changes選項來控制。
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
include.schema.changes | 支援應用的Schema變更。 | 否 | List<String> | 無 | 預設支援所有變更。 |
exclude.schema.changes | 不支援應用的Schema變更。 | 否 | List<String> | 無 | 優先順序高於 |
以下是可配置架構變更事件類型的完整列表:
事件類型 | 說明 |
| 新增列。 |
| 變更列類型。 |
| 建立表。 |
| 刪除列。 |
| 刪除表。 |
| 修改列名。 |
| 清空資料。 |
Schema變更支援部分匹配。例如,傳入drop相當於同時傳入drop.column和 drop.table;傳入 table相當於同時傳入create.table、truncate.table和drop.table。
程式碼範例
樣本1:Schema變更行為配置為EVOLVE,將上遊的表結構變更同步到下遊。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE樣本2:將上遊表的建立表和列相關事件應用到下遊,並忽略刪除列事件。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE