すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC 変換モジュール

最終更新日:Oct 22, 2025

このトピックでは、Flink CDC データインジェスチョンジョブドラフトの transform モジュールの構文について説明します。

パラメーター

transform モジュールを使用すると、同期中に列を追加、削除、または変更したり、データをフィルタリングしたりできます。次のパラメーターを使用して変換ルールを定義します。

パラメーター

説明

必須

注意

source-table

変換するソーステーブルを指定します。

はい

正規表現がサポートされています。

projection

ソーステーブルのプロジェクションルールを指定します。これにより、変換後の出力列が定義されます。

いいえ

構文は SQL の SELECT 文に似ています。

このパラメーターを空のままにすると、列は追加も削除もされません。

詳細については、「プロジェクションルールを定義する」をご参照ください。利用可能なビルトイン関数については、「Flink CDC ビルトイン関数」をご参照ください。

filter

データ行をフィルタリングするためのルールを指定します。

いいえ

構文は SQL の WHERE 句に似ています。

このパラメーターを空のままにすると、行は除外されません。

primary-keys

結果テーブルのプライマリキー列を指定します。

いいえ

このパラメーターを空のままにすると、プライマリキーの定義はソーステーブルのスキーマから継承されます。プライマリキー列を指定するには、カンマ区切りのリストとして入力します。

重要

このパラメーターを定義する場合は、インジェストされるすべてのデータがプライマリキー制約に準拠していることを確認してください。特にパーティションをまたがる書き込みでデータの順序が乱れる問題を回避するために、ソーステーブルのプライマリキー列をカスタム定義に含めることをお勧めします。

partition-keys

結果テーブルのパーティションキー列を指定します。

いいえ

このパラメーターを空のままにすると、パーティションキーの定義はソーステーブルのスキーマから継承されます。パーティションキー列を指定するには、カンマ区切りのリストとして入力します。

重要

カスタムパーティションキー列を定義する際は、インジェストされるすべてのデータがプライマリキー制約に準拠していることを確認してください。これにより、パーティションをまたがる書き込み中にデータの順序が乱れる問題を回避できます。

table-options

シンクに渡す追加の設定項目を指定します。

いいえ

Paimon シンクのバケット数やコメントなどのオプションのプロパティ。

カンマ (,) を使用して設定項目を区切り、等号 (=) を使用してキーと値を区切ります。

例:

key1=value1,key2=value2

description

変換ルールの説明。

いいえ

N/A。

converter-after-transform

変換後に追加の処理を実行するコンバーター。

いいえ

変換後にコンバーターを使用する」をご参照ください。

使用上の注意

  • transform モジュール内の文を変更した後は、ジョブをステートレスで再起動してください。

  • 通常、projection 文と filter 文を引用符で囲む必要はありません。

    transform:
      - projection: a, b, c
        # 上記の行は次の行と同等です。
      - projection: "a, b, c"

    ただし、プロジェクション式が特殊文字 (例: * または ') で始まる場合に正しく解析されるようにするには、式全体を単一引用符 (') または二重引用符 (") で囲むか、バックスラッシュ (\) で特殊文字をエスケープします。

    transform:
      - projection: *, 42      # 無効
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK  

プロジェクションルールを定義する

transform モジュールは、SQL に似た構文を使用してプロジェクションルールを定義します。これらのルールを使用して、選択した列を同期したり、計算列を追加したり、メタデータ列を参照したりできます。

列のプルーニングを実行する

ソースからシンクに特定の列を同期するには、projection ルールでそれらを指定します。指定されていない列は除外されます。

transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4 # col_2 は除外されます
重要

列のプルーニングにより、ソーススキーマの更新がシンクに到達するのを防ぐことができます。列のプルーニングは同期する列の明示的なリストに依存するため、ソーススキーマの変更 (列の追加や削除など) は自動的にダウンストリームに反映されません。

ワイルドカードを使用する

既存のすべての列と新しく追加された列をそのままダウンストリームに送信するには、projection ルールでワイルドカード文字 (*) を使用します。

説明

ワイルドカード (*) の代わりに明示的な列のリストを使用すると、静的なスキーマになります。これは、ソーステーブルに追加された新しい列が自動的にシンクに含まれないことを意味します。

たとえば、*, 'extras' AS extras は、ソーススキーマの末尾に追加の列を追加し、ソースからシンクへのスキーマ変更を継続的に伝播します。

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

計算列を追加する

プロジェクションルール内で <Expression> AS <ColName> 構文を使用して計算列を追加できます。この式は各データ行に対して評価され、その結果が計算列に設定されます。

説明

計算列の式は、別の計算列を参照できません。この制限は、定義される順序に関係なく適用されます。たとえば、a, b AS c, c AS d は無効です。

たとえば、ソーステーブル db.tbl からの [+I, id = 1][+I, id = 1, inc_id = 2] に変換し、それを結果テーブルにインジェストします。

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

メタデータ列を参照する

projection ルールでは、次の事前定義されたメタデータ列を通常のデータ列であるかのように使用できます。

重要

メタデータ列と同じ名前の通常のデータ列を定義しないでください。

メタデータ列名

データ型

説明

__namespace_name__

String

ソーステーブルの名前空間名。

__schema_name__

String

ソーステーブルのスキーマ名。

__table_name__

String

ソーステーブルの名前。

__data_event_type__

String

データ変更イベントの操作タイプ (+I-U+U-D)。

重要

このメタデータ列をプライマリキーとして使用しないでください。Flink CDC の更新イベントは、常に単一の更新の「前」(-U) と「後」(+U) の状態を 1 つのイベントにバンドルします。したがって、__data_event_type__ は同じ更新イベント内に -U+U の両方を含みます。

例: ソーステーブルの完全修飾名を格納する計算列を追加します。

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

データベースシステム間での名前空間、スキーマ、およびテーブルの同等物:

データベースシステム

名前空間

スキーマ

テーブル

MySQL

データベース

-

テーブル

Kafka

-

-

Topic

SLS

-

プロジェクト

LogStore

MongoDB

-

データベース

Collection

Paimon

-

データベース

テーブル

Hologres

-

スキーマ

テーブル

StarRocks

データベース

-

テーブル

Doris

データベース

-

テーブル

フィルターを追加する

Transform モジュールは、SQL に似た構文を使用してデータフィルタリングルールを定義します。

filter ルールは、BOOLEAN 型に評価される式です。ソーステーブルの任意の列または計算列を参照できます。

フィルター式が FALSE と評価されるデータ変更イベントは、ダウンストリームに送信されません。

説明

プロジェクションルールでソーステーブルの列を計算列で上書きした場合、フィルター式でのその列への参照は、計算列の値を参照します。

たとえば、次の変換ルールは有効です。

transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5

filter 式で参照される id は、VARCHAR 型にキャストされた計算列です。

変換後にコンバーターを使用する

converter-after-transform パラメーターを使用して、すべての変換ルールが適用された後にデータ変更を処理します。複数のコンバーターをカンマ区切りのリストで指定すると、Flink は定義された順序でそれらを順次適用します。有効な値:

コンバーター名

説明

サポートされているバージョン

SOFT_DELETE

削除を挿入に変換します。

Ververica Runtime (VVR) 8.0.11 以降

FIELD_NAME_LOWER_CASE

テーブルのすべての列名を小文字に変換します。

VVR 11.1 以降

ソフトデリートを実行する

ソフトデリートを実行するには、SOFT_DELETE コンバーターを __data_event_type__ メタデータ列と組み合わせます。このアプローチにより、削除イベント時にデータがシンクから物理的に削除されないことが保証されます。代わりに、Flink は削除を挿入に変換し、op_type 列に -D でマークします。

transform: 
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE