この Topic では、Flink CDC データインジェスチョンジョブのスキーマ進化を構成する方法について説明します。
スキーマ進化の構成
Flink CDC データインジェストジョブは、ソースから結果テーブルへのスキーマ変更の同期をサポートしています。サポートされているイベントには、テーブルの作成、列の追加、列名の変更、列の型の変更、列の削除、テーブルの削除が含まれます。
子孫テーブルは、すべてのスキーマ変更をサポートしない場合があります。子孫テーブルがこれらの変更を処理する方法をカスタマイズするには、schema.change.behavior 構成をパイプラインモジュールに追加します。
pipeline:
schema.change.behavior: EVOLVEフレームワークは、以下にリストされているスキーマ変更タイプのみをサポートしています。サポートされていないスキーマ変更は、ジョブ例外を引き起こし、回復するためにステートレスな再起動を必要とする場合があります。
テーブルの作成 (
CREATE TABLE ...)列の追加 (
ALTER TABLE ... ADD COLUMN ...)列の型の変更 (
ALTER TABLE ... MODIFY COLUMN ...)列の削除 (
ALTER TABLE ... DROP COLUMN ...)列名の変更 (
ALTER TABLE ... RENAME COLUMN ...)テーブルの切り捨て (
TRUNCATE TABLE ...)テーブルの削除 (
DROP TABLE ...)
スキーマ進化の動作
パターン | 説明 |
LENIENT (デフォルト) | Flink CDC は、次のルールに基づいて、サポートされていないスキーマ変更をシンクでサポートされている型に変換します。
データインジェストジョブでスキーマ変更を可能な限り自動的に同期させたい場合は、この動作を使用します。 |
IGNORE | Flink CDC データインジェストジョブは、すべてのスキーマ進化を無視します。これは、アップストリームテーブルのスキーマ変更がダウンストリームの結果テーブルに適用されないことを意味します。 シンクがスキーマ変更をサポートしない場合、またはスキーマ変更を望まない場合で、既存の列からデータを受信し続けたい場合は、この動作を使用します。 |
EVOLVE | Flink CDC データインジェストジョブは、すべてのスキーマ変更を子孫テーブルに適用します。 子孫テーブルへのスキーマ変更の適用に失敗した場合、ジョブは例外をスローし、障害再起動をトリガーします。 データインジェストジョブで厳密で正確なスキーマ同期が必要な場合は、この動作を使用します。 重要 シンクがすべてのスキーマ変更イベントを処理できない場合、ジョブはフェイルオーバーし、自己回復できない可能性があります。 |
TRY_EVOLVE | Flink CDC データインジェストジョブは、スキーマ変更を子孫テーブルに適用しようと試みます。子孫テーブルがスキーマ変更を処理できない場合でも、ジョブは失敗したり再起動したりしません。代わりに、後続のデータを変換することで問題を処理しようと試みます。 ある程度のフォールトトレランスを伴う厳密なスキーマ同期が必要な場合は、この動作を使用します。 重要 スキーマ変更の適用に失敗した場合、後続のデータは一部の列を失うか、結果テーブルのスキーマに合わせて切り捨てられる可能性があります。 |
EXCEPTION | Flink CDC データインジェストジョブは、いかなるスキーマ変更も許可せず、スキーマ変更イベントを受信すると例外をスローします。 スキーマ変更が発生せず、データのみが同期されることを確実にしたい場合は、この動作を使用します。 |
シンクでのスキーマ変更のコントロール
データ同期シナリオでは、Flink CDC は詳細なスキーマ進化管理を提供します。シンクに到達するスキーマ変更タイプを制御するためのルールを構成できます。これにより、予期しない変更によるデータ損失やサービス中断を防ぎます。
include.schema.changes および exclude.schema.changes オプションを sink モジュールで設定することで、これを制御できます。
構成オプション | 説明 | 必須 | データの型 | デフォルト値 | 注意 |
include.schema.changes | アプリケーションスキーマ変更をサポートします。 | いいえ | List<String> | デフォルト値なし | すべての変更はデフォルトでサポートされます。 |
exclude.schema.changes | アプリケーションのスキーマを変更することはできません。 | いいえ | List<String> | デフォルト値なし | このオプションは、 |
以下は、構成可能なスキーマ変更イベントタイプの完全なリストです。
イベントタイプ | 説明 |
| 列を追加します。 |
| 列の型を変更します。 |
| テーブルを作成します。 |
| 列を削除します。 |
| テーブルを削除します。 |
| 列名を変更します。 |
| データを削除します。 |
これらのイベントタイプは部分一致をサポートしています。たとえば、drop を指定すると、drop.column と drop.table の両方に一致します。table を指定すると、create.table、truncate.table、および drop.table に一致します。
例
例 1: schema.change.behavior を EVOLVE に設定して、ソースからシンクへのすべてのスキーマ変更を適用します。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE例 2: drop.column イベントを除外しながら、テーブル作成と列関連イベントのみをシンクに適用します。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # `column` ワイルドカードは `add.column`、`alter.column.type`、`rename.column`、および `drop.column` に一致します。
exclude.schema.changes: [drop.column] # `drop.column` イベントを除外します。
pipeline:
name: mysql to print job
schema.change.behavior: EVOLVE