Realtime Compute for Apache Flink:Flink CDC transform module

Last Updated: Oct 21, 2025

This topic describes the syntax for the transform module in Flink CDC data ingestion job drafts.

Parameters

The transform module lets you add, remove, or modify columns, and filter data during synchronization. Use the following parameters to define transform rules:

| Parameter | Description | Required | Note |
| --- | --- | --- | --- |
| source-table | Specifies the source table to transform. | Yes | Regular expressions are supported. |
| projection | Specifies the projection rule for the source table. This defines the output columns after the transformation. | No | The syntax is similar to a SQL SELECT statement. If this parameter is left empty, no columns are added or deleted. For more information, see Define projection rules. For available built-in functions, see Flink CDC built-in functions. |
| filter | Specifies the rule for filtering data rows. | No | The syntax is similar to a SQL WHERE clause. If this parameter is left empty, no rows will be excluded. |
| primary-keys | Specifies the primary key columns for the sink table. | No | If this parameter is left empty, the primary key definition will be inherited from the source table's schema. To specify primary key columns, enter them as a comma-separated list. Important: If you define this parameter, ensure that all ingested data adheres to the primary key constraints. To avoid data disorder issues, particularly with cross-partition writes, we recommend including the source table's primary key columns in your custom definition. |
| partition-keys | Specifies the partition key columns for the sink table. | No | If this parameter is left empty, the partition key definition will be inherited from the source table's schema. To specify partition key columns, enter them as a comma-separated list. Important: When defining custom partition key columns, ensure that all ingested data adheres to the primary key constraints to prevent data disorder issues during cross-partition writes. |
| table-options | Specifies additional configuration items to pass to the sink. | No | Optional properties, such as the number of buckets or comments for a Paimon sink. Use commas (,) to separate configuration items and equal signs (=) to separate keys and values. Example: key1=value1,key2=value2 |
| description | A description of the transformation rule. | No | N/A. |
| converter-after-transform | A converter that performs additional processing after the transformation. | No | See Use converters after transform. |
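
The following rule is a minimal sketch that combines several of these parameters in one transform entry. The table name app_db.orders, the column names, and the Paimon bucket option are illustrative placeholders rather than required values, and UPPER is assumed to be available among the Flink CDC built-in functions:

transform:
  - source-table: app_db.orders                  # source table to transform; regular expressions are supported
    projection: \*, UPPER(status) AS status_uc   # keep all columns and add a computed column
    filter: order_amount > 0                     # exclude rows whose amount is not positive
    primary-keys: order_id                       # comma-separated primary key columns for the sink
    partition-keys: order_date                   # comma-separated partition key columns for the sink
    table-options: bucket=4                      # extra sink options as key=value pairs
    description: clean and enrich the orders table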

Usage notes

  • After you modify statements in the transform module, perform a stateless restart for the job.

  • Typically, you do not need to enclose projection and filter statements in quotation marks.

    transform:
      - projection: a, b, c
        # The preceding line is equivalent to the following line.
      - projection: "a, b, c"

    However, to ensure correct parsing of a projection expression that begins with a special character (for example, * or '), enclose the entire expression in single (') or double (") quotation marks, or escape the special character with a backslash (\):

    transform:
      - projection: *, 42      # Invalid
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK  

Define projection rules

The transform module uses a SQL-like syntax to define projection rules. You can use these rules to sync a selection of columns, add computed columns, or reference metadata columns.

Perform column pruning

To sync specific columns from source to sink, specify them in the projection rule. Unspecified columns are excluded:

transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4 # col_2 will be excluded
Important

Column pruning can prevent source schema updates from reaching the sink. Because column pruning relies on an explicit list of columns to sync, changes to the source schema (like added or removed columns) will not automatically be reflected downstream.

Use wildcards

To send all existing and any newly added columns downstream as-is, use the wildcard character (*) in the projection rule.

Note

Using an explicit list of columns instead of the wildcard (*) results in a static schema. This means new columns added to the source table will not be automatically included in the sink.

For example, *, 'extras' AS extras appends an extra column to the end of the source schema and continuously propagates schema changes from the source to the sink.

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

Add computed columns

You can add a computed column using the <Expression> AS <ColName> syntax within the projection rule. This expression is evaluated for each data row, and its result populates the computed column.

Note

A computed column's expression cannot reference another computed column. This limitation applies regardless of the order in which they are defined. For example, a, b AS c, c AS d is not valid.

For example, the following rule transforms [+I, id = 1] from the source table db.tbl into [+I, id = 1, inc_id = 2] and ingests it into the sink table:

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

Reference metadata columns

You can use the following predefined metadata columns in a projection rule as if they were regular data columns:

Important

Do not define regular data columns with the same names as metadata columns.

Metadata column name

Data type

Description

__namespace_name__

String

The source table's namespace name.

__schema_name__

String

The source table's schema name.

__table_name__

String

The source table's name.

__data_event_type__

String

The operation type of a data change event (+I, -U, +U, -D).

Important

Do not use this metadata column as a primary key. Flink CDC update events always bundle the "before" (-U) and "after" (+U) states of a single update into one event. Therefore, __data_event_type__ will contain both -U and +U within the same update event.

Example: Add a computed column that stores the source table's fully qualified name.

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

Namespace, schema, and table equivalents across database systems:

| Database system | Namespace | Schema | Table |
| --- | --- | --- | --- |
| MySQL | Database | - | Table |
| Kafka | - | - | Topic |
| SLS | - | Project | LogStore |
| MongoDB | - | Database | Collection |
| Paimon | - | Database | Table |
| Hologres | - | Schema | Table |
| StarRocks | Database | - | Table |
| Doris | Database | - | Table |

Add filter rules

The transform module uses a SQL-like syntax to define data filtering rules.

A filter rule is an expression that evaluates to a BOOLEAN type. It can reference any source table column or computed column.

Data change events for which a filter expression evaluates to FALSE are not sent downstream.
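
For example, the following sketch keeps only rows whose id value is greater than 10; the table name db.tbl and the column id are illustrative:

transform:
  - source-table: db.tbl
    filter: id > 10    # rows with id <= 10 are excluded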

Note

If you overwrite a source table column with a computed column in a projection rule, any reference to that column in a filter expression will refer to the computed column's value.

For example, the following transform rule is valid:

transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5

The id referenced in the filter expression is the computed column, cast to the VARCHAR type.

Use converters after transform

Use the converter-after-transform parameter to process data changes after all transform rules are applied. Specify multiple converters in a comma-separated list, and Flink applies them sequentially in the defined order. Valid values:

| Converter name | Description | Supported version |
| --- | --- | --- |
| SOFT_DELETE | Converts deletes to inserts. | Ververica Runtime (VVR) 8.0.11 and later |
| FIELD_NAME_LOWER_CASE | Converts all column names of the table to lowercase. | VVR 11.1 and later |
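
For example, the following sketch chains both converters; the table name db.tbl is illustrative. Because converters run in the order they are listed, deletes are first converted to inserts, and then all column names are converted to lowercase:

transform:
  - source-table: db.tbl
    converter-after-transform: SOFT_DELETE,FIELD_NAME_LOWER_CASE    # applied sequentially, left to right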

Perform a soft delete

To perform a soft delete, combine the SOFT_DELETE converter with the __data_event_type__ metadata column. This approach ensures that data is not physically removed from the sink upon a delete event. Instead, Flink converts deletes to inserts, marking them with -D in the op_type column.

transform: 
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE