すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:スキーマ進化の構成

最終更新日:Dec 11, 2025

この Topic では、Flink CDC データインジェスチョンジョブのスキーマ進化を構成する方法について説明します。

はじめに

データインジェスチョンは、ソースからシンクへのスキーマ変更の同期をサポートします。サポートされているイベントには、テーブルの作成 (create.table)、列の追加 (add.column)、列名の変更 (rename.column)、列の型の変更 (alter.column.type)、列 (drop.column) またはテーブル (drop.table) の削除が含まれます。

シンクでサポートされていないスキーマ変更については、pipeline モジュールに schema.change.behavior オプションを追加して、イベント処理をカスタマイズします。

pipeline:
  schema.change.behavior: EVOLVE

Flink CDC は、スキーマ変更イベントのみをサポートしています。リストにないイベントはジョブの例外を引き起こす可能性があり、回復するにはステートレス再起動が必要です。

  • テーブルの作成 (CREATE TABLE ...)

  • 列の追加 (ALTER TABLE ... ADD COLUMN ...)

  • 列のデータ型の変更 (ALTER TABLE ... MODIFY COLUMN ...)

  • 列の削除 (ALTER TABLE ... DROP COLUMN ...)

  • 列名の変更 (ALTER TABLE ... RENAME COLUMN ...)

  • テーブルの切り捨て (TRUNCATE TABLE ...)

  • テーブルの削除 (DROP TABLE ...)

スキーマ進化の動作

動作

説明

LENIENT (デフォルト)

Flink CDC は、次のルールに基づいて、サポートされていないスキーマ変更をシンクでサポートされている型に変換します。

  • drop.table および truncate.table イベント: これらのイベントはダウンストリームに送信されません。

  • rename.column イベント: Flink CDC はこれらを alter.column.type イベントと add.column イベントに変換します。元の列のデータ型は nullable に変更され、新しい名前の新しい nullable 列が追加されます。

  • drop.column イベント: Flink CDC は alter.column.type イベントを送信し、列のデータ型を nullable に設定します。

  • add.column イベント: add.column イベントは引き続き送信されますが、新しい列のデータ型は nullable に設定されます。

EXCEPTION

すべてのスキーマ変更イベントを禁止します。

シンクがスキーマ進化をサポートしていない場合にこの動作を構成します。ジョブがスキーマ進化イベントを受信すると、例外がスローされます。

EVOLVE

Flink CDC は、すべてのスキーマ進化イベントをシンクに直接適用しようとします。

イベントの適用に失敗した場合、ジョブは例外をスローして再起動します。

重要

シンクシステムがすべてのスキーマ変更イベントを処理できない場合、ジョブは回復不可能なエラーに直面する可能性があります。

TRY_EVOLVE

Flink CDC は、すべてのスキーマ進化イベントをシンクに直接適用しようとします。シンクが特定のイベントをサポートしていない場合、ジョブは失敗したり再起動したりしません。代わりに、Flink CDC はイベントを既存のシンクスキーマが対応できるフォーマットに変換しようとします。

重要

このようなデータキャスティングと変換は、データの損失や切り捨てにつながる可能性があります。

IGNORE

すべてのスキーマ進化イベントを無視します。

シンクがスキーマ変更に対応しておらず、既存の列からデータの受信を継続したい場合に、この動作を構成します。

シンクでのスキーマ変更のコントロール

Flink CDC は、スキーマ進化をきめ細かくコントロールできます。ルールを構成して、どの変更タイプをシンクに伝播させるかを定義し、予期しない変更によるデータの損失やサービスのダウンタイムを防ぐことができます。

これをコントロールするには、sink モジュールで include.schema.changes および exclude.schema.changes オプションを設定します。

構成オプション

説明

必須

データの型

デフォルト値

注意

include.schema.changes

適用するスキーマ進化イベントタイプのリスト。

いいえ

List<String>

デフォルト値なし

指定しない場合、このオプションにはデフォルトですべてのサポートされているイベントタイプが含まれます。

exclude.schema.changes

除外するスキーマ進化イベントタイプのリスト。

いいえ

List<String>

デフォルト値なし

このオプションは include.schema.changes よりも優先度が高いです。

以下は、構成可能なスキーマ変更イベントタイプの完全なリストです。

イベントタイプ

説明

add.column

列を追加します。

alter.column.type

列の型を変更します。

create.table

テーブルを作成します。

drop.column

列を削除します。

drop.table

テーブルを削除します。

rename.column

列名を変更します。

truncate.table

データを削除します。

説明

これらのイベントタイプは部分一致をサポートしています。たとえば、drop を指定することは、drop.columndrop.table の両方を指定することと同じです。同様に、table を指定することは、create.tabletruncate.table、および drop.table を指定することと同じです。

  • 例 1: schema.change.behaviorEVOLVE に設定して、ソースからシンクにすべてのスキーマ変更を適用します。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE
  • 例 2: drop.column イベントを除外し、テーブル作成と列関連のイベントのみをシンクに適用します。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # `column` ワイルドカードは `add.column`、`alter.column.type`、`rename.column`、および `drop.column` に一致します。
  exclude.schema.changes: [drop.column] # `drop.column` イベントを除外します。
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE