この Topic では、Flink CDC データインジェスチョンジョブのスキーマ進化を構成する方法について説明します。
はじめに
データインジェスチョンは、ソースからシンクへのスキーマ変更の同期をサポートします。サポートされているイベントには、テーブルの作成 (create.table)、列の追加 (add.column)、列名の変更 (rename.column)、列の型の変更 (alter.column.type)、列 (drop.column) またはテーブル (drop.table) の削除が含まれます。
シンクでサポートされていないスキーマ変更については、pipeline モジュールに schema.change.behavior オプションを追加して、イベント処理をカスタマイズします。
pipeline:
schema.change.behavior: EVOLVEFlink CDC は、スキーマ変更イベントのみをサポートしています。リストにないイベントはジョブの例外を引き起こす可能性があり、回復するにはステートレス再起動が必要です。
テーブルの作成 (
CREATE TABLE ...)列の追加 (
ALTER TABLE ... ADD COLUMN ...)列のデータ型の変更 (
ALTER TABLE ... MODIFY COLUMN ...)列の削除 (
ALTER TABLE ... DROP COLUMN ...)列名の変更 (
ALTER TABLE ... RENAME COLUMN ...)テーブルの切り捨て (
TRUNCATE TABLE ...)テーブルの削除 (
DROP TABLE ...)
スキーマ進化の動作
動作 | 説明 |
LENIENT (デフォルト) | Flink CDC は、次のルールに基づいて、サポートされていないスキーマ変更をシンクでサポートされている型に変換します。
|
EXCEPTION | すべてのスキーマ変更イベントを禁止します。 シンクがスキーマ進化をサポートしていない場合にこの動作を構成します。ジョブがスキーマ進化イベントを受信すると、例外がスローされます。 |
EVOLVE | Flink CDC は、すべてのスキーマ進化イベントをシンクに直接適用しようとします。 イベントの適用に失敗した場合、ジョブは例外をスローして再起動します。 重要 シンクシステムがすべてのスキーマ変更イベントを処理できない場合、ジョブは回復不可能なエラーに直面する可能性があります。 |
TRY_EVOLVE | Flink CDC は、すべてのスキーマ進化イベントをシンクに直接適用しようとします。シンクが特定のイベントをサポートしていない場合、ジョブは失敗したり再起動したりしません。代わりに、Flink CDC はイベントを既存のシンクスキーマが対応できるフォーマットに変換しようとします。 重要 このようなデータキャスティングと変換は、データの損失や切り捨てにつながる可能性があります。 |
IGNORE | すべてのスキーマ進化イベントを無視します。 シンクがスキーマ変更に対応しておらず、既存の列からデータの受信を継続したい場合に、この動作を構成します。 |
シンクでのスキーマ変更のコントロール
Flink CDC は、スキーマ進化をきめ細かくコントロールできます。ルールを構成して、どの変更タイプをシンクに伝播させるかを定義し、予期しない変更によるデータの損失やサービスのダウンタイムを防ぐことができます。
これをコントロールするには、sink モジュールで include.schema.changes および exclude.schema.changes オプションを設定します。
構成オプション | 説明 | 必須 | データの型 | デフォルト値 | 注意 |
| 適用するスキーマ進化イベントタイプのリスト。 | いいえ | List<String> | デフォルト値なし | 指定しない場合、このオプションにはデフォルトですべてのサポートされているイベントタイプが含まれます。 |
| 除外するスキーマ進化イベントタイプのリスト。 | いいえ | List<String> | デフォルト値なし | このオプションは |
以下は、構成可能なスキーマ変更イベントタイプの完全なリストです。
イベントタイプ | 説明 |
| 列を追加します。 |
| 列の型を変更します。 |
| テーブルを作成します。 |
| 列を削除します。 |
| テーブルを削除します。 |
| 列名を変更します。 |
| データを削除します。 |
これらのイベントタイプは部分一致をサポートしています。たとえば、drop を指定することは、drop.column と drop.table の両方を指定することと同じです。同様に、table を指定することは、create.table、truncate.table、および drop.table を指定することと同じです。
例
例 1:
schema.change.behaviorをEVOLVEに設定して、ソースからシンクにすべてのスキーマ変更を適用します。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE例 2:
drop.columnイベントを除外し、テーブル作成と列関連のイベントのみをシンクに適用します。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # `column` ワイルドカードは `add.column`、`alter.column.type`、`rename.column`、および `drop.column` に一致します。
exclude.schema.changes: [drop.column] # `drop.column` イベントを除外します。
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE