すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:スキーマ進化の構成

最終更新日:Feb 06, 2026

この Topic では、Flink CDC データインジェスチョンジョブのスキーマ進化を構成する方法について説明します。

スキーマ進化の構成

Flink CDC データインジェストジョブは、ソースから結果テーブルへのスキーマ変更の同期をサポートしています。サポートされているイベントには、テーブルの作成、列の追加、列名の変更、列の型の変更、列の削除、テーブルの削除が含まれます。

子孫テーブルは、すべてのスキーマ変更をサポートしない場合があります。子孫テーブルがこれらの変更を処理する方法をカスタマイズするには、schema.change.behavior 構成をパイプラインモジュールに追加します。

pipeline:
  schema.change.behavior: EVOLVE

フレームワークは、以下にリストされているスキーマ変更タイプのみをサポートしています。サポートされていないスキーマ変更は、ジョブ例外を引き起こし、回復するためにステートレスな再起動を必要とする場合があります。

  • テーブルの作成 (CREATE TABLE ...)

  • 列の追加 (ALTER TABLE ... ADD COLUMN ...)

  • 列の型の変更 (ALTER TABLE ... MODIFY COLUMN ...)

  • 列の削除 (ALTER TABLE ... DROP COLUMN ...)

  • 列名の変更 (ALTER TABLE ... RENAME COLUMN ...)

  • テーブルの切り捨て (TRUNCATE TABLE ...)

  • テーブルの削除 (DROP TABLE ...)

スキーマ進化の動作

パターン

説明

LENIENT (デフォルト)

Flink CDC は、次のルールに基づいて、サポートされていないスキーマ変更をシンクでサポートされている型に変換します。

  • テーブルの削除またはテーブルの切り捨ての変更は送信しません。

  • rename.column イベントの場合、alter.column.type イベントと add.column イベントを送信します。元の列を保持します。その型をNULL許容に変更します。新しい名前で新しいNULL許容の列を追加します。

  • drop.column イベントの場合、alter.column.type イベントを送信し、列の型をNULL許容に設定します。

  • 新しい列を追加すると、システムは引き続き (column)(event) を送信しますが、フィールド型はNULL許容になります。

データインジェストジョブでスキーマ変更を可能な限り自動的に同期させたい場合は、この動作を使用します。

IGNORE

Flink CDC データインジェストジョブは、すべてのスキーマ進化を無視します。これは、アップストリームテーブルのスキーマ変更がダウンストリームの結果テーブルに適用されないことを意味します。

シンクがスキーマ変更をサポートしない場合、またはスキーマ変更を望まない場合で、既存の列からデータを受信し続けたい場合は、この動作を使用します。

EVOLVE

Flink CDC データインジェストジョブは、すべてのスキーマ変更を子孫テーブルに適用します。

子孫テーブルへのスキーマ変更の適用に失敗した場合、ジョブは例外をスローし、障害再起動をトリガーします。

データインジェストジョブで厳密で正確なスキーマ同期が必要な場合は、この動作を使用します。

重要

シンクがすべてのスキーマ変更イベントを処理できない場合、ジョブはフェイルオーバーし、自己回復できない可能性があります。

TRY_EVOLVE

Flink CDC データインジェストジョブは、スキーマ変更を子孫テーブルに適用しようと試みます。子孫テーブルがスキーマ変更を処理できない場合でも、ジョブは失敗したり再起動したりしません。代わりに、後続のデータを変換することで問題を処理しようと試みます。

ある程度のフォールトトレランスを伴う厳密なスキーマ同期が必要な場合は、この動作を使用します。

重要

スキーマ変更の適用に失敗した場合、後続のデータは一部の列を失うか、結果テーブルのスキーマに合わせて切り捨てられる可能性があります。

EXCEPTION

Flink CDC データインジェストジョブは、いかなるスキーマ変更も許可せず、スキーマ変更イベントを受信すると例外をスローします。

スキーマ変更が発生せず、データのみが同期されることを確実にしたい場合は、この動作を使用します。

シンクでのスキーマ変更のコントロール

データ同期シナリオでは、Flink CDC は詳細なスキーマ進化管理を提供します。シンクに到達するスキーマ変更タイプを制御するためのルールを構成できます。これにより、予期しない変更によるデータ損失やサービス中断を防ぎます。

include.schema.changes および exclude.schema.changes オプションを sink モジュールで設定することで、これを制御できます。

構成オプション

説明

必須

データの型

デフォルト値

注意

include.schema.changes

アプリケーションスキーマ変更をサポートします。

いいえ

List<String>

デフォルト値なし

すべての変更はデフォルトでサポートされます。

exclude.schema.changes

アプリケーションのスキーマを変更することはできません。

いいえ

List<String>

デフォルト値なし

このオプションは、include.schema.changes よりも優先度が高くなります。

以下は、構成可能なスキーマ変更イベントタイプの完全なリストです。

イベントタイプ

説明

add.column

列を追加します。

alter.column.type

列の型を変更します。

create.table

テーブルを作成します。

drop.column

列を削除します。

drop.table

テーブルを削除します。

rename.column

列名を変更します。

truncate.table

データを削除します。

説明

これらのイベントタイプは部分一致をサポートしています。たとえば、drop を指定すると、drop.columndrop.table の両方に一致します。table を指定すると、create.tabletruncate.table、および drop.table に一致します。

  • 例 1: schema.change.behavior を EVOLVE に設定して、ソースからシンクへのすべてのスキーマ変更を適用します。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE
  • 例 2: drop.column イベントを除外しながら、テーブル作成と列関連イベントのみをシンクに適用します。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # `column` ワイルドカードは `add.column`、`alter.column.type`、`rename.column`、および `drop.column` に一致します。
  exclude.schema.changes: [drop.column] # `drop.column` イベントを除外します。
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE