すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC ジョブ構造

最終更新日:Mar 27, 2026

Flink CDC のデータインジェストジョブは、最大で 5 つのモジュールから構成される YAML ファイルとして定義されます。ソース(Source)モジュールとシンク(Sink)モジュールは必須であり、変換(Transform)、ルーティング(Route)、パイプライン(Pipeline)モジュールは任意です。

ジョブ構造の概要

完全なジョブファイルは、以下の構造に従います:

source:         # 必須:データソースを定義
  ...

sink:           # 必須:送信先を定義
  ...

transform:      # 任意:書き込み前のデータのフィルタリングおよび再構成
  - ...

route:          # 任意:ソーステーブルを異なるシンクテーブルにマップ
  - ...

pipeline:       # 任意:ジョブ全体に適用されるグローバルパラメーターを設定
  ...

最小構成例

以下の例では、必須モジュールのみを使用して、MySQL から Paimon へ一致するすべてのテーブルを同期します:

# ソースモジュール
source:
  type: mysql
  name: MySQL Source
  host: localhost
  port: 3306
  username: admin
  password: <yourPassword>
  tables: adb.*

# シンクモジュール
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

# パイプラインモジュール
pipeline:
  name: source-database-sync-pipe
  schema.change.behavior: evolve

完全構成例

以下の例では、Transform モジュールおよび Route モジュールを追加し、フィールドのフィルタリング、式の適用、テーブルの再マッピングを行います:

# ソースモジュール
source:
  type: mysql
  name: MySQL Source
  host: localhost
  port: 3306
  username: admin
  password: <yourPassword>
  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*, mydb.\\.*

# シンクモジュール
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

# 変換モジュール
transform:
  - source-table: mydb.app_order_.*
    projection: id, order_id, TO_UPPER(product_name)
    filter: id > 10 AND order_id > 100
    primary-keys: id
    partition-keys: product_name
    table-options: comment=app order
    description: project fields from source table
    converter-after-transform: SOFT_DELETE
  - source-table: mydb.web_order_.*
    projection: CONCAT(id, order_id) as uniq_id, *
    filter: uniq_id > 10
    description: add new uniq_id for each row

# ルーティングモジュール
route:
  - source-table: mydb.default.app_order_.*
    sink-table: odsdb.default.app_order
    description: sync all table shards to one
  - source-table: mydb.default.web_order
    sink-table: odsdb.default.ods_web_order
    description: sync table with given prefix ods_

# パイプラインモジュール
pipeline:
  name: source-database-sync-pipe
  schema.change.behavior: evolve

ソースモジュール

ソースモジュールは、Flink CDC がデータを読み取る元となるデータソースを定義します。対応しているコネクタは以下のとおりです:

source:
  type: mysql      # コネクタタイプ
  name: mysql source
  # コネクタ固有のパラメーターを以下に記述
  xxx: ...

サポートされているすべてのパラメーターについては、対応するコネクタのドキュメントをご参照ください。

シンクモジュール

シンクモジュールは、Flink CDC がデータを書き込む先を定義します。対応しているコネクタは以下のとおりです:

sink:
  type: hologres   # コネクタタイプ
  name: hologres sink
  # コネクタ固有のパラメーターを以下に記述
  xxx: ...

サポートされているすべてのパラメーターについては、対応するコネクタのドキュメントをご参照ください。

変換モジュール

変換モジュールは、データがシンクに到達する前に、ソーステーブルのデータに対して投影(projection)、計算、フィルタリングのルールを適用します。各ソーステーブルのパターンごとに 1 つのルールを定義します。

transform:
  - source-table: db.tbl1
    projection: ...
    filter: ...
  - source-table: db.tbl2
    projection: ...
    filter: ...
パラメーター 説明 必須
source-table ソーステーブル識別子。正規表現をサポート 必須
projection 列選択および式。SQL の SELECT 句と同様 任意
filter 行フィルター条件。SQL の WHERE 句と同様 任意
primary-keys 結果テーブルのプライマリキー(カンマ区切り) 任意
partition-keys 結果テーブルのパーティションキー(カンマ区切り) 任意
table-options 結果テーブルが自動作成される際に適用されるテーブル作成オプション 任意
converter-after-transform 変換ルール実行後にイベントに適用されるコンバーター 任意
description ルールの説明 任意

式の構文の詳細については、「Flink CDC 変換モジュール」をご参照ください。

ルーティングモジュール

ルーティングモジュールは、ソーステーブルを結果テーブルにマップします。シャード化されたテーブルのマージ、テーブル名へのプレフィックス付与、またはテーブルの異なる宛先への転送などに使用します。

route:
  - source-table: db.tbl1
    sink-table: sinkdb.tbl1
  - source-table: db.tbl2
    sink-table: sinkdb.tbl2
パラメーター 説明 必須
source-table ソーステーブル識別子。正規表現をサポート 必須
sink-table 対象の結果テーブル識別子 必須
description ルーティングルールの説明 任意

式の構文の詳細については、「Flink CDC ルーティングモジュール」をご参照ください。

パイプラインモジュール

パイプラインモジュールは、ジョブ全体に適用されるグローバルパラメーターを設定します。

pipeline:
  name: CDC YAML job
  schema.change.behavior: LENIENT
パラメーター 説明 必須
name Flink ダッシュボードに表示されるジョブ名 任意
schema.change.behavior ジョブが上流のスキーマ変更をどのように処理するか。許容値: evolveLENIENT など 任意

すべての設定可能なパラメーターについては、「Flink CDC パイプラインモジュール」をご参照ください。

次のステップ