This topic describes the supported syntax rules and built-in functions for the Transform module in Flink CDC data ingestion jobs.
Transform rule parameters
The Transform module lets you directly manipulate data columns. You can delete or extend existing columns and also filter out unwanted data during synchronization. Define Transform rules using the following parameters:
| Parameter | Description | Required | Note |
| --- | --- | --- | --- |
| `source-table` | Specify the ancestor table to transform. | Yes | Regular expressions are supported. |
| `projection` | Specify the projection rule for the ancestor table. This defines all data columns after the ancestor table is transformed. | No | The syntax is similar to a SQL SELECT statement. If empty, no columns are appended or deleted. For more information, see Define projection rules. For available built-in functions, see Flink CDC built-in functions. |
| `filter` | Data row filtering rule. | No | The syntax is similar to a SQL WHERE statement. If empty, no rows are filtered. |
| `primary-keys` | Specify the primary key columns after transformation. | No | If left empty, the primary key definition is inherited from the source table's schema. To specify primary key columns, use a comma (`,`)-separated list. Important: By default, deleting primary key column constraints from the ancestor is not supported. When customizing primary key columns, ensure that incoming data from the ancestor conforms to primary key constraints, and include the ancestor table's primary key columns in your custom definition to avoid data disorder during cross-partition writes. |
| `partition-keys` | Specify the partition key list after transformation. | No | If left empty, the partition key definition is inherited from the source table's schema. To specify partition key columns, enter them as a comma (`,`)-separated list. Important: When customizing partition key columns, ensure that incoming data from the ancestor conforms to primary key constraints to avoid data disorder during cross-partition writes. |
| `table-options` | Extra configuration information passed to the Sink. | No | Optional attribute list, such as the number of buckets or comments for a Paimon Sink. Separate different configuration items with commas (`,`). |
| `description` | Description information for the transform rule. | No | None. |
| `converter-after-transform` | A converter that performs additional data processing after transformation is complete. | No | For more information, see Use converters after transform. |
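For reference, the following sketch combines these parameters in a single rule. The table, column, and option names (`db.tbl`, `id`, `name`, `bucket`) are hypothetical and only illustrate the shape of a rule:

```yaml
transform:
  - source-table: db.tbl                  # regular expressions are supported
    projection: id, name, UPPER(name) AS name_upper
    filter: id > 0                        # keep only rows with a positive id
    primary-keys: id                      # comma-separated list
    partition-keys: id
    table-options: bucket=10              # extra options passed to the sink
    description: an example transform rule
```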
Notes
After modifying statements in the Transform module, restart the job statelessly.
In most cases, you do not need to enclose projection and filter statements in quotation marks:

```yaml
transform:
  - projection: a, b, c   # Equivalent to - projection: "a, b, c"
```

However, if a projection expression begins with a special character such as `*` or `'`, the entire expression may not be parsed as a valid YAML string literal. In this case, manually enclose the entire expression in `'` or `"`, or use `\` to escape the special character:

```yaml
transform:
  - projection: *, 42     # Not valid YAML
  - projection: '*, 42'   # OK
  - projection: \*, 42    # OK
```
Field screening
The data ingestion Transform module uses SQL-like syntax to define field screening (projection) rules. Use these rules to select specific columns, add computed columns, or include metadata columns.
Column pruning
To synchronize specific columns from the source table to the downstream, list the columns to sync in the projection rule. Unspecified columns are not sent downstream:
```yaml
transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4   # col_2 is pruned
```

Pruning columns from an ancestor table may trigger schema evolution, which can cause structural synchronization between the ancestor and descendant tables to be lost.
Wildcard characters
If you want to send all existing and any newly added columns from the source table downstream as-is, you can use the asterisk (*) wildcard character in the projection rule.
If a projection rule does not use a wildcard character (*), the resulting schema is fixed and always matches the version specified in the projection rule.
For example, `*, 'extras' AS extras` appends an extra column to the end of the source schema and continuously propagates schema changes from the source to the sink:
```yaml
transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras
```

Computed columns
You can add a computed column using the <Expression> AS <ColName> syntax in a projection rule. The expression is evaluated for each upstream data record, and the result populates the corresponding column.
A computed column's expression cannot reference another computed column, regardless of the order in which they are defined. For example, `a, b AS c, c AS d` is not valid.
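Because one computed column cannot feed another, repeat the underlying expression for each alias instead. A minimal sketch, assuming hypothetical columns `a` and `b`:

```yaml
transform:
  - source-table: db.tbl
    # Invalid: d references the computed column c
    # projection: a, b AS c, c AS d
    # Valid: repeat the underlying expression for each alias
    projection: a, b AS c, b AS d
```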
For example, the following rule transforms the data record [+I, id = 1] received from the upstream table db.tbl into the data row [+I, id = 1, inc_id = 2] and sends it downstream:

```yaml
transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id
```

Metadata columns
When writing projection rules, you can reference the following predefined metadata columns as if they were regular data columns.
Do not define regular data columns with the same names as metadata columns.
The following metadata columns apply to all connectors.
| Metadata column name | Data type | Description |
| --- | --- | --- |
| `__namespace_name__` | String | The namespace name of the source table corresponding to this data change record. |
| `__schema_name__` | String | The schema name of the source table corresponding to this data change record. |
| `__table_name__` | String | The table name of the source table corresponding to this data change record. |
| `__data_event_type__` | String | The operation type of this data change record (`+I`, `-U`, `+U`, or `-D`). Important: Because a CDC event always bundles the Update Before and Update After states for a single update into one event, the `-U` and `+U` states of the same update cannot be processed separately through this column. |
For example, the following rule writes the fully qualified name of the ancestor table into a computed column and sends it downstream:

```yaml
transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier
```

The mapping relationship between Namespace, Schema, and Table names for each database connector is shown in the following table.
| Database type | Namespace name | Schema name | Table name |
| --- | --- | --- | --- |
| MySQL | - | Database | Table |
| Kafka | - | - | Topic |
| SLS | - | Project | LogStore |
| MongoDB | - | Database | Collection |
| Paimon | - | Database | Table |
| Hologres | - | Schema | Table |
| StarRocks | - | Database | Table |
| Doris | - | Database | Table |
| Postgres | Database (effective only when the corresponding connector option is set) | Schema | Table |
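For a MySQL source, for example, the namespace name is empty, so a qualified identifier is typically built from the schema (database) and table names only. A sketch, assuming a hypothetical database `app_db`:

```yaml
transform:
  - source-table: app_db.\.*          # hypothetical: all tables in app_db
    projection: \*, __schema_name__ || '.' || __table_name__ AS source_id
```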
Data filtering
The data ingestion Transform module uses SQL-like syntax to define row filtering rules.
A filter rule must be an expression that evaluates to a BOOLEAN type. It can reference any column from the source table or computed columns.
If a data change record matches a Transform rule with a non-empty filter and the filter expression evaluates to FALSE, that row of data will not be sent downstream.
If you use a computed column in a projection rule to overwrite an existing ancestor column, the filter expression references the computed column, not the original ancestor column.
For example, consider the following Transform rule:
```yaml
transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5
```

This is valid because the `id` referenced in the filter expression is the computed column, which has been cast to the VARCHAR type.
Advanced configuration rules
Reference non-pruned columns and metadata
Example 1: Filter based on pruned columns
The ancestor table schema is [INT a, INT b, BOOLEAN c]. To output columns a and b, but only retain rows where c is true, use the following configuration rule:
```yaml
transform:
  - source-table: ...
    projection: a, b
    filter: c
```

Example 2: Filter based on metadata columns
You can use metadata columns directly as filter conditions without explicitly defining them in the projection. For example, to retain only INSERT-type (`+I`) change data and filter out the rest:
```yaml
transform:
  - source-table: db.tbl
    projection: a, b
    filter: __data_event_type__ = '+I'
```

Overwrite existing columns
Define fields with the same name as ancestor columns in projection to overwrite the column's value or type.
Example: Force type conversion
The ancestor table schema is [INT a, INT b, BOOLEAN c]. To keep the column name unchanged but force column a to convert to a string type:
```yaml
transform:
  - source-table: db.tbl
    projection: \*, CAST(a AS STRING) AS a
```

The descendant table schema becomes [STRING a, INT b, BOOLEAN c]: the original column a is overwritten by the newly defined column with the new type.
Reuse computed columns in filter conditions
A filter can directly reference computed column aliases defined in projection.
Example: Reference computed results
Define a new column d in projection and directly use it in filter.
```yaml
transform:
  - source-table: db.tbl
    projection: a, b, c, a + b + c AS d
    filter: d > 100
```

This configuration has the same effect as the following syntax, but is more readable:
```yaml
transform:
  - source-table: ...
    projection: a, b, c, a + b + c AS d
    filter: a + b + c > 100
```

Converter after Transform
Use the converter-after-transform parameter to process data changes after all transform rules are applied. You can specify multiple converters by separating them with commas (`,`); the converters modify messages in the order listed. The following configuration values are supported:
| Converter name | Function | Supported version |
| --- | --- | --- |
| SOFT_DELETE | Converts delete changes to inserts. | VVR 8.0.11 and later versions. |
| FIELD_NAME_LOWER_CASE | Converts all table field names to lowercase. | VVR 11.1 and later versions. |
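For example, to apply both converters to the same table, list them in the order they should run. A sketch with a hypothetical table name:

```yaml
transform:
  - source-table: db.tbl
    projection: \*
    # Converters run left to right: soft-delete conversion first, then field-name lowercasing
    converter-after-transform: SOFT_DELETE,FIELD_NAME_LOWER_CASE
```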
Soft delete
The SOFT_DELETE converter, combined with the `__data_event_type__` metadata column, enables soft deletion. With the following transform configuration, deleted data is not physically removed downstream. Instead, delete changes are converted to inserts, and the corresponding row's `op_type` column is set to `-D` to indicate deletion.
```yaml
transform:
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
```