Realtime Compute for Apache Flink: Schema evolution configurations

Last Updated: Mar 26, 2026

Configure schema evolution for Flink CDC data ingestion jobs to control how schema changes in the source are applied to the sink.

Supported schema change events

Flink CDC data ingestion jobs can sync the following schema changes from source to sink:

  • Create table — CREATE TABLE ...

  • Add column — ALTER TABLE ... ADD COLUMN ...

  • Alter column type — ALTER TABLE ... MODIFY COLUMN ...

  • Drop column — ALTER TABLE ... DROP COLUMN ...

  • Rename column — ALTER TABLE ... RENAME COLUMN ...

  • Truncate table — TRUNCATE TABLE ...

  • Drop table — DROP TABLE ...

Important

The framework supports only the schema change types listed above. Any other schema change causes the job to throw an exception, and recovering from it requires a stateless restart.

Schema evolution behavior

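Set schema.change.behavior in the pipeline module to control how Flink CDC handles schema changes. The available values are LENIENT (default), IGNORE, EVOLVE, TRY_EVOLVE, and EXCEPTION. For example: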

pipeline:
  schema.change.behavior: EVOLVE

LENIENT (default)

Converts schema changes into more tolerant, sink-compatible operations before they reach the sink:

  • `rename.column`: Sends an alter.column.type event that makes the original column nullable, then an add.column event that adds a new nullable column with the new name. The original column is kept.

  • `drop.column`: Sends an alter.column.type event that makes the column nullable instead of dropping it.

  • `add.column`: The add.column event is still sent, but the new column is made nullable.

  • `drop.table` and `truncate.table`: Not sent to the sink.

Use LENIENT when you want the job to sync schema changes as automatically as possible with maximum compatibility.
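As a minimal sketch of the rename case, assume a hypothetical source table `orders` whose column `amt` is renamed to `amount` (both names are invented for illustration). Based on the rules above, the events that reach the sink would look roughly like the ones noted in the comments:

```yaml
pipeline:
  schema.change.behavior: LENIENT   # also the behavior when the option is omitted

# Hypothetical upstream DDL:
#   ALTER TABLE orders RENAME COLUMN amt TO amount
#
# Events forwarded to the sink under LENIENT, per the rules above:
#   1. alter.column.type  -> `amt` is made nullable (the column is kept)
#   2. add.column         -> `amount` is added as a new nullable column
```

After the change, the sink table in this sketch holds both `amt` and `amount`.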

IGNORE

All schema evolution is ignored. Upstream schema changes are not applied to the downstream sink table, and data continues to flow from existing columns.

Use IGNORE when your sink does not support schema changes, or when you want to keep receiving data from existing columns without modifying the sink schema.

EVOLVE

All schema changes are applied to the sink table exactly as they occur in the source. If applying a schema change fails, the job throws an exception and triggers a failure restart.

Important

If the sink cannot process a schema change event, the job may fail and cannot self-recover.

Use EVOLVE when you need strict, exact schema synchronization.

TRY_EVOLVE

Attempts to apply schema changes to the sink table. If the sink cannot process a change, the job neither fails nor restarts. Instead, it continues and tries to handle the affected data by transforming it.

Important

If a schema change fails to apply, subsequent data may lose columns or be truncated to match the existing sink schema.

Use TRY_EVOLVE when you want strict schema synchronization with some fault tolerance.

EXCEPTION

Throws an exception when any schema change event is received. No schema changes are allowed.

Use EXCEPTION when you must guarantee that only data, not schema, is synced.
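For quick reference, the five behaviors can be laid out as alternatives for the same option. This is only a sketch that restates the descriptions in this section. Exactly one value is set per job, so the alternatives are left commented out:

```yaml
pipeline:
  # Set exactly one value.
  schema.change.behavior: LENIENT       # default: forwards changes in a converted, more tolerant form
  # schema.change.behavior: IGNORE      # ignores every schema change; data keeps flowing from existing columns
  # schema.change.behavior: EVOLVE      # applies every change as-is; a failed change fails the job
  # schema.change.behavior: TRY_EVOLVE  # attempts every change; on failure the job keeps running and adapts the data
  # schema.change.behavior: EXCEPTION   # throws an exception on any schema change event
```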

Control schema changes at the sink

For fine-grained control, use include.schema.changes and exclude.schema.changes in the sink module to filter which schema change event types reach the sink.

| Config option | Required | Data type | Default value | Note |
| --- | --- | --- | --- | --- |
| include.schema.changes | No | List<String> | No default value | All changes are supported by default |
| exclude.schema.changes | No | List<String> | No default value | Takes priority over include.schema.changes |
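As a hedged illustration of these options, the following sink fragment relies on the include default (all changes) and filters out only the destructive event types. It reuses the values sink from this page's examples. The choice of event types is an assumption made for illustration, not a recommendation:

```yaml
sink:
  type: values
  name: Values Sink
  # include.schema.changes is omitted, so all event types are candidates.
  # The exclusion below then removes the destructive ones.
  exclude.schema.changes: [drop.column, drop.table, truncate.table]
```

In this sketch, create.table, add.column, alter.column.type, and rename.column events would still be forwarded to the sink.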

Configurable event types

| Event type | Description |
| --- | --- |
| add.column | Add a column |
| alter.column.type | Change column type |
| create.table | Create a table |
| drop.column | Drop a column |
| drop.table | Drop a table |
| rename.column | Rename a column |
| truncate.table | Truncate a table |

Partial matching is supported. For example, specifying drop matches both drop.column and drop.table. Specifying table matches create.table, truncate.table, and drop.table. Specifying column matches add.column, alter.column.type, rename.column, and drop.column.
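The matching rules above can be combined with the priority of exclude.schema.changes. The following fragment is a sketch that uses the values sink from this page's examples. The net effect noted in the comments is derived from the rules stated here rather than from observed job output:

```yaml
sink:
  type: values
  name: Values Sink
  include.schema.changes: [table]   # matches create.table, truncate.table, and drop.table
  exclude.schema.changes: [drop]    # matches drop.column and drop.table; exclusion takes priority

# Net effect under the rules above:
#   forwarded: create.table, truncate.table
#   filtered:  drop.table (excluded), plus all column events (not included)
```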

Examples

Example 1: Apply all schema changes

Set schema.change.behavior to EVOLVE in the pipeline module:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true

pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE

Example 2: Apply only selected schema change types

Allow table creation and all column events, but exclude drop.column:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column]  # `column` matches add.column, alter.column.type, rename.column, and drop.column
  exclude.schema.changes: [drop.column]            # Excludes drop.column even though it is matched by `column`

pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE