Flink CDC のデータインジェストジョブは、最大で 5 つのモジュールから構成される YAML ファイルとして定義されます。ソース(Source)モジュールとシンク(Sink)モジュールは必須であり、変換(Transform)、ルーティング(Route)、パイプライン(Pipeline)モジュールは任意です。
ジョブ構造の概要
完全なジョブファイルは、以下の構造に従います:
source: # 必須:データソースを定義
...
sink: # 必須:送信先を定義
...
transform: # 任意:書き込み前のデータのフィルタリングおよび再構成
- ...
route: # 任意:ソーステーブルを異なるシンクテーブルにマップ
- ...
pipeline: # 任意:ジョブ全体に適用されるグローバルパラメーターを設定
...
最小構成例
以下の例では、必須モジュールのみを使用して、MySQL から Paimon へ一致するすべてのテーブルを同期します:
# ソースモジュール
source:
type: mysql
name: MySQL Source
host: localhost
port: 3306
username: admin
password: <yourPassword>
tables: adb.*
# シンクモジュール
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
# パイプラインモジュール
pipeline:
name: source-database-sync-pipe
schema.change.behavior: evolve
完全構成例
以下の例では、Transform モジュールおよび Route モジュールを追加し、フィールドのフィルタリング、式の適用、テーブルの再マッピングを行います:
# ソースモジュール
source:
type: mysql
name: MySQL Source
host: localhost
port: 3306
username: admin
password: <yourPassword>
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*, mydb.\\.*
# シンクモジュール
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
# 変換モジュール
transform:
- source-table: mydb.app_order_.*
projection: id, order_id, TO_UPPER(product_name)
filter: id > 10 AND order_id > 100
primary-keys: id
partition-keys: product_name
table-options: comment=app order
description: project fields from source table
converter-after-transform: SOFT_DELETE
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
description: add new uniq_id for each row
# ルーティングモジュール
route:
- source-table: mydb.default.app_order_.*
sink-table: odsdb.default.app_order
description: sync all table shards to one
- source-table: mydb.default.web_order
sink-table: odsdb.default.ods_web_order
description: sync table with given prefix ods_
# パイプラインモジュール
pipeline:
name: source-database-sync-pipe
schema.change.behavior: evolve
ソースモジュール
ソースモジュールは、Flink CDC がデータを読み取る元となるデータソースを定義します。対応しているコネクタは以下のとおりです:
source:
type: mysql # コネクタタイプ
name: mysql source
# コネクタ固有のパラメーターを以下に記述
xxx: ...
サポートされているすべてのパラメーターについては、対応するコネクタのドキュメントをご参照ください。
シンクモジュール
シンクモジュールは、Flink CDC がデータを書き込む先を定義します。対応しているコネクタは以下のとおりです:
sink:
type: hologres # コネクタタイプ
name: hologres sink
# コネクタ固有のパラメーターを以下に記述
xxx: ...
サポートされているすべてのパラメーターについては、対応するコネクタのドキュメントをご参照ください。
変換モジュール
変換モジュールは、データがシンクに到達する前に、ソーステーブルのデータに対して投影(projection)、計算、フィルタリングのルールを適用します。各ソーステーブルのパターンごとに 1 つのルールを定義します。
transform:
- source-table: db.tbl1
projection: ...
filter: ...
- source-table: db.tbl2
projection: ...
filter: ...
| パラメーター | 説明 | 必須 |
|---|---|---|
source-table |
ソーステーブル識別子。正規表現をサポート | 必須 |
projection |
列選択および式。SQL の SELECT 句と同様 |
任意 |
filter |
行フィルター条件。SQL の WHERE 句と同様 |
任意 |
primary-keys |
結果テーブルのプライマリキー(カンマ区切り) | 任意 |
partition-keys |
結果テーブルのパーティションキー(カンマ区切り) | 任意 |
table-options |
結果テーブルが自動作成される際に適用されるテーブル作成オプション | 任意 |
converter-after-transform |
変換ルール実行後にイベントに適用されるコンバーター | 任意 |
description |
ルールの説明 | 任意 |
式の構文の詳細については、「Flink CDC 変換モジュール」をご参照ください。
ルーティングモジュール
ルーティングモジュールは、ソーステーブルを結果テーブルにマップします。シャード化されたテーブルのマージ、テーブル名へのプレフィックス付与、またはテーブルの異なる宛先への転送などに使用します。
route:
- source-table: db.tbl1
sink-table: sinkdb.tbl1
- source-table: db.tbl2
sink-table: sinkdb.tbl2
| パラメーター | 説明 | 必須 |
|---|---|---|
source-table |
ソーステーブル識別子。正規表現をサポート | 必須 |
sink-table |
対象の結果テーブル識別子 | 必須 |
description |
ルーティングルールの説明 | 任意 |
式の構文の詳細については、「Flink CDC ルーティングモジュール」をご参照ください。
パイプラインモジュール
パイプラインモジュールは、ジョブ全体に適用されるグローバルパラメーターを設定します。
pipeline:
name: CDC YAML job
schema.change.behavior: LENIENT
| パラメーター | 説明 | 必須 |
|---|---|---|
name |
Flink ダッシュボードに表示されるジョブ名 | 任意 |
schema.change.behavior |
ジョブが上流のスキーマ変更をどのように処理するか。許容値: evolve、LENIENT など |
任意 |
すべての設定可能なパラメーターについては、「Flink CDC パイプラインモジュール」をご参照ください。