すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:変換

最終更新日:Feb 27, 2026

このトピックでは、Flink CDC のデータインジェストジョブにおける変換モジュールでサポートされている構文とビルトイン関数について説明します。

変換ルールのパラメーター

変換モジュールを使用すると、同期中に列の追加、削除、変更、およびデータのフィルタリングができます。変換ルールを定義するには、次のパラメーターを使用します。

パラメーター

説明

必須

注意

source-table

変換対象のソーステーブルを指定します。

はい

正規表現がサポートされています。

projection

ソーステーブルの射影ルールを指定します。これにより、変換後のすべての出力列が定義されます。

いいえ

構文は SQL の SELECT 文に似ています。

このパラメーターが空の場合、列の追加や削除は行われません。

詳細については、「射影ルールの定義」をご参照ください。利用可能なビルトイン関数については、「Flink CDC のビルトイン関数」をご参照ください。

filter

データ行をフィルタリングするためのルールを指定します。

いいえ

構文は SQL の WHERE 句に似ています。

このパラメーターが空の場合、除外される行はありません。

primary-keys

結果テーブルのプライマリキー列を指定します。

いいえ

このパラメーターが空の場合、プライマリキー定義はソーステーブルのスキーマから継承されます。プライマリキー列はコンマ (,) 区切りリストとして入力します。

重要

デフォルトでは、上流テーブルからプライマリキー制約を削除することはできません。上流のプライマリキー列を削減するには、pipeline: 設定項目 transform.allow.trimming.pk-columns を追加します。

カスタムのプライマリキー列を定義する場合、インジェストされるすべてのデータがプライマリキー制約に準拠していることを確認してください。パーティションをまたがる書き込み中にデータの不整合を避けるため、カスタム定義にソーステーブルのプライマリキー列を含めてください。

partition-keys

結果テーブルのパーティションキー列を指定します。

いいえ

このパラメーターが空の場合、パーティションキー定義はソーステーブルのスキーマから継承されます。パーティションキー列はコンマ (,) 区切りリストとして入力します。

重要

カスタムのパーティションキー列を定義する場合、インジェストされるすべてのデータがプライマリキー制約に準拠していることを確認してください。これにより、パーティションをまたがる書き込み中のデータの不整合を防ぎます。

table-options

シンクに渡す追加の設定項目を指定します。

いいえ

Paimon シンクのバケット数やコメントなどのオプションのプロパティ。

コンマ (,) を使用して設定項目を区切り、等号 (=) を使用してキーと値を区切ります。

例:

key1=value1,key2=value2

description

変換ルールの説明。

いいえ

N/A。

converter-after-transform

変換後に追加の処理を実行するコンバーターです。

いいえ

変換後のコンバーターの使用」をご参照ください。

注意事項

  • 変換モジュール内の文を変更した後は、ジョブのステートレス再起動を実行してください。

  • 通常、`projection` 文と `filter` 文を引用符で囲む必要はありません。

    transform:
      - projection: a, b, c
        # 上記の行は次の行と同等です。
      - projection: "a, b, c"

    ただし、射影式が特殊文字 (例:*') で始まる場合は、式全体をシングルクォーテーション (') またはダブルクォーテーション (") で囲むか、特殊文字をバックスラッシュ (\) でエスケープする必要があります。

    transform:
      - projection: *, 42      # 無効
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK  

フィールドのフィルタリング

変換モジュールは、SQL に似た構文を使用して射影ルールを定義します。これらのルールを使用して、選択した列を同期したり、計算列を追加したり、メタデータ列を参照したりできます。

列のプルーニング

ソーステーブルから特定の列のみを下流に同期するには、射影ルールでそれらの列を指定します。指定されていない列は下流に送信されません。

transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4 # col_2 は除外されます
重要

列のプルーニングにより、ソーススキーマの更新がシンクに到達しなくなる可能性があります。列のプルーニングは同期する列の明示的なリストに依存するため、ソーススキーマへの変更 (列の追加や削除など) は自動的に下流に反映されません。

ワイルドカードの使用

ソーステーブルのすべての列と、後で追加される新しい列をそのまま下流に送信したい場合は、射影ルールでアスタリスク (*) ワイルドカード文字を使用できます。

説明

射影ルールがワイルドカード (*) を使用しない場合、その結果のスキーマは静的になります。ルールで定義されたバージョンに固定され、スキーマ変更は正しく伝播されません。

たとえば、*, 'extras' AS extras は、ソーススキーマの末尾に追加の列を追加し、ソースからシンクへのスキーマ変更を継続的に伝播します。

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

計算列

射影ルール内で <Expression> AS <ColName> 構文を使用して計算列を追加できます。この式は各データ行に対して評価され、その結果が計算列に設定されます。

説明

計算列の式は、別の計算列を参照できません。この制限は、定義される順序に関係なく適用されます。たとえば、a, b AS c, c AS d は無効です。

たとえば、上流テーブル db.tbl から [+I, id = 1] データレコードを受信した場合、それを [+I, id = 1, inc_id = 2] データ行に変換して下流に送信します。

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

メタデータ列

射影ルールでは、以下の事前定義されたメタデータ列を通常のデータ列のように使用できます。

重要

メタデータ列と同じ名前の通常のデータ列を定義しないでください。

すべてのコネクタは、以下のメタデータ列をサポートしています。

メタデータ列名

データ型

説明

__namespace_name__

String

ソーステーブルの名前空間名。

__schema_name__

String

ソーステーブルのスキーマ名。

__table_name__

String

ソーステーブルの名前。

__data_event_type__

String

データ変更イベントの操作タイプ (+I-U+U-D)。

重要

CDC イベントは常に単一の更新の更新前 (Update Before) と更新後 (Update After) の状態を単一のイベントにバンドルするため、__data_event_type__ フィールドには同じ更新イベント内に -U+U の両方が含まれます。このフィールドをプライマリキーとして使用しないでください。

例:ソーステーブルの完全修飾名を格納する計算列を追加します。

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

データベースシステム間での名前空間、スキーマ、テーブルの対応関係:

データベースタイプ

名前空間名

スキーマ名

テーブル名

MySQL

-

データベース

テーブル

Kafka

-

-

トピック

SLS

-

プロジェクト

LogStore

MongoDB

-

データベース

コレクション

Paimon

-

データベース

テーブル

Hologres

-

スキーマ

テーブル

StarRocks

-

データベース

テーブル

Doris

-

データベース

テーブル

Postgres

データベース

説明

これは、table-id.include-database パラメーターが有効になっている場合にのみ効果があります。

スキーマ

テーブル

データフィルタリング

変換モジュールは、SQL に似た構文を使用してデータフィルタリングルールを定義します。

フィルター ルールは、ブール値に評価される式です。ソーステーブルの列または計算列を参照できます。

フィルター式が FALSE と評価されたデータ変更イベントは、下流に送信されません。

説明

射影ルールでソーステーブルの列を計算列で上書きした場合、フィルター式でのその列への参照は、計算列の値を参照します。

以下は変換ルールの例です。

transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5

filter 式で参照される id は、VARCHAR 型にキャストされた計算列です。

高度な設定ルール

プルーニングされていない列とメタデータの参照

例 1:プルーニングされた列に基づくフィルター

ソーステーブルの構造が [INT a, INT b, BOOLEAN c] であるとします。列 ab を出力し、ctrue の行のみを保持するには、次の設定を使用します。

transform:
  - source-table: ...
    projection: a, b
    filter: c

例 2:メタデータ列に基づくフィルター

projection で明示的に定義しなくても、メタデータ列を直接フィルター条件として使用できます。たとえば、DELETE タイプの変更イベントを除外します。

transform:
  - source-table: db.tbl
    projection: a, b
    filter: __data_event_type__ = '+I'

既存の列の上書き

上流の列と同じ名前のフィールドを projection で定義して、その値または型を上書きします。これにより、スキーマ進化が正しく機能することが保証されます。

例:型変換の強制

ソーステーブルの構造が [INT a, INT b, BOOLEAN c] であるとします。列名を保持しつつ、列 a を文字列型に強制的に変換するには、次の設定を使用します。

transform:
  - source-table: db.tbl
    projection: \*, CAST(a AS STRING)

結果テーブルの構造は [STRING a, INT b, BOOLEAN c] になります。元の a 列は新しい型定義に置き換えられます。

フィルター条件での計算列の再利用

フィルター ルールは、射影で定義された計算列のエイリアスを直接参照できます。

例:計算結果の参照

projection で新しい列 d を定義し、それを filter で直接使用します。

transform:
  - source-table: db.tbl
    projection: a, b, c, a + b + c AS d
    filter: d > 100

この設定は、以下の設定と同じ結果を生成しますが、より読みやすくなっています。

transform:
  - source-table: ...
    projection: a, b, c, a + b + c AS d
    filter: a + b + c > 100

変換後のコンバーター

すべての変換ルールが適用された後にデータ変更を処理するには、converter-after-transform パラメーターを使用します。複数のコンバーターをコンマ (,) 区切りリストで指定します。Flink は、定義した順序でコンバーターを順次適用します。有効な値は次のとおりです。

コンバーター名

機能

サポートされているバージョン

SOFT_DELETE

削除を挿入に変換します。

Ververica Runtime (VVR) 8.0.11 以降

FIELD_NAME_LOWER_CASE

テーブルのすべての列名を小文字に変換します。

VVR 11.1 以降

論理削除の実行

論理削除を実行するには、__data_event_type__ メタデータ列と SOFT_DELETE コンバーターを組み合わせます。たとえば、次の変換設定は論理削除を実装します。削除されたデータはシンクから物理的に削除されません。代わりに、Flink は削除操作を挿入操作に変換し、削除を示すために op_type の値を -D としてマークします。

transform: 
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE