This topic describes how to configure schema evolution for Flink CDC data ingestion jobs.
Introduction
Data Ingestion supports syncing schema changes from the source to the sink. Supported events include creating tables (create.table), adding columns (add.column), renaming columns (rename.column), altering column types (alter.column.type), and dropping columns (drop.column) or tables (drop.table).
For schema changes not supported by the sink, add the schema.change.behavior option to the pipeline module to customize event handling.
pipeline:
schema.change.behavior: EVOLVESchema evolution behavior
Behavior | Description |
LENIENT (default) | Flink CDC converts unsupported schema changes into sink-supported types based on the following rules:
|
EXCEPTION | Forbids all schema change events. Configure this behavior when the sink does not support schema evolution. If the job receives a schema evolution event, it throws an exception. |
EVOLVE | Flink CDC attempts to apply all schema evolution events directly to the sink. If an event fails to apply, the job will throw an exception and restart. Important If the sink system cannot process all schema change events, the job may face unrecoverable errors. |
TRY_EVOLVE | Flink CDC attempts to apply all schema evolution events directly to the sink. If the sink does not support a specific event, the job will not fail or restart. Instead, Flink CDC will attempt to convert the event to a format that the existing sink schema can accommodate. Important Such data casting and conversion may result in data loss or truncation. |
IGNORE | Ignores all schema evolution events. Configure this behavior when your sink is not ready for schema changes, and you want to continue receiving data from the existing columns. |
Control schema changes at the sink
Flink CDC offers granular control over schema evolution. You can configure rules to define which change types are propagated to the sink, preventing data loss or service downtime from unexpected changes.
Control this by setting the include.schema.changes and exclude.schema.changes options in the sink module.
Config option | Description | Required | Data type | Default value | Note |
| A list of schema evolution event types to apply. | No | List<String> | No default value | If not specified, this option includes all supported event types by default. |
| A list of schema evolution event types to exclude. | No | List<String> | No default value | This option has a higher priority than |
The following is a complete list of configurable schema change event types:
Event type | Description |
| Add a column. |
| Change column type. |
| Create a table. |
| Delete a column. |
| Delete a table. |
| Modify column name. |
| Delete data. |
These event types support partial matching. For example, specifying drop is equivalent to specifying both drop.column and drop.table. Similarly, specifying table is equivalent to specifying create.table, truncate.table, and drop.table.
Examples
Example 1: Apply all schema changes from the source to the sink by setting
schema.change.behaviortoEVOLVE:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVEExample 2: Apply only table creation and column-related events to the sink while excluding
drop.columnevents:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # The `column` wildcard matches `add.column`, `alter.column.type`, `rename.column`, and `drop.column`.
exclude.schema.changes: [drop.column] # Excludes the `drop.column` event.
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE