このトピックでは、Flink CDC データインジェスチョンジョブドラフトの transform モジュールの構文について説明します。
パラメーター
transform モジュールを使用すると、同期中に列を追加、削除、または変更したり、データをフィルタリングしたりできます。次のパラメーターを使用して変換ルールを定義します。
パラメーター | 説明 | 必須 | 注意 |
| 変換するソーステーブルを指定します。 | はい | 正規表現がサポートされています。 |
| ソーステーブルのプロジェクションルールを指定します。これにより、変換後の出力列が定義されます。 | いいえ | 構文は SQL の このパラメーターを空のままにすると、列は追加も削除もされません。 詳細については、「プロジェクションルールを定義する」をご参照ください。利用可能なビルトイン関数については、「Flink CDC ビルトイン関数」をご参照ください。 |
| データ行をフィルタリングするためのルールを指定します。 | いいえ | 構文は SQL の このパラメーターを空のままにすると、行は除外されません。 |
| 結果テーブルのプライマリキー列を指定します。 | いいえ | このパラメーターを空のままにすると、プライマリキーの定義はソーステーブルのスキーマから継承されます。プライマリキー列を指定するには、カンマ区切りのリストとして入力します。 重要 このパラメーターを定義する場合は、インジェストされるすべてのデータがプライマリキー制約に準拠していることを確認してください。特にパーティションをまたがる書き込みでデータの順序が乱れる問題を回避するために、ソーステーブルのプライマリキー列をカスタム定義に含めることをお勧めします。 |
| 結果テーブルのパーティションキー列を指定します。 | いいえ | このパラメーターを空のままにすると、パーティションキーの定義はソーステーブルのスキーマから継承されます。パーティションキー列を指定するには、カンマ区切りのリストとして入力します。 重要 カスタムパーティションキー列を定義する際は、インジェストされるすべてのデータがプライマリキー制約に準拠していることを確認してください。これにより、パーティションをまたがる書き込み中にデータの順序が乱れる問題を回避できます。 |
| シンクに渡す追加の設定項目を指定します。 | いいえ | Paimon シンクのバケット数やコメントなどのオプションのプロパティ。 カンマ ( 例:
|
| 変換ルールの説明。 | いいえ | N/A。 |
| 変換後に追加の処理を実行するコンバーター。 | いいえ | 「変換後にコンバーターを使用する」をご参照ください。 |
使用上の注意
transformモジュール内の文を変更した後は、ジョブをステートレスで再起動してください。通常、
projection文とfilter文を引用符で囲む必要はありません。transform: - projection: a, b, c # 上記の行は次の行と同等です。 - projection: "a, b, c"ただし、プロジェクション式が特殊文字 (例:
*または') で始まる場合に正しく解析されるようにするには、式全体を単一引用符 (') または二重引用符 (") で囲むか、バックスラッシュ (\) で特殊文字をエスケープします。transform: - projection: *, 42 # 無効 - projection: '*, 42' # OK - projection: \*, 42 # OK
プロジェクションルールを定義する
transform モジュールは、SQL に似た構文を使用してプロジェクションルールを定義します。これらのルールを使用して、選択した列を同期したり、計算列を追加したり、メタデータ列を参照したりできます。
列のプルーニングを実行する
ソースからシンクに特定の列を同期するには、projection ルールでそれらを指定します。指定されていない列は除外されます。
transform:
- source-table: db.tbl
projection: col_1, col_3, col_4 # col_2 は除外されます列のプルーニングにより、ソーススキーマの更新がシンクに到達するのを防ぐことができます。列のプルーニングは同期する列の明示的なリストに依存するため、ソーススキーマの変更 (列の追加や削除など) は自動的にダウンストリームに反映されません。
ワイルドカードを使用する
既存のすべての列と新しく追加された列をそのままダウンストリームに送信するには、projection ルールでワイルドカード文字 (*) を使用します。
ワイルドカード (*) の代わりに明示的な列のリストを使用すると、静的なスキーマになります。これは、ソーステーブルに追加された新しい列が自動的にシンクに含まれないことを意味します。
たとえば、*, 'extras' AS extras は、ソーススキーマの末尾に追加の列を追加し、ソースからシンクへのスキーマ変更を継続的に伝播します。
transform:
- source-table: db.tbl
projection: \*, 'extras' AS extras計算列を追加する
プロジェクションルール内で <Expression> AS <ColName> 構文を使用して計算列を追加できます。この式は各データ行に対して評価され、その結果が計算列に設定されます。
計算列の式は、別の計算列を参照できません。この制限は、定義される順序に関係なく適用されます。たとえば、a, b AS c, c AS d は無効です。
たとえば、ソーステーブル db.tbl からの [+I, id = 1] を [+I, id = 1, inc_id = 2] に変換し、それを結果テーブルにインジェストします。
transform:
- source-table: db.tbl
projection: id, id + 1 AS inc_idメタデータ列を参照する
projection ルールでは、次の事前定義されたメタデータ列を通常のデータ列であるかのように使用できます。
メタデータ列と同じ名前の通常のデータ列を定義しないでください。
メタデータ列名 | データ型 | 説明 |
| String | ソーステーブルの名前空間名。 |
| String | ソーステーブルのスキーマ名。 |
| String | ソーステーブルの名前。 |
| String | データ変更イベントの操作タイプ ( 重要 このメタデータ列をプライマリキーとして使用しないでください。Flink CDC の更新イベントは、常に単一の更新の「前」( |
例: ソーステーブルの完全修飾名を格納する計算列を追加します。
transform:
- source-table: \.*.\.*
projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifierデータベースシステム間での名前空間、スキーマ、およびテーブルの同等物:
データベースシステム | 名前空間 | スキーマ | テーブル |
MySQL | データベース | - | テーブル |
Kafka | - | - | Topic |
SLS | - | プロジェクト | LogStore |
MongoDB | - | データベース | Collection |
Paimon | - | データベース | テーブル |
Hologres | - | スキーマ | テーブル |
StarRocks | データベース | - | テーブル |
Doris | データベース | - | テーブル |
フィルターを追加する
Transform モジュールは、SQL に似た構文を使用してデータフィルタリングルールを定義します。
filter ルールは、BOOLEAN 型に評価される式です。ソーステーブルの任意の列または計算列を参照できます。
フィルター式が FALSE と評価されるデータ変更イベントは、ダウンストリームに送信されません。
プロジェクションルールでソーステーブルの列を計算列で上書きした場合、フィルター式でのその列への参照は、計算列の値を参照します。
たとえば、次の変換ルールは有効です。
transform:
- source-table: db.tbl
projection: CAST(id AS VARCHAR) AS id
filter: CHAR_LENGTH(id) > 5filter 式で参照される id は、VARCHAR 型にキャストされた計算列です。
変換後にコンバーターを使用する
converter-after-transform パラメーターを使用して、すべての変換ルールが適用された後にデータ変更を処理します。複数のコンバーターをカンマ区切りのリストで指定すると、Flink は定義された順序でそれらを順次適用します。有効な値:
コンバーター名 | 説明 | サポートされているバージョン |
| 削除を挿入に変換します。 | Ververica Runtime (VVR) 8.0.11 以降 |
| テーブルのすべての列名を小文字に変換します。 | VVR 11.1 以降 |
ソフトデリートを実行する
ソフトデリートを実行するには、SOFT_DELETE コンバーターを __data_event_type__ メタデータ列と組み合わせます。このアプローチにより、削除イベント時にデータがシンクから物理的に削除されないことが保証されます。代わりに、Flink は削除を挿入に変換し、op_type 列に -D でマークします。
transform:
- source-table: db.tbl
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE