Realtime Compute for Apache Flink: Schema evolution configurations

Last Updated: Oct 17, 2025

This topic describes how to configure schema evolution for Flink CDC data ingestion jobs.

Introduction

Data Ingestion supports syncing schema changes from the source to the sink. Supported events include creating tables (create.table), adding columns (add.column), renaming columns (rename.column), altering column types (alter.column.type), and dropping columns (drop.column) or tables (drop.table).

For schema changes not supported by the sink, add the schema.change.behavior option to the pipeline module to customize event handling.

pipeline:
  schema.change.behavior: EVOLVE

Schema evolution behavior

Behavior

Description

LENIENT (default)

Flink CDC converts schema changes that the sink does not support into changes that it can accept, based on the following rules:

  • drop.table and truncate.table events: These events are not sent downstream.

  • rename.column events: Flink CDC transforms these into an alter.column.type event and an add.column event. The original column's data type is changed to nullable, and a new, nullable column with the new name is added.

  • drop.column events: Flink CDC sends an alter.column.type event, setting the column's data type to nullable.

  • add.column events: An add.column event is still sent, but the new column's data type is set to nullable.

EXCEPTION

Forbids all schema change events.

Configure this behavior when the sink does not support schema evolution. If the job receives a schema evolution event, it throws an exception.

EVOLVE

Flink CDC attempts to apply all schema evolution events directly to the sink.

If an event fails to apply, the job will throw an exception and restart.

Important

If the sink system cannot process all schema change events, the job may face unrecoverable errors.

TRY_EVOLVE

Flink CDC attempts to apply all schema evolution events directly to the sink. If the sink does not support a specific event, the job will not fail or restart. Instead, Flink CDC will attempt to convert the event to a format that the existing sink schema can accommodate.

Important

Such data casting and conversion may result in data loss or truncation.

IGNORE

Ignores all schema evolution events.

Configure this behavior when your sink is not ready for schema changes, and you want to continue receiving data from the existing columns.
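As a quick reference, the behaviors above can be written as alternative values of the same option. The following is a minimal sketch rather than a complete job: it shows only the pipeline module, and the comments restate the table instead of introducing new options.

```yaml
pipeline:
  # Default when the option is omitted: unsupported changes are converted.
  # For example, a drop.column event becomes an alter.column.type event
  # that makes the column nullable.
  schema.change.behavior: LENIENT
  # Alternatives (set exactly one value):
  # schema.change.behavior: EXCEPTION   # throw an exception on any schema change event
  # schema.change.behavior: EVOLVE      # apply every event to the sink; throw and restart on failure
  # schema.change.behavior: TRY_EVOLVE  # apply events; convert them if the sink does not support one
  # schema.change.behavior: IGNORE      # ignore all schema change events and keep the existing columns
```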

Control schema changes at the sink

Flink CDC offers granular control over schema evolution. You can configure rules to define which change types are propagated to the sink, preventing data loss or service downtime from unexpected changes.

Control this by setting the include.schema.changes and exclude.schema.changes options in the sink module.

| Config option | Description | Required | Data type | Default value | Note |
| --- | --- | --- | --- | --- | --- |
| include.schema.changes | A list of schema evolution event types to apply. | No | List<String> | No default value | If not specified, this option includes all supported event types by default. |
| exclude.schema.changes | A list of schema evolution event types to exclude. | No | List<String> | No default value | This option has a higher priority than include.schema.changes. |

The following is a complete list of configurable schema change event types:

| Event type | Description |
| --- | --- |
| add.column | Add a column. |
| alter.column.type | Change the data type of a column. |
| create.table | Create a table. |
| drop.column | Delete a column. |
| drop.table | Delete a table. |
| rename.column | Rename a column. |
| truncate.table | Delete all data from a table. |

Note

These event types support partial matching. For example, specifying drop is equivalent to specifying both drop.column and drop.table. Similarly, specifying table is equivalent to specifying create.table, truncate.table, and drop.table.
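To illustrate partial matching together with the priority rule, the sketch below shows only the sink module and reuses the values sink from the examples in this topic. It assumes matching works exactly as the note describes. The expansions in the comments are derived from that note, not from a test run.

```yaml
sink:
  type: values
  name: Values Sink
  # `column` matches add.column, alter.column.type, drop.column, and rename.column.
  include.schema.changes: [create.table, column]
  # `drop` matches drop.column and drop.table.
  # exclude.schema.changes has a higher priority, so drop.column is not applied.
  exclude.schema.changes: [drop]
```

With these two options combined, the sink would be expected to receive only create.table, add.column, alter.column.type, and rename.column events.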

Examples

  • Example 1: Apply all schema changes from the source to the sink by setting schema.change.behavior to EVOLVE:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE

  • Example 2: Apply only table creation and column-related events to the sink while excluding drop.column events:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # The partial match `column` covers `add.column`, `alter.column.type`, `rename.column`, and `drop.column`.
  exclude.schema.changes: [drop.column] # Excludes `drop.column`; this option takes priority over `include.schema.changes`.
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE