This topic describes the syntax for the transform module in Flink CDC data ingestion job drafts.
Parameters
The transform module lets you add, remove, or modify columns, and filter data during synchronization. Use the following parameters to define transform rules:
Parameter | Description | Required | Note |
source-table | Specifies the source table to transform. | Yes | Regular expressions are supported. |
projection | Specifies the projection rule for the source table. This defines the output columns after the transformation. | No | The syntax is similar to a SQL SELECT clause. If this parameter is left empty, no columns are added or deleted. For more information, see Define projection rules. For available built-in functions, see Flink CDC built-in functions. |
filter | Specifies the rule for filtering data rows. | No | The syntax is similar to a SQL WHERE clause. If this parameter is left empty, no rows are excluded. |
primary-keys | Specifies the primary key columns for the sink table. | No | If this parameter is left empty, the primary key definition is inherited from the source table's schema. To specify primary key columns, enter them as a comma-separated list. Important: If you define this parameter, ensure that all ingested data adheres to the primary key constraints. To avoid data disorder issues, particularly with cross-partition writes, we recommend including the source table's primary key columns in your custom definition. |
partition-keys | Specifies the partition key columns for the sink table. | No | If this parameter is left empty, the partition key definition is inherited from the source table's schema. To specify partition key columns, enter them as a comma-separated list. Important: When defining custom partition key columns, ensure that all ingested data adheres to the primary key constraints. This helps prevent data disorder issues during cross-partition writes. |
table-options | Specifies additional configuration items to pass to the sink. | No | Optional properties, such as the number of buckets or comments for a Paimon sink. Use commas (,) to separate multiple configuration items. |
description | A description of the transformation rule. | No | N/A. |
converter-after-transform | A converter that performs additional processing after the transformation. | No | For valid values, see Use converters after transform. |
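For reference, the following is a minimal sketch of a transform rule that combines several of these parameters. The table name app_db.orders, the column names, and the bucket option are hypothetical placeholders (the computed column assumes the UPPER built-in string function); adjust them to your own schema and sink.

transform:
  - source-table: app_db.orders                          # hypothetical source table
    projection: \*, UPPER(order_status) AS status_upper  # keep all columns and add a computed column
    filter: order_amount > 0                             # keep only rows matching this condition
    primary-keys: order_id                               # hypothetical primary key column
    partition-keys: order_date                           # hypothetical partition key column
    table-options: bucket=4                              # example sink option, such as a Paimon bucket count
    description: sync orders with a computed status column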
Usage notes
After you modify statements in the transform module, perform a stateless restart for the job.

Typically, you do not need to enclose projection and filter statements in quotation marks.

transform:
  - projection: a, b, c
  # The preceding line is equivalent to the following line.
  - projection: "a, b, c"

However, to ensure correct parsing when a projection expression begins with a special character (for example, * or '), enclose the entire expression in single (') or double (") quotation marks, or escape the special character with a backslash (\):

transform:
  - projection: *, 42    # Invalid
  - projection: '*, 42'  # OK
  - projection: \*, 42   # OK
Define projection rules
The transform module uses a SQL-like syntax to define projection rules. You can use these rules to sync a selection of columns, add computed columns, or reference metadata columns.
Perform column pruning
To sync specific columns from source to sink, specify them in the projection rule. Unspecified columns are excluded:
transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4 # col_2 will be excluded

Column pruning can prevent source schema updates from reaching the sink. Because column pruning relies on an explicit list of columns to sync, changes to the source schema (such as added or removed columns) are not automatically reflected downstream.
Use wildcards
To send all existing and any newly added columns downstream as-is, use the wildcard character (*) in the projection rule.
Using an explicit list of columns instead of the wildcard (*) results in a static schema. This means new columns added to the source table will not be automatically included in the sink.
For example, *, 'extras' AS extras appends an extra column to the end of the source schema and continuously propagates schema changes from the source to the sink.
transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

Add computed columns
You can add a computed column using the <Expression> AS <ColName> syntax within the projection rule. This expression is evaluated for each data row, and its result populates the computed column.
A computed column's expression cannot reference another computed column. This limitation applies regardless of the order in which they are defined. For example, a, b AS c, c AS d is not valid.
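A possible workaround is to repeat the underlying expression instead of referencing the other computed column. The following sketch reuses the hypothetical columns from the example above:

transform:
  - source-table: db.tbl
    # Invalid: d would reference the computed column c.
    # projection: a, b AS c, c AS d
    # Repeating the underlying expression works instead.
    projection: a, b AS c, b AS d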
The following rule transforms [+I, id = 1] from the source table db.tbl into [+I, id = 1, inc_id = 2] and ingests it into the sink table:
transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

Reference metadata columns
You can use the following predefined metadata columns in a projection rule as if they were regular data columns:
Do not define regular data columns with the same names as metadata columns.
Metadata column name | Data type | Description |
__namespace_name__ | String | The source table's namespace name. |
__schema_name__ | String | The source table's schema name. |
__table_name__ | String | The source table's name. |
__data_event_type__ | String | The operation type of a data change event (+I, -U, +U, or -D). Important: Do not use this metadata column as a primary key. Flink CDC update events always bundle the "before" (-U) and "after" (+U) records of the same row; because these two records carry different operation types, including the operation type in the primary key would split a single update across two keys. |
Example: Add a computed column that stores the source table's fully qualified name.
transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

Namespace, schema, and table equivalents across database systems:
Database system | Namespace | Schema | Table |
MySQL | Database | - | Table |
Kafka | - | - | Topic |
SLS | - | Project | LogStore |
MongoDB | - | Database | Collection |
Paimon | - | Database | Table |
Hologres | - | Schema | Table |
StarRocks | Database | - | Table |
Doris | Database | - | Table |
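If you want a delimiter between the parts of the identifier in the example above, the concatenation can include string literals. The following is a sketch that joins the parts with a period; for systems without a namespace or schema level, the corresponding part may be empty:

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier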
Add filter rules
The transform module uses a SQL-like syntax to define data filtering rules.
A filter rule is an expression that evaluates to a BOOLEAN type. It can reference any source table column or computed column.
Data change events for which a filter expression evaluates to FALSE are not sent downstream.
If you overwrite a source table column with a computed column in a projection rule, any reference to that column in a filter expression will refer to the computed column's value.
For example, the following transform rule is valid:
transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5

The id referenced in the filter expression is the computed column, cast to the VARCHAR type.
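Filter expressions can also reference source table columns directly and combine multiple conditions with logical operators. The following sketch assumes a hypothetical table db.orders with the columns order_amount and region:

transform:
  - source-table: db.orders
    filter: order_amount > 100 AND region = 'EU'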
Use converters after transform
Use the converter-after-transform parameter to process data changes after all transform rules are applied. Specify multiple converters in a comma-separated list, and Flink applies them sequentially in the defined order. Valid values:
Converter name | Description | Supported version |
SOFT_DELETE | Converts deletes to inserts. | Ververica Runtime (VVR) 8.0.11 and later |
| Converts all column names of the table to lowercase. | VVR 11.1 and later |
Perform a soft delete
To perform a soft delete, combine the SOFT_DELETE converter with the __data_event_type__ metadata column. This approach ensures that data is not physically removed from the sink upon a delete event. Instead, Flink converts delete events to insert events and marks them with the -D operation type in the column that stores __data_event_type__ (op_type in the following example).
transform:
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE