Configure schema evolution for Flink CDC data ingestion jobs to control how schema changes in the source are applied to the sink.
Supported schema change events
Flink CDC data ingestion jobs can sync the following schema changes from source to sink:
-
Create table —
CREATE TABLE ... -
Add column —
ALTER TABLE ... ADD COLUMN ... -
Alter column type —
ALTER TABLE ... MODIFY COLUMN ... -
Drop column —
ALTER TABLE ... DROP COLUMN ... -
Rename column —
ALTER TABLE ... RENAME COLUMN ... -
Truncate table —
TRUNCATE TABLE ... -
Drop table —
DROP TABLE ...
The framework only supports the schema change types listed above. Unsupported schema changes cause job exceptions and require a stateless restart to recover.
Schema evolution behavior
Set schema.change.behavior in the pipeline module to control how Flink CDC handles schema changes:
pipeline:
schema.change.behavior: EVOLVE
LENIENT (default)
Converts unsupported schema changes into sink-compatible operations:
-
`rename.column`: Sends an
alter.column.typeevent to change the original column to nullable, then anadd.columnevent to add a new nullable column with the new name. The original column is kept. -
`drop.column`: Sends an
alter.column.typeevent and sets the column type to nullable instead of dropping it. -
New columns: The system still sends the add-column event, but the field type becomes nullable.
-
`drop.table` and `truncate.table`: Not sent to the sink.
Use LENIENT when you want the job to sync schema changes as automatically as possible with maximum compatibility.
IGNORE
All schema evolution is ignored. Upstream schema changes are not applied to the downstream sink table, and data continues to flow from existing columns.
Use IGNORE when your sink does not support schema changes, or when you want to keep receiving data from existing columns without modifying the sink schema.
EVOLVE
All schema changes are applied to the sink table exactly as they occur in the source. If applying a schema change fails, the job throws an exception and triggers a failure restart.
If the sink cannot process a schema change event, the job may fail and cannot self-recover.
Use EVOLVE when you need strict, exact schema synchronization.
TRY_EVOLVE
Attempts to apply schema changes to the sink table. If the sink cannot process the change, the job does not fail or restart — it continues and attempts to handle affected data by transforming it.
If a schema change fails to apply, subsequent data may lose columns or be truncated to match the existing sink schema.
Use TRY_EVOLVE when you want strict schema synchronization with some fault tolerance.
EXCEPTION
Throws an exception when any schema change event is received. No schema changes are allowed.
Use EXCEPTION when you must guarantee that only data — not schema — is synced.
Control schema changes at the sink
For fine-grained control, use include.schema.changes and exclude.schema.changes in the sink module to filter which schema change event types reach the sink.
| Config option | Required | Data type | Default value | Note |
|---|---|---|---|---|
include.schema.changes |
No | List<String> |
No default value | All changes are supported by default |
exclude.schema.changes |
No | List<String> |
No default value | Takes priority over include.schema.changes |
Configurable event types
| Event type | Description |
|---|---|
add.column |
Add a column |
alter.column.type |
Change column type |
create.table |
Create a table |
drop.column |
Drop a column |
drop.table |
Drop a table |
rename.column |
Rename a column |
truncate.table |
Truncate a table |
Partial matching is supported. For example, specifying drop matches both drop.column and drop.table. Specifying table matches create.table, truncate.table, and drop.table. Specifying column matches add.column, alter.column.type, rename.column, and drop.column.
Examples
Example 1: Apply all schema changes
Set schema.change.behavior to EVOLVE in the pipeline module:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE
Example 2: Apply only selected schema change types
Allow table creation and all column events, but exclude drop.column:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # `column` matches add.column, alter.column.type, rename.column, and drop.column
exclude.schema.changes: [drop.column] # Excludes drop.column even though it is matched by `column`
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE