全部產品
Search
文件中心

Realtime Compute for Apache Flink:Flink CDC表結構變更同步配置

更新時間:Feb 06, 2026

本文向您介紹了在Flink CDC資料攝入作業中與表結構變更(Schema Evolution)相關的配置。

表結構變更同步配置

Flink CDC資料攝入作業支援將資料來源的Schema變更同步到下遊表,例如建立表、添加列、重新命名列、更改列類型、刪除列和刪除表等。

下遊表可能不支援全部的Schema變更,您可以通過在Pipeline模組添加schema.change.behavior配置來修改Schema變更發生時下遊表的處理方式。

pipeline:
  schema.change.behavior: EVOLVE

目前,架構只支援同步以下表結構變更類型。未列出的表結構變更可能導致作業異常,需要無狀態重啟恢複。

  • Create Table(建表事件,CREATE TABLE ...

  • Add Column(加列事件,ALTER TABLE ... ADD COLUMN ...

  • Alter Column Type(修改列類型事件,ALTER TABLE ... MODIFY COLUMN ...

  • Drop Column(刪除列事件,ALTER TABLE ... DROP COLUMN ...

  • Rename Column(重新命名列事件,ALTER TABLE ... RENAME COLUMN ...

  • Truncate Table(清空表事件,TRUNCATE TABLE ...

  • Drop Table(刪除表事件,DROP TABLE ...

Schema變更模式

模式

說明

LENIENT(預設)

Flink CDC資料攝入作業會對錶結構變更進行寬容轉換,並發送給下遊結果表,轉換遵循以下規則:

  • 不發送Drop table和Truncate table變更。

  • 列重新命名時,改為發送更改列類型和新增列兩個事件。原有的列不刪除,更改列類型為nullable,同時新增一個列名為新名稱、資料類型改為nullable的列。

  • 刪除列時,改為發送更改列類型事件,將對應欄位類型變為nullable。

  • 新增列時仍發送新增列事件,但欄位類型會變為nullable。

當您期望資料攝入作業儘可能自動化地同步表結構變更,推薦使用此模式。

IGNORE

Flink CDC資料攝入作業會忽略所有表結構變更,即上遊表結構修改都不會應用到下遊結果表。

當您的下遊表不支援或不希望發生表結構變更,想要繼續從未修改的列中繼續接收資料時,推薦使用此模式。

EVOLVE

Flink CDC資料攝入作業會將所有Schema更改應用於下遊表。

如果Schema變更在下遊表應用失敗,作業會拋出異常並觸發故障重啟。

當您期望資料攝入作業裡的表結構變更同步儘可能嚴格,推薦使用此模式。

重要

在此模式下,若下遊無法支援應用所有的表結構變更事件,可能導致作業Failover且無法自愈。

TRY_EVOLVE

Flink CDC資料攝入作業會嘗試將Schema變更應用到下遊表,如果下遊表不支援處理髮送的Schema變更,作業不會失敗重啟,而是嘗試通過轉換後續資料方式進行處理。

當您期望資料攝入作業裡的表結構變更同步儘可能嚴格,同時有一定的容錯性,推薦使用此模式。

重要

TRY_EVOLVE模式下,如果發生Schema變更應用失敗,可能導致上遊後續到來的資料出現部分列丟失、被截斷以適配下遊表結構。

EXCEPTION

Flink CDC資料攝入作業不允許任何Schema變更行為,收到Schema變更事件時,作業會拋出異常。

當您需要確保資料攝入作業不會發生表結構變更,只允許同步資料時,推薦使用此模式。

下遊Schema變更接收控制

在資料同步情境中,Flink CDC 提供精細化 Schema 變更管理策略,允許您通過規則配置控制下遊接收的表結構變更類型,避免意外變更導致的資料丟失或服務中斷。

您可以通過在sink模組中設定include.schema.changesexclude.schema.changes選項來控制。

參數

說明

是否必填

資料類型

預設值

備忘

include.schema.changes

支援應用的Schema變更。

List<String>

預設支援所有變更。

exclude.schema.changes

不支援應用的Schema變更。

List<String>

優先順序高於include.schema.changes

以下是可配置架構變更事件類型的完整列表:

事件類型

說明

add.column

新增列。

alter.column.type

變更列類型。

create.table

建立表。

drop.column

刪除列。

drop.table

刪除表。

rename.column

修改列名。

truncate.table

清空資料。

說明

Schema變更支援部分匹配。例如,傳入drop相當於同時傳入drop.columndrop.table;傳入 table相當於同時傳入create.tabletruncate.tabledrop.table

程式碼範例

  • 樣本1:Schema變更行為配置為EVOLVE,將上遊的表結構變更同步到下遊。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE
  • 樣本2:將上遊表的建立表和列相關事件應用到下遊,並忽略刪除列事件。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
  exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE