このトピックでは、Flink CDC データインジェストジョブ向けに、変換モジュールがサポートする構文ルールおよびビルトイン関数について説明します。
Transform ルールのパラメーター
Transform モジュールを使用すると、データ列を直接操作できます。データ同期中に、既存の列を削除または拡張したり、不要なデータをフィルタリングしたりできます。次のパラメーターを使用して、変換ルールを定義します。
パラメーター | 説明 | 必須 | 備考 |
| 変換対象の上流テーブルを指定します。 | はい | 正規表現をサポートしています。 |
| 上流テーブルのプロジェクション(投影)ルールを指定します。この設定により、変換後のすべてのデータ列が決定されます。 | いいえ | 構文は SQL の SELECT 文に似ています。 空欄のままにした場合、列は追加も削除もされません。 詳細については、「データスクリーニング」をご参照ください。利用可能なビルトイン関数については、「Flink CDC ビルトイン関数」ドキュメントをご参照ください。 |
| 行フィルタリングルールです。 | いいえ | 構文は SQL の WHERE 文と類似しています。 空欄のままにした場合、行はフィルター処理されません。 |
| 変換後のプライマリキー列を指定します。 | いいえ | 空欄のままにした場合、元のスキーマから定義されたプライマリキーが保持されます。プライマリキーはカンマ ( 重要 カスタムプライマリキー列を定義する場合は、上流から受信されるデータが主キー制約 (PRIMARY KEY constraint) を満たすことを確認してください。また、上流テーブルのプライマリキー列をカスタムプライマリキー列に含めることを推奨します。これにより、クロスパーティション書き込み時のデータ順序不整合を防止できます。 |
| 変換後のパーティションキー列のリストを指定します。 | いいえ | 空欄のままにした場合、元のスキーマから定義されたパーティションキーが保持されます。パーティションキーはカンマ ( 重要 カスタムパーティションキー列を定義する場合は、上流から受信されるデータが主キー制約 (PRIMARY KEY constraint) を満たすことを確認してください。これにより、クロスパーティション書き込み時のデータ順序不整合を防止できます。 |
| シンクに渡される追加の構成情報です。 | いいえ | バケット数や Paimon シンク向けのコメントなど、任意の属性のリストです。 異なる構成項目はカンマ ( 構成例:
|
| 変換ルールの説明です。 | いいえ | なし。 |
| 変換完了後にデータに対して追加の処理を実行するコンバーターです。 | いいえ | 詳細については、「変換後のコンバーター」をご参照ください。 |
注意事項
変換モジュール内のステートメントを変更した後は、ジョブをステートレス再起動する必要があります。
通常、projection 文と filter 文を引用符で囲む必要はありません。
transform: - projection: a, b, c # 以下と同等 - projection: "a, b, c"ただし、プロジェクション式の先頭文字がアスタリスク (
*) やシングルクォート (') などの特殊文字である場合、その式全体が有効な YAML 文字列リテラルとして解析されない可能性があります。この場合、式全体をシングルクォート (') またはダブルクォート (") で囲むか、バックスラッシュ (\) を使用して文字をエスケープする必要があります:transform: - projection: *, 42 # 無効な YAML - projection: '*, 42' # OK - projection: \*, 42 # OK
フィールドのフィルタリング
データインジェストの変換モジュールでは、SQL に類似した構文を使用してプロジェクションルールを定義します。これらのルールを用いて、特定の列を選択したり、計算列やメタデータ列を追加したりできます。
列のプルーニング
ソーステーブルから特定の列を抽出し、それらを下流の宛先に同期したい場合、プロジェクションルールに同期対象の列を列挙します。指定されていない列は下流の宛先には送信されません:
transform:
- source-table: db.tbl
projection: col_1, col_3, col_4 # col_2 はプルーニングされる列の削減により、上流テーブルのスキーマが変更された場合に、上流および下流テーブルのスキーマが非同期状態になる可能性があります。
ワイルドカード
ソーステーブルのすべての列および今後追加される新しい列をそのまま下流に送信したい場合、プロジェクションルール内でアスタリスク (*) ワイルドカードを使用できます。
プロジェクションルールでアスタリスク (*) ワイルドカードを使用しない場合、生成されるスキーマは固定されます。projection ルールで指定されたバージョンと常に一致します。
たとえば、*, 'extras' AS extras は、上流スキーマの列の末尾に追加列を付与します。また、上流スキーマ進化 (schema evolution) を継続的に下流の宛先に送信します。
transform:
- source-table: db.tbl
projection: \*, 'extras' AS extras計算列
「<Expression> AS <ColName>」構文を projection ルールで使用して、計算列を追加できます。この式は、各上流のデータレコードに対して評価され、その結果が対応する列に挿入されます。
計算列の式は、たとえ参照される列がそれより前に出現していても、別の計算列の値を参照することはできません。たとえば、a, b AS c, c AS d は無効な式です。
たとえば、上流の db.tbl テーブルから [+I, id = 1] のデータレコードを受信した場合、それは [+I, id = 1, inc_id = 2] のデータ行に変換され、下流の宛先に送信されます。
transform:
- source-table: db.tbl
projection: id, id + 1 AS inc_idメタデータ列
プロジェクションルールを記述する際に、以下の定義済みメタデータ列を通常のデータ列として使用できます。
メタデータ列と同じ名前の通常のデータ列を定義しないでください。
以下のメタデータ列は、すべてのコネクタに適用されます。
メタデータ列名 | データ型 | 説明 |
| String | このデータ変更レコードに対応するソーステーブルの名前空間名です。 |
| String | このデータ変更レコードに対応するソーステーブルのスキーマ名です。 |
| String | このデータ変更レコードに対応するソーステーブルのテーブル名です。 |
| String | このデータ変更レコードの操作タイプ ( 重要 CDC イベントでは、単一の更新に対する Update Before および Update After 操作が常に 1 つのイベントにパッケージ化されます。したがって、同一の更新イベントにおける |
たとえば、上流テーブルの完全修飾名を計算列に書き込み、それを下流の宛先に送信します。
transform:
- source-table: \.*.\.*
projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier以下の表は、データベースタイプごとのコネクタにおける名前空間名、スキーマ名、テーブル名のマッピングを示しています。
データベースタイプ | 名前空間名 | スキーマ名 | テーブル名 |
MySQL | - | Database | テーブル |
Kafka | - | - | Topic |
SLS | - | プロジェクト | LogStore |
MongoDB | - | データベース | Collection |
Paimon | - | データベース | テーブル |
Hologres | - | スキーマ | テーブル |
StarRocks | - | Database | テーブル |
Doris | - | Database | テーブル |
Postgres | Database 説明
| Schema | Table |
データのフィルタリング
データインジェストの Transform モジュールは、SQL に似た構文を使用して行フィルタリングルールを定義します。
フィルタールールは、BOOLEAN 型に評価可能な式である必要があります。ソーステーブルの任意の列および計算列を参照できます。
データ変更レコードが、空でないフィルターを持つ変換ルールに一致し、かつフィルター式の評価結果が FALSE の場合、そのデータ行は下流の宛先に送信されません。
プロジェクションルール内で計算列を定義して既存の上流列を上書きした場合、フィルター式はその計算列を参照します。
たとえば、以下の変換ルールを考えてください:
transform:
- source-table: db.tbl
projection: CAST(id AS VARCHAR) AS id
filter: CHAR_LENGTH(id) > 5これは有効です。id は、フィルター 式で参照される計算列であり、VARCHAR 型に変換されています。
高度な設定ルール
プルーニングされていない列とメタデータの参照
例 1:削減された列に基づくフィルタリング
上流テーブルのスキーマは [INT a, INT b, BOOLEAN c] です。列 a および b を出力し、かつ c が true のみの行を保持するには、以下の構成ルールを使用します:
transform:
- source-table: ...
projection: a, b
filter: c例 2:メタデータ列に基づくフィルタリング
メタデータ列を projection で明示的に定義せずに、フィルター条件として直接使用できます。たとえば、DELETE 型のデータ変更を除外するには:
transform:
- source-table: db.tbl
projection: a, b
filter: __data_event_type__ = '+I'既存の列の上書き
projection 内で、上流列と同じ名前のフィールドを定義すると、その列の値または型を上書きできます。
例:強制的な型変換
上流テーブルのスキーマは [INT a, INT b, BOOLEAN c] です。列名を変更せずに、列 a の型を STRING に強制変換するには:
transform:
- source-table: db.tbl
projection: \*, CAST(a AS STRING)下流テーブルのスキーマは [STRING a, INT b, BOOLEAN c] になります。元の列 a は新しく定義された型によって上書きされます。
フィルター条件での計算列の再利用
フィルターは、プロジェクション内で定義された計算列のエイリアスを直接参照できます。
例:計算結果の参照
d という新しい列を projection で定義し、それを filter で直接使用します。
transform:
- source-table: db.tbl
projection: a, b, c, a + b + c AS d
filter: d > 100この構成は、以下の構成と同等の効果を持ちますが、可読性が向上します:
transform:
- source-table: ...
projection: a, b, c, a + b + c AS d
filter: a + b + c > 100変換後のコンバーター
converter-after-transform パラメーターは、すべての変換ルールが適用された後のデータ変更を処理するために使用されます。カンマ (,) を使用して複数のコンバーターを接続できます。メッセージはコンバーターの並び順に従って順次変更されます。現在サポートされている構成値は以下のとおりです。
コンバーター名 | 機能 | サポートされているバージョン |
SOFT_DELETE | 削除変更を挿入に変換します。 | Ververica Runtime (VVR) 8.0.11 以降。 |
FIELD_NAME_LOWER_CASE | テーブル内のすべてのフィールド名を小文字に変換します。 | VVR 11.1 以降。 |
論理削除
SOFT_DELETE コンバーターと __data_event_type__ メタデータ列を組み合わせることで、論理削除を実装できます。たとえば、以下の変換構成は論理削除を実装します。削除されたデータは下流の宛先で実際に削除されず、代わりに削除操作が挿入に変換され、対応するデータの op_type が -D に更新されて削除であることが示されます。
transform:
- source-table: db.tbl
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE