本文向您介绍了在Flink CDC数据摄入作业中与表结构变更(Schema Evolution)相关的配置。
表结构变更同步配置
Flink CDC数据摄入作业支持将数据源的Schema变更同步到下游表,例如创建表、添加列、重命名列、更改列类型、删除列和删除表等。
下游表可能不支持全部的Schema变更,您可以通过在Pipeline模块添加schema.change.behavior配置来修改Schema变更发生时下游表的处理方式。
pipeline:
schema.change.behavior: EVOLVE目前,框架只支持同步以下表结构变更类型。未列出的表结构变更可能导致作业异常,需要无状态重启恢复。
Create Table(建表事件,
CREATE TABLE ...)Add Column(加列事件,
ALTER TABLE ... ADD COLUMN ...)Alter Column Type(修改列类型事件,
ALTER TABLE ... MODIFY COLUMN ...)Drop Column(删除列事件,
ALTER TABLE ... DROP COLUMN ...)Rename Column(重命名列事件,
ALTER TABLE ... RENAME COLUMN ...)Truncate Table(清空表事件,
TRUNCATE TABLE ...)Drop Table(删除表事件,
DROP TABLE ...)
Schema变更模式
模式 | 说明 |
LENIENT(默认) | Flink CDC数据摄入作业会对表结构变更进行宽容转换,并发送给下游结果表,转换遵循以下规则:
当您期望数据摄入作业尽可能自动化地同步表结构变更,推荐使用此模式。 |
IGNORE | Flink CDC数据摄入作业会忽略所有表结构变更,即上游表结构修改都不会应用到下游结果表。 当您的下游表不支持或不希望发生表结构变更,想要继续从未修改的列中继续接收数据时,推荐使用此模式。 |
EVOLVE | Flink CDC数据摄入作业会将所有Schema更改应用于下游表。 如果Schema变更在下游表应用失败,作业会抛出异常并触发故障重启。 当您期望数据摄入作业里的表结构变更同步尽可能严格,推荐使用此模式。 重要 在此模式下,若下游无法支持应用所有的表结构变更事件,可能导致作业Failover且无法自愈。 |
TRY_EVOLVE | Flink CDC数据摄入作业会尝试将Schema变更应用到下游表,如果下游表不支持处理发送的Schema变更,作业不会失败重启,而是尝试通过转换后续数据方式进行处理。 当您期望数据摄入作业里的表结构变更同步尽可能严格,同时有一定的容错性,推荐使用此模式。 重要 TRY_EVOLVE模式下,如果发生Schema变更应用失败,可能导致上游后续到来的数据出现部分列丢失、被截断以适配下游表结构。 |
EXCEPTION | Flink CDC数据摄入作业不允许任何Schema变更行为,收到Schema变更事件时,作业会抛出异常。 当您需要确保数据摄入作业不会发生表结构变更,只允许同步数据时,推荐使用此模式。 |
下游Schema变更接收控制
在数据同步场景中,Flink CDC 提供精细化 Schema 变更管理策略,允许您通过规则配置控制下游接收的表结构变更类型,避免意外变更导致的数据丢失或服务中断。
您可以通过在sink模块中设置include.schema.changes和exclude.schema.changes选项来控制。
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
include.schema.changes | 支持应用的Schema变更。 | 否 | List<String> | 无 | 默认支持所有变更。 |
exclude.schema.changes | 不支持应用的Schema变更。 | 否 | List<String> | 无 | 优先级高于 |
以下是可配置架构变更事件类型的完整列表:
事件类型 | 说明 |
| 新增列。 |
| 变更列类型。 |
| 创建表。 |
| 删除列。 |
| 删除表。 |
| 修改列名。 |
| 清空数据。 |
Schema变更支持部分匹配。例如,传入drop相当于同时传入drop.column和 drop.table;传入 table相当于同时传入create.table、truncate.table和drop.table。
代码示例
示例1:Schema变更行为配置为EVOLVE,将上游的表结构变更同步到下游。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE示例2:将上游表的创建表和列相关事件应用到下游,并忽略删除列事件。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE