This topic describes how to configure schema evolution for Flink CDC data ingestion jobs.
Schema evolution configurations
Flink CDC data ingestion jobs support syncing schema changes from the source to the sink table. Supported events include creating tables, adding columns, renaming columns, altering column types, dropping columns, and dropping tables.
Downstream tables may not support all schema changes. To customize how the downstream table handles these changes, add the schema.change.behavior option to the pipeline module.
pipeline:
  schema.change.behavior: EVOLVE
The framework supports only the schema change types listed below. Unsupported schema changes can cause job exceptions and require a stateless restart to recover.
Create Table (CREATE TABLE ...)
Add Column (ALTER TABLE ... ADD COLUMN ...)
Alter Column Type (ALTER TABLE ... MODIFY COLUMN ...)
Drop Column (ALTER TABLE ... DROP COLUMN ...)
Rename Column (ALTER TABLE ... RENAME COLUMN ...)
Truncate Table (TRUNCATE TABLE ...)
Drop Table (DROP TABLE ...)
Schema evolution behavior
Mode | Description |
LENIENT (default) | Flink CDC converts unsupported schema changes into sink-supported types based on the following rules:
Use this behavior when you want the data ingestion job to sync schema changes as automatically as possible. |
IGNORE | Flink CDC data ingestion jobs ignore all schema evolution, meaning that upstream table schema modifications are not applied to downstream sink tables. Use this behavior when your sink does not support schema changes or when you do not want them, and you want to keep receiving data from existing columns. |
EVOLVE | Flink CDC data ingestion jobs apply all schema changes to downstream tables. If applying a schema change to a downstream table fails, the job throws an exception and triggers a failover restart. Use this behavior when you want strict, exact schema synchronization in your data ingestion job. Important: If the sink cannot process all schema change events, the job may fail over and be unable to recover on its own. |
TRY_EVOLVE | Flink CDC data ingestion jobs attempt to apply schema changes to the downstream table. If the downstream table cannot process a schema change, the job does not fail or restart. Instead, it attempts to handle the issue by transforming subsequent data. Use this behavior when you want strict schema synchronization with some fault tolerance. Important: If a schema change fails to apply, later data may lose some columns or be truncated to match the sink table schema. |
EXCEPTION | Flink CDC data ingestion jobs do not allow any schema changes and throw an exception when a schema change event is received. Use this behavior when you must ensure that no schema changes occur—only data is synced. |
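As a rough illustration of selecting one of the modes above, the following fragment sets the behavior to TRY_EVOLVE in the pipeline module. The job name is a placeholder chosen for this sketch, and any other value from the table can be substituted for TRY_EVOLVE.

```yaml
pipeline:
  name: example job   # hypothetical job name, used only for illustration
  # One of LENIENT (default), IGNORE, EVOLVE, TRY_EVOLVE, or EXCEPTION.
  schema.change.behavior: TRY_EVOLVE
```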
Control schema changes at the sink
In data synchronization scenarios, Flink CDC offers fine-grained schema evolution management. You can configure rules to control which schema change types reach the sink. This prevents data loss or service interruption from unexpected changes.
You can control this by setting the include.schema.changes and exclude.schema.changes options in the sink module.
Config option | Description | Required | Data type | Default value | Note |
include.schema.changes | The schema change types to apply to the sink. | No | List<String> | No default value | All changes are applied by default. |
exclude.schema.changes | The schema change types not to apply to the sink. | No | List<String> | No default value | This option has higher priority than include.schema.changes. |
The following is a complete list of configurable schema change event types:
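Event type | Description |
add.column | Add a column. |
alter.column.type | Change a column's type. |
create.table | Create a table. |
drop.column | Drop a column. |
drop.table | Drop a table. |
rename.column | Rename a column. |
truncate.table | Delete the data in a table. |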
These event types support partial matching. For example, specifying drop matches both drop.column and drop.table. Specifying table matches create.table, truncate.table, and drop.table.
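A minimal sketch of partial matching, assuming the values sink used in the examples in this topic: under the matching rule described above, excluding drop should keep both drop.column and drop.table events from reaching the sink.

```yaml
sink:
  type: values
  name: Values Sink
  # `drop` partially matches both `drop.column` and `drop.table`,
  # so neither event type is applied to the sink.
  exclude.schema.changes: [drop]
```

Because no include.schema.changes list is given here, all other event types remain eligible by default.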
Examples
Example 1: Apply all schema changes from the source to the sink by setting schema.change.behavior to EVOLVE:
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE
Example 2: Apply only table creation and column-related events to the sink while excluding drop.column events:
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # The `column` wildcard matches `add.column`, `alter.column.type`, `rename.column`, and `drop.column`.
  exclude.schema.changes: [drop.column] # Excludes the `drop.column` event.
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE