
Realtime Compute for Apache Flink: Flink CDC Transform module

Last Updated: Feb 10, 2026

This topic describes the supported syntax rules and built-in functions for the Transform module in Flink CDC data ingestion jobs.

Transform rule parameters

The Transform module lets you directly manipulate data columns. You can delete or extend existing columns and also filter out unwanted data during synchronization. Define Transform rules using the following parameters:

| Parameter | Description | Required | Note |
| --- | --- | --- | --- |
| source-table | Specifies the source table to transform. | Yes | Regular expressions are supported. |
| projection | Specifies the projection rule for the source table, which defines all data columns after transformation. | No | The syntax is similar to a SQL SELECT statement. If empty, no columns are appended or deleted. For more information, see Define projection rules. For available built-in functions, see Flink CDC built-in functions. |
| filter | Specifies the row filtering rule. | No | The syntax is similar to a SQL WHERE statement. If empty, no rows are filtered. |
| primary-keys | Specifies the primary key columns after transformation. | No | If empty, the primary key definition is inherited from the source table's schema. Separate multiple columns with commas (,). See the Important notes below. |
| partition-keys | Specifies the partition key columns after transformation. | No | If empty, the partition key definition is inherited from the source table's schema. Separate multiple columns with commas (,). See the Important notes below. |
| table-options | Extra configuration information passed to the sink. | No | An optional attribute list, such as the number of buckets or comments for a Paimon sink. Separate configuration items with commas (,), and separate keys from values with equal signs (=). Configuration example: key1=value1,key2=value2 |
| description | A description of the transform rule. | No | None. |
| converter-after-transform | A converter that performs additional data processing after transformation is complete. | No | For more information, see Use converters after transform. |

Important

  • By default, removing primary key columns defined in the source table is not supported. To allow trimming source primary key columns, add the transform.allow.trimming.pk-columns parameter to the pipeline configuration.

  • When customizing primary key columns, ensure that the incoming source data conforms to the primary key constraints. Include the source table's primary key columns in your custom definition to avoid data disorder during cross-partition writes.

  • When customizing partition key columns, ensure that the incoming source data conforms to the primary key constraints to avoid data disorder during cross-partition writes.
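For reference, the following minimal sketch combines several of these parameters in one rule. The table name, column names, and key choices are hypothetical placeholders, not values required by the module:

transform:
  - source-table: app_db.orders   # hypothetical table; regular expressions are also supported
    projection: order_id, seller_id, price, __table_name__ AS source_table
    filter: price > 0
    primary-keys: order_id, seller_id
    partition-keys: seller_id
    table-options: bucket=4
    description: sync orders with a metadata column and a price filter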

Notes

  • After modifying statements in the Transform module, restart the job statelessly.

  • Typically, you do not need to enclose projection and filter expressions in quotation marks. The following rules are equivalent:

    transform:
      - projection: a, b, c
        # Equivalent to
      - projection: "a, b, c"

    However, if a projection expression begins with a special character such as * or ', the entire expression may not be parsed as a valid YAML string literal. In this case, you must manually enclose the entire expression in ' or ", or use \ to escape the special character:

    transform:
      - projection: *, 42      # Not valid YAML
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK  

Field screening

The data ingestion Transform module uses SQL-like syntax to define field screening (projection) rules. Use these rules to select specific columns, add computed columns, or include metadata columns.

Column pruning

To synchronize specific columns from the source table to the downstream, list the columns to sync in the projection rule. Unspecified columns are not sent downstream:

transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4 # col_2 is pruned
Important

Pruning columns from the upstream table may trigger schema evolution, which can break schema synchronization between the upstream and downstream tables.

Wildcard characters

If you want to send all existing and any newly added columns from the source table downstream as-is, you can use the asterisk (*) wildcard character in the projection rule.

Note

If a projection rule does not use the wildcard character (*), the resulting schema is fixed to the columns specified in the projection rule and does not change when the upstream schema changes.

For example, *, 'extras' AS extras appends an extra column to the end of the source schema and continuously propagates schema changes from the source to the sink.

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

Computed columns

You can add a computed column using the <Expression> AS <ColName> syntax in a projection rule. The expression is evaluated for each upstream data record, and the result populates the corresponding column.

Note

A computed column's expression cannot reference another computed column. This limitation applies regardless of the order in which they are defined. For example, a, b AS c, c AS d is not valid.
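As a sketch of the workaround (the column names a, b, c, and d are hypothetical), repeat the underlying expression instead of referencing another computed column's alias:

transform:
  - source-table: db.tbl
    # Invalid: d would reference the computed column c
    # projection: a, b AS c, c AS d
    # Valid: repeat the underlying expression b instead
    projection: a, b AS c, b AS d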

For example, the following rule transforms the data record [+I, id = 1] received from the upstream table db.tbl into the data row [+I, id = 1, inc_id = 2] and sends it downstream:

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

Metadata columns

When writing projection rules, you can reference the following predefined metadata columns as if they were regular data columns:

Important

Do not define regular data columns with the same names as metadata columns.

The following metadata columns apply to all connectors.

| Metadata column name | Data type | Description |
| --- | --- | --- |
| __namespace_name__ | String | The namespace name of the source table corresponding to this data change record. |
| __schema_name__ | String | The schema name of the source table corresponding to this data change record. |
| __table_name__ | String | The table name of the source table corresponding to this data change record. |
| __data_event_type__ | String | The operation type of this data change record (+I, -U, +U, -D). |

Important

Because a CDC event always bundles the Update Before and Update After states for a single update into one event, the __data_event_type__ field contains both -U and +U within the same update event. Do not use this field as a primary key.

For example, the following rule writes the fully qualified name of the source table into a computed column and sends it downstream:

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

The mapping relationship between namespace, schema, and table names for each database connector is shown in the following table.

| Database type | Namespace name | Schema name | Table name |
| --- | --- | --- | --- |
| MySQL | - | Database | Table |
| Kafka | - | - | Topic |
| SLS | - | Project | LogStore |
| MongoDB | - | Database | Collection |
| Paimon | - | Database | Table |
| Hologres | - | Schema | Table |
| StarRocks | - | Database | Table |
| Doris | - | Database | Table |
| Postgres | Database (effective only when the table-id.include-database parameter is enabled) | Schema | Table |

Data filtering

The data ingestion Transform module uses SQL-like syntax to define row filtering rules.

A filter rule must be an expression that evaluates to a BOOLEAN type. It can reference any column from the source table or computed columns.

If a data change record matches a Transform rule with a non-empty filter and the filter expression evaluates to FALSE, that row of data will not be sent downstream.
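For instance, a minimal sketch of a filter rule that references a regular upstream column (the table name and the age column are hypothetical):

transform:
  - source-table: db.tbl
    filter: age > 18   # rows for which this evaluates to FALSE are not sent downstream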

Note

If you use a computed column in a projection rule to overwrite an existing upstream column, the filter expression references the computed column rather than the original upstream column.

For example, consider the following Transform rule:

transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5

This is valid because the id referenced in the filter expression is the computed column, cast to the VARCHAR type.

Advanced configuration rules

Reference non-pruned columns and metadata

Example 1: Filter based on pruned columns

The upstream table schema is [INT a, INT b, BOOLEAN c]. To output columns a and b but retain only the rows where c is true, use the following rule:

transform:
  - source-table: ...
    projection: a, b
    filter: c

Example 2: Filter based on metadata columns

You can use metadata columns directly in filter conditions without explicitly defining them in the projection. For example, the following rule retains only INSERT (+I) change records and filters out other change types, such as deletes:

transform:
  - source-table: db.tbl
    projection: a, b
    filter: __data_event_type__ = '+I'

Overwrite existing columns

Define fields in the projection with the same names as upstream columns to overwrite those columns' values or types.

Example: Force type conversion

The upstream table schema is [INT a, INT b, BOOLEAN c]. To keep the column name unchanged while converting column a to a string type:

transform:
  - source-table: db.tbl
    projection: \*, CAST(a AS STRING) AS a

The downstream table schema becomes [STRING a, INT b, BOOLEAN c]. The original column a is overwritten with the newly defined type.

Reuse computed columns in filter conditions

A filter can directly reference computed column aliases defined in projection.

Example: Reference computed results

Define a new column d in projection and directly use it in filter.

transform:
  - source-table: db.tbl
    projection: a, b, c, a + b + c AS d
    filter: d > 100

This configuration has the same effect as the following syntax, but is more readable:

transform:
  - source-table: ...
    projection: a, b, c, a + b + c AS d
    filter: a + b + c > 100

Converter after Transform

Use the converter-after-transform parameter to process data changes after all transform rules are applied. You can specify multiple converters by separating them with commas (,); the converters modify change records in the order in which they are listed (see the example after the following table). The following configuration values are supported:

| Converter name | Function | Supported version |
| --- | --- | --- |
| SOFT_DELETE | Converts delete changes to inserts. | VVR 8.0.11 and later versions |
| FIELD_NAME_LOWER_CASE | Converts all table field names to lowercase. | VVR 11.1 and later versions |
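For example, the following minimal sketch applies both converters to the same rule; they process records in the order listed. The table name is a placeholder, and using FIELD_NAME_LOWER_CASE assumes VVR 11.1 or later:

transform:
  - source-table: db.tbl
    converter-after-transform: SOFT_DELETE,FIELD_NAME_LOWER_CASE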

Soft delete

The SOFT_DELETE converter, combined with the __data_event_type__ metadata column, enables soft deletion. With the following transform configuration, deleted data is not physically removed downstream. Instead, delete events are converted to insert events, and the op_type column of the affected rows is set to -D to indicate that they were deleted.

transform: 
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE