このトピックでは、Flink CDC のデータインジェストジョブにおける変換モジュールでサポートされている構文とビルトイン関数について説明します。
変換ルールのパラメーター
変換モジュールを使用すると、同期中に列の追加、削除、変更、およびデータのフィルタリングができます。変換ルールを定義するには、次のパラメーターを使用します。
パラメーター | 説明 | 必須 | 注意 |
| 変換対象のソーステーブルを指定します。 | はい | 正規表現がサポートされています。 |
| ソーステーブルの射影ルールを指定します。これにより、変換後のすべての出力列が定義されます。 | いいえ | 構文は SQL の SELECT 文に似ています。 このパラメーターが空の場合、列の追加や削除は行われません。 詳細については、「射影ルールの定義」をご参照ください。利用可能なビルトイン関数については、「Flink CDC のビルトイン関数」をご参照ください。 |
| データ行をフィルタリングするためのルールを指定します。 | いいえ | 構文は SQL の WHERE 句に似ています。 このパラメーターが空の場合、除外される行はありません。 |
| 結果テーブルのプライマリキー列を指定します。 | いいえ | このパラメーターが空の場合、プライマリキー定義はソーステーブルのスキーマから継承されます。プライマリキー列はコンマ ( 重要 デフォルトでは、上流テーブルからプライマリキー制約を削除することはできません。上流のプライマリキー列を削減するには、 カスタムのプライマリキー列を定義する場合、インジェストされるすべてのデータがプライマリキー制約に準拠していることを確認してください。パーティションをまたがる書き込み中にデータの不整合を避けるため、カスタム定義にソーステーブルのプライマリキー列を含めてください。 |
| 結果テーブルのパーティションキー列を指定します。 | いいえ | このパラメーターが空の場合、パーティションキー定義はソーステーブルのスキーマから継承されます。パーティションキー列はコンマ ( 重要 カスタムのパーティションキー列を定義する場合、インジェストされるすべてのデータがプライマリキー制約に準拠していることを確認してください。これにより、パーティションをまたがる書き込み中のデータの不整合を防ぎます。 |
| シンクに渡す追加の設定項目を指定します。 | いいえ | Paimon シンクのバケット数やコメントなどのオプションのプロパティ。 コンマ ( 例:
|
| 変換ルールの説明。 | いいえ | N/A。 |
| 変換後に追加の処理を実行するコンバーターです。 | いいえ | 「変換後のコンバーターの使用」をご参照ください。 |
注意事項
変換モジュール内の文を変更した後は、ジョブのステートレス再起動を実行してください。
通常、`projection` 文と `filter` 文を引用符で囲む必要はありません。
transform: - projection: a, b, c # 上記の行は次の行と同等です。 - projection: "a, b, c"ただし、射影式が特殊文字 (例:
*や') で始まる場合は、式全体をシングルクォーテーション (') またはダブルクォーテーション (") で囲むか、特殊文字をバックスラッシュ (\) でエスケープする必要があります。transform: - projection: *, 42 # 無効 - projection: '*, 42' # OK - projection: \*, 42 # OK
フィールドのフィルタリング
変換モジュールは、SQL に似た構文を使用して射影ルールを定義します。これらのルールを使用して、選択した列を同期したり、計算列を追加したり、メタデータ列を参照したりできます。
列のプルーニング
ソーステーブルから特定の列のみを下流に同期するには、射影ルールでそれらの列を指定します。指定されていない列は下流に送信されません。
transform:
- source-table: db.tbl
projection: col_1, col_3, col_4 # col_2 は除外されます列のプルーニングにより、ソーススキーマの更新がシンクに到達しなくなる可能性があります。列のプルーニングは同期する列の明示的なリストに依存するため、ソーススキーマへの変更 (列の追加や削除など) は自動的に下流に反映されません。
ワイルドカードの使用
ソーステーブルのすべての列と、後で追加される新しい列をそのまま下流に送信したい場合は、射影ルールでアスタリスク (*) ワイルドカード文字を使用できます。
射影ルールがワイルドカード (*) を使用しない場合、その結果のスキーマは静的になります。ルールで定義されたバージョンに固定され、スキーマ変更は正しく伝播されません。
たとえば、*, 'extras' AS extras は、ソーススキーマの末尾に追加の列を追加し、ソースからシンクへのスキーマ変更を継続的に伝播します。
transform:
- source-table: db.tbl
projection: \*, 'extras' AS extras計算列
射影ルール内で <Expression> AS <ColName> 構文を使用して計算列を追加できます。この式は各データ行に対して評価され、その結果が計算列に設定されます。
計算列の式は、別の計算列を参照できません。この制限は、定義される順序に関係なく適用されます。たとえば、a, b AS c, c AS d は無効です。
たとえば、上流テーブル db.tbl から [+I, id = 1] データレコードを受信した場合、それを [+I, id = 1, inc_id = 2] データ行に変換して下流に送信します。
transform:
- source-table: db.tbl
projection: id, id + 1 AS inc_idメタデータ列
射影ルールでは、以下の事前定義されたメタデータ列を通常のデータ列のように使用できます。
メタデータ列と同じ名前の通常のデータ列を定義しないでください。
すべてのコネクタは、以下のメタデータ列をサポートしています。
メタデータ列名 | データ型 | 説明 |
| String | ソーステーブルの名前空間名。 |
| String | ソーステーブルのスキーマ名。 |
| String | ソーステーブルの名前。 |
| String | データ変更イベントの操作タイプ ( 重要 CDC イベントは常に単一の更新の更新前 (Update Before) と更新後 (Update After) の状態を単一のイベントにバンドルするため、 |
例:ソーステーブルの完全修飾名を格納する計算列を追加します。
transform:
- source-table: \.*.\.*
projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifierデータベースシステム間での名前空間、スキーマ、テーブルの対応関係:
データベースタイプ | 名前空間名 | スキーマ名 | テーブル名 |
MySQL | - | データベース | テーブル |
Kafka | - | - | トピック |
SLS | - | プロジェクト | LogStore |
MongoDB | - | データベース | コレクション |
Paimon | - | データベース | テーブル |
Hologres | - | スキーマ | テーブル |
StarRocks | - | データベース | テーブル |
Doris | - | データベース | テーブル |
Postgres | データベース 説明 これは、 | スキーマ | テーブル |
データフィルタリング
変換モジュールは、SQL に似た構文を使用してデータフィルタリングルールを定義します。
フィルター ルールは、ブール値に評価される式です。ソーステーブルの列または計算列を参照できます。
フィルター式が FALSE と評価されたデータ変更イベントは、下流に送信されません。
射影ルールでソーステーブルの列を計算列で上書きした場合、フィルター式でのその列への参照は、計算列の値を参照します。
以下は変換ルールの例です。
transform:
- source-table: db.tbl
projection: CAST(id AS VARCHAR) AS id
filter: CHAR_LENGTH(id) > 5filter 式で参照される id は、VARCHAR 型にキャストされた計算列です。
高度な設定ルール
プルーニングされていない列とメタデータの参照
例 1:プルーニングされた列に基づくフィルター
ソーステーブルの構造が [INT a, INT b, BOOLEAN c] であるとします。列 a と b を出力し、c が true の行のみを保持するには、次の設定を使用します。
transform:
- source-table: ...
projection: a, b
filter: c例 2:メタデータ列に基づくフィルター
projection で明示的に定義しなくても、メタデータ列を直接フィルター条件として使用できます。たとえば、DELETE タイプの変更イベントを除外します。
transform:
- source-table: db.tbl
projection: a, b
filter: __data_event_type__ = '+I'既存の列の上書き
上流の列と同じ名前のフィールドを projection で定義して、その値または型を上書きします。これにより、スキーマ進化が正しく機能することが保証されます。
例:型変換の強制
ソーステーブルの構造が [INT a, INT b, BOOLEAN c] であるとします。列名を保持しつつ、列 a を文字列型に強制的に変換するには、次の設定を使用します。
transform:
- source-table: db.tbl
projection: \*, CAST(a AS STRING)結果テーブルの構造は [STRING a, INT b, BOOLEAN c] になります。元の a 列は新しい型定義に置き換えられます。
フィルター条件での計算列の再利用
フィルター ルールは、射影で定義された計算列のエイリアスを直接参照できます。
例:計算結果の参照
projection で新しい列 d を定義し、それを filter で直接使用します。
transform:
- source-table: db.tbl
projection: a, b, c, a + b + c AS d
filter: d > 100この設定は、以下の設定と同じ結果を生成しますが、より読みやすくなっています。
transform:
- source-table: ...
projection: a, b, c, a + b + c AS d
filter: a + b + c > 100変換後のコンバーター
すべての変換ルールが適用された後にデータ変更を処理するには、converter-after-transform パラメーターを使用します。複数のコンバーターをコンマ (,) 区切りリストで指定します。Flink は、定義した順序でコンバーターを順次適用します。有効な値は次のとおりです。
コンバーター名 | 機能 | サポートされているバージョン |
SOFT_DELETE | 削除を挿入に変換します。 | Ververica Runtime (VVR) 8.0.11 以降 |
FIELD_NAME_LOWER_CASE | テーブルのすべての列名を小文字に変換します。 | VVR 11.1 以降 |
論理削除の実行
論理削除を実行するには、__data_event_type__ メタデータ列と SOFT_DELETE コンバーターを組み合わせます。たとえば、次の変換設定は論理削除を実装します。削除されたデータはシンクから物理的に削除されません。代わりに、Flink は削除操作を挿入操作に変換し、削除を示すために op_type の値を -D としてマークします。
transform:
- source-table: db.tbl
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE