Configure the transform module in Flink CDC (Change Data Capture) jobs, including projection and filter syntax, metadata columns, and post-transform converters.
Limitations
Before configuring the transform module, note the following constraints:
- **Stateless restart required after changes**: After modifying any transform rule, perform a stateless restart of the job for the changes to take effect.
- **Computed columns cannot reference each other**: A computed column's expression cannot reference another computed column, regardless of definition order. For example, `a, b AS c, c AS d` is invalid.
- **Column pruning disables schema propagation**: When `projection` explicitly lists columns without the `*` wildcard, the schema is static. Changes to the source schema (such as added or removed columns) are not reflected downstream.
- **Primary key columns cannot be removed by default**: To reduce the number of upstream primary key columns, add the `transform.allow.trimming.pk-columns` configuration item to the `pipeline:` block.
- **Custom primary keys must cover source primary keys**: To prevent data disorder during cross-partition writes, include the source table's primary key columns in your custom primary key definition.
- **`__data_event_type__` cannot be used as a primary key**: A single CDC update event bundles both the before (`-U`) and after (`+U`) states, so this field is not unique within an event.
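The opt-in for trimming primary key columns sits in the `pipeline:` block rather than in the transform rule itself. A minimal sketch, assuming the item takes a boolean value; the table and column names are hypothetical:

```yaml
pipeline:
  # Assumption: the configuration item takes a boolean value
  transform.allow.trimming.pk-columns: true

transform:
  # Hypothetical table with composite primary key (order_id, region_id)
  - source-table: db.orders
    projection: order_id, amount   # region_id is dropped from the output
    primary-keys: order_id         # fewer key columns than the source
```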
Parameters
Use the transform module to add, remove, or modify columns and filter rows during synchronization. Each transform rule targets one source table and supports the following parameters:
| Parameter | Required | Description |
|---|---|---|
| `source-table` | Yes | Source table to transform. Supports regular expressions. |
| `projection` | No | Defines all output columns after transformation, using SQL SELECT-like syntax. If blank, no columns are added or removed. For available built-in functions, see Flink CDC built-in functions. |
| `filter` | No | Filters which rows are sent downstream, using SQL WHERE-like syntax. If blank, no rows are excluded. |
| `primary-keys` | No | Primary key columns for the sink table, as a comma-separated list. If blank, inherited from the source schema. |
| `partition-keys` | No | Partition key columns for the sink table, as a comma-separated list. If blank, inherited from the source schema. |
| `table-options` | No | Additional sink configuration as comma-separated `key=value` pairs, such as bucket count or comments for a Paimon sink. Example: `key1=value1,key2=value2` |
| `description` | No | Human-readable description of the transform rule. |
| `converter-after-transform` | No | Post-transform converters to apply after all transform rules, as a comma-separated list. Applied sequentially in the order defined. See Post-transform converters. |
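A sketch that combines several of these parameters in one rule; the table names, the `bucket` table option, and the use of the `UPPER` built-in function are illustrative assumptions:

```yaml
transform:
  - source-table: app\.order_.*                      # regular expression matching several tables
    projection: \*, UPPER(status) AS status_upper    # forward all columns, add one computed column
    filter: amount > 0                               # drop non-positive rows
    primary-keys: order_id
    partition-keys: dt
    table-options: bucket=4,comment=orders synced from app
    description: sync order tables with an uppercased status column
```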
Quotation marks in projection and filter expressions
Projection and filter expressions do not require quotation marks in most cases:
```yaml
transform:
  - projection: a, b, c
    description: equivalent to the quoted form below
  - projection: "a, b, c"
    description: quoted form, same result
```
If an expression starts with a special character such as `*` or `'`, enclose the entire expression in single (`'`) or double (`"`) quotation marks, or escape the special character with a backslash (`\`):
```yaml
transform:
  - projection: *, 42     # Invalid: starts with *
  - projection: '*, 42'   # OK: quoted
  - projection: \*, 42    # OK: escaped
```
Field filtering
Use projection rules to control which columns are sent downstream. Projection rules support selecting specific columns, adding computed columns, and referencing metadata columns.
Replicate specific columns
List the column names in projection to send only those columns downstream:
```yaml
transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4
    description: sync only col_1, col_3, and col_4; col_2 is excluded
```
Column pruning prevents source schema changes from reaching the sink. Because the projection rule lists columns explicitly, any columns added or removed in the source are not automatically reflected downstream.
Project all columns with wildcard
To forward all existing columns and automatically propagate future schema changes, include the * wildcard in projection.
If `projection` does not include `*`, the resulting schema is static. Schema changes in the source will not propagate to the sink.
For example, `\*, 'extras' AS extras` appends an extra column after all source columns and continuously propagates schema changes:
```yaml
transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras
    description: append a static extras column while forwarding all source columns
```
Add a computed column
Use `<Expression> AS <ColumnName>` syntax in `projection` to define a computed column. The expression is evaluated for each data row.
```yaml
transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id
    description: add inc_id as id + 1
```
Given the following input:
| id |
|---|
| 1 |
| 2 |
The output sent downstream is:
| id | inc_id |
|---|---|
| 1 | 2 |
| 2 | 3 |
A computed column's expression cannot reference another computed column. For example, `a, b AS c, c AS d` is invalid: `d` cannot reference `c`.
Metadata columns
All connectors support the following predefined metadata columns. Reference them in projection or filter expressions as if they were regular data columns.
| Metadata column | Data type | Description |
|---|---|---|
| `__namespace_name__` | String | Namespace name of the source table |
| `__schema_name__` | String | Schema name of the source table |
| `__table_name__` | String | Name of the source table |
| `__data_event_type__` | String | Operation type of a data change event: `+I` (insert), `-U` (update before), `+U` (update after), `-D` (delete) |
Do not define regular data columns with the same names as metadata columns.
A single CDC update event bundles both `-U` and `+U` states. Do not use `__data_event_type__` as a primary key.
Example: Add a fully qualified table identifier column
```yaml
transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier
    description: append a column that stores the fully qualified source table name
```
Metadata mapping by database
The meaning of namespace, schema, and table varies by database system:
| Database | Namespace | Schema | Table |
|---|---|---|---|
| MySQL | — | Database | Table |
| Apache Kafka | — | — | Topic |
| Simple Log Service (SLS) | — | Project | LogStore |
| MongoDB | — | Database | Collection |
| Paimon | — | Database | Table |
| Hologres | — | Schema | Table |
| StarRocks | — | Database | Table |
| Doris | — | Database | Table |
| PostgreSQL | Database (requires `table-id.include-database`) | Schema | Table |
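For PostgreSQL, the namespace part of the identifier is only populated when `table-id.include-database` is enabled. A hedged sketch of combining the metadata columns in that case; the placement of the option in the `source:` block and all table names are assumptions:

```yaml
source:
  type: postgres
  # Assumption: without this option, __namespace_name__ is empty for PostgreSQL
  table-id.include-database: true

transform:
  - source-table: mydb.public\..*
    projection: \*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS src_table
    description: record database.schema.table for each row
```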
Data filtering
Use the filter parameter to define row filtering rules. A filter rule is a SQL expression that evaluates to a BOOLEAN value. Data change events for which the expression evaluates to FALSE are not sent downstream.
Filter expressions can reference:

- Any source table column
- Any computed column defined in `projection`
- Any metadata column (without explicitly including it in `projection`)
If a computed column in `projection` overwrites a source column (same name), any reference to that column in `filter` resolves to the computed column's value.
Example: Filter using a computed column
```yaml
transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5
    description: cast id to VARCHAR, then keep only rows where the string length exceeds 5
```
Here, `id` in the filter expression refers to the computed column (`VARCHAR`), not the original source column.
Advanced examples
Reference non-pruned columns and metadata
Example 1: Filter by a pruned column
Source schema: `[INT a, INT b, BOOLEAN c]`. Output only columns `a` and `b`, but include only rows where `c` is true:
```yaml
transform:
  - source-table: db.tbl
    projection: a, b
    filter: c
    description: output a and b, include only rows where c is true
```
Although `c` is excluded from the projection output, it remains available as a filter condition.
Example 2: Filter by a metadata column
Forward only INSERT events, without adding `__data_event_type__` to the output:
```yaml
transform:
  - source-table: db.tbl
    projection: a, b
    filter: __data_event_type__ = '+I'
    description: forward only INSERT events
```
Overwrite existing columns
Define a field in projection with the same name as an upstream column to overwrite its value or type. This is the correct way to enforce type changes while preserving schema evolution.
Example 3: Enforce type conversion
Source schema: `[INT a, INT b, BOOLEAN c]`. Convert column `a` to `STRING` while keeping its name:
```yaml
transform:
  - source-table: db.tbl
    projection: \*, CAST(a AS STRING) AS a
    description: convert column a from INT to STRING
```
The sink schema becomes `[STRING a, INT b, BOOLEAN c]`. The original `INT a` is replaced by the `STRING` definition.
Reuse computed columns in filter conditions
Filter rules can reference computed column aliases defined in projection, which improves readability.
Example 4: Filter using a computed column alias
Define column d in projection, then reference it in filter:
```yaml
transform:
  - source-table: db.tbl
    projection: a, b, c, a + b + c AS d
    filter: d > 100
    description: keep rows where the sum of a, b, and c exceeds 100
```
This produces the same result as writing `filter: a + b + c > 100`, but is easier to read and maintain.
Post-transform converters
Use the `converter-after-transform` parameter to apply additional processing after all transform rules are evaluated. List multiple converters as a comma-separated string; Flink applies them sequentially in the order defined.
| Converter | Description | Minimum version |
|---|---|---|
| `SOFT_DELETE` | Converts delete operations to inserts | Ververica Runtime (VVR) 8.0.11 |
| `FIELD_NAME_LOWER_CASE` | Converts all column names to lowercase | VVR 11.1 |
Perform a soft delete
Soft delete (logical deletion) marks deleted rows as inactive rather than removing them from the sink. Combine `__data_event_type__` with the `SOFT_DELETE` converter to implement this pattern: Flink converts physical delete operations into inserts, with an `op_type` value of `-D` to indicate the row was deleted.
```yaml
transform:
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
    description: implement soft delete; physical deletes become inserts with op_type = -D
```
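Because `converter-after-transform` accepts a comma-separated list applied in order, both converters can be chained. A sketch, assuming VVR 11.1 or later so that `FIELD_NAME_LOWER_CASE` is available:

```yaml
transform:
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS OP_TYPE
    # Applied left to right: soft delete first, then lowercase every column name
    converter-after-transform: SOFT_DELETE,FIELD_NAME_LOWER_CASE
    description: deletes become inserts; OP_TYPE reaches the sink as op_type
```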