本文向您介紹Flink CDC資料攝入作業的 Transform 模組支援的文法規則及內建函數。
Transform規則參數
Transform 模組支援使用者直接操作資料列,可對現有列執行刪除或擴充,同時支援在資料同步過程中過濾不需要的資料。您可以使用以下參數來定義Transform規則:
參數 | 含義 | 是否必填 | 備忘 |
| 指定需要轉換的上遊表。 | 是 | 支援使用Regex。 |
| 指定上遊表的投影規則,決定了上遊錶轉換後的所有資料列。 | 否 | 使用的句法與SQL SELECT語句類似。 不填則不追加或刪除任何列。 詳情請參見資料篩選,可使用的內建函數請參考Flink CDC內建函數文檔。 |
| 資料行過濾規則。 | 否 | 使用的句法與SQL WHERE語句類似。 不填則不過濾任何行。 |
| 指定轉換後的主鍵列。 | 否 | 不填則保留原Schema的主鍵定義。主鍵列表使用英文逗號( 重要 預設情況下,不支援刪除來自上遊的主鍵列約束。您需要在 自訂主鍵列時,您需要確保上遊到來的資料符合主鍵約束。建議自訂主鍵列中包含上遊表的主鍵列,以避免跨分區寫入時的資料亂序問題。 |
| 指定轉換後分區鍵列表。 | 否 | 不填則保留原Schema的分區鍵定義,分區鍵列表使用英文逗號( 重要 自訂分區列時,您需要確保上遊到來的資料符合主鍵約束,以避免跨分區寫入時的資料亂序問題。 |
| 傳遞給Sink的額外配置資訊。 | 否 | 可選的屬性列表,例如Paimon Sink的分桶數、注釋等資訊。 不同配置項通過 配置樣本:
|
| 轉換規則的描述資訊。 | 否 | 無。 |
| 在轉換完成後對資料額外處理的轉換器。 | 否 |
注意事項
修改transform模組的語句後,需要無狀態重新啟動作業。
通常情況下,projection和filter語句無需使用引號包裹。
transform: - projection: a, b, c # 等價於 - projection: "a, b, c"然而,如果Projection運算式的第一個字元為
*、'等特殊字元,則整行運算式可能無法被作為合法的YAML字串字面量解析。此時需要手動使用'或"包裹整個運算式,或是使用\轉義:transform: - projection: *, 42 # 不是合法的YAML - projection: '*, 42' # OK - projection: \*, 42 # OK
欄位篩選
資料攝入Transform模組採用類 SQL 文法來定義欄位篩選(Projection)規則,可以完成選取部分列、添加計算資料行、添加中繼資料列等功能。
列裁剪
如果您希望取出源表中某些特定列並同步給下遊,可以在projection規則中將需要同步的列寫出,未被指定的列將不會被發送給下遊:
transform:
- source-table: db.tbl
projection: col_1, col_3, col_4 # col_2 會被裁剪裁剪部分列可能導致上遊表發生結構變更時,上下遊表結構失去同步。
萬用字元
如果您希望將源表中的所有列以及後續追加的新列按原樣發送給下遊,則可以在projection規則中使用星號(*)萬用字元。
如果一個projection規則中沒有使用萬用字元(*),則其產生的Schema就是固定的,並且始終與projection規則中寫出的版本保持一致,結構變更將無法正常同步。
例如,*, 'extras' AS extras表示會在上遊Schema的列尾追加額外的列,並持續將上遊的表結構變更發送給下遊。
transform:
- source-table: db.tbl
projection: \*, 'extras' AS extras計算資料行
您可以在projection規則中使用<Expression> AS <ColName>句法來添加計算資料行,運算式將對上遊的每條資料分別求值後填入相應列。
計算資料行的運算式不能引用其他計算資料行的值,即使被引用的列出現在該計算資料行之前。例如a, b AS c, c AS d不是合法的運算式。
例如,在接收到來自上遊db.tbl表的[+I, id = 1]資料記錄時,將其轉化為[+I, id = 1, inc_id = 2]資料行並發送給下遊。
transform:
- source-table: db.tbl
projection: id, id + 1 AS inc_id中繼資料列(Metadata Column)
在編寫projection規則時,可以將以下預先定義的中繼資料列作為普通資料列使用:
請勿定義與中繼資料列同名的普通資料列。
以下中繼資料列適用於所有連接器。
中繼資料列名稱 | 資料類型 | 說明 |
| String | 這條資料變更記錄對應源表的Namespace名稱。 |
| String | 這條資料變更記錄對應源表的Schema名稱。 |
| String | 這條資料變更記錄對應源表的Table名稱。 |
| String | 這條資料變更記錄對應的操作類型( 重要 由於CDC Event總是將一次更新對應的Update Before和Update After打包為一條事件,因此 |
例如,將上遊表的全限定名稱寫入計算資料行中,並發送給下遊。
transform:
- source-table: \.*.\.*
projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier各個資料庫連接器對Namespace、Schema和Table名稱的映射關係如下表所示。
資料庫類型 | Namespace名稱 | Schema名稱 | Table名稱 |
MySQL | - | Database | Table |
Kafka | - | - | Topic |
SLS | - | Project | LogStore |
MongoDB | - | Database | Collection |
Paimon | - | Database | Table |
Hologres | - | Schema | Table |
StarRocks | - | Database | Table |
Doris | - | Database | Table |
Postgres | Database 說明 在啟用 | Schema | Table |
資料過濾
資料攝入Transform採用類 SQL 文法來定義行過濾規則。
Filter規則應當是一個可被求值為BOOLEAN類型的運算式,可以引用源表中的任意列及計算資料行。
如果某條資料變更記錄匹配了一個Filter不為空白的Transform規則,並且Filter運算式的求值結果為FALSE,那麼該行資料將不會被發送給下遊。
如果您在Projection規則中使用計算資料行覆蓋了上遊已存在的某一列,那麼在Filter運算式中引用的是計算資料行。
例如如下的這個Transform規則:
transform:
- source-table: db.tbl
projection: CAST(id AS VARCHAR) AS id
filter: CHAR_LENGTH(id) > 5是合法的,filter運算式中所引用的 id是已經被轉換為 VARCHAR 類型的計算資料行。
進階配置規則
引用非裁剪列與中繼資料
樣本1:基於被裁剪的列過濾
上遊表結構為[INT a, INT b, BOOLEAN c]。若需輸出a、b列,但僅保留c為true的行,可使用如下配置規則:
transform:
- source-table: ...
projection: a, b
filter: c樣本 2:基於中繼資料列過濾
直接使用中繼資料列作為過濾條件,無需在projection中顯式定義它們。例如,過濾掉刪除(DELETE)類型的變更資料。
transform:
- source-table: db.tbl
projection: a, b
filter: __data_event_type__ = '+I'覆寫現有列
在projection中定義與上遊列同名的欄位,即可覆寫該列的值或類型,通過這種使用方式能夠保證結構變更正常同步。
樣本:強制類型轉換
上遊表結構為[INT a, INT b, BOOLEAN c]。若需保持列名不變,但強制將a列轉換為字串類型。
transform:
- source-table: db.tbl
projection: \*, CAST(a AS STRING)下遊表結構將變為[STRING a, INT b, BOOLEAN c]。原有的a列已被新定義的類型覆蓋。
過濾條件複用計算資料行
filter 可以直接引用 projection 中定義的計算資料行別名。
樣本:引用計算結果
在projection中定義新列d,並在filter中直接使用它。
transform:
- source-table: db.tbl
projection: a, b, c, a + b + c AS d
filter: d > 100此配置和以下寫法效果相同,但更可讀:
transform:
- source-table: ...
projection: a, b, c, a + b + c AS d
filter: a + b + c > 100Transform 後轉換器(Converter after Transform)
converter-after-transform 用於在全部的transform規則後處理資料變更。可以使用英文,串連多個轉換器使用,訊息會按照轉換器的順序進行修改。目前支援配置值如下。
轉換器名稱 | 功能 | 支援版本 |
SOFT_DELETE | 將刪除變更轉換為插入。 | VVR 8.0.11及以上版本。 |
FIELD_NAME_LOWER_CASE | 表的欄位名全部轉為小寫。 | VVR 11.1及以上版本。 |
邏輯刪除
SOFT_DELETE轉換器結合中繼資料列 __data_event_type__可以實現邏輯刪除。例如如下的transform配置可以實現邏輯刪除,刪除資料不會在下遊真正地刪除資料。刪除的資料會轉化為插入,對應資料的op_type更新為-D來表示刪除。
transform:
- source-table: db.tbl
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE