
Realtime Compute for Apache Flink:Transform

Last Updated: Mar 26, 2026

Configure the transform module in Flink CDC (Change Data Capture) jobs, including projection and filter syntax, metadata columns, and post-transform converters.

Limitations

Before configuring the transform module, note the following constraints:

  • Stateless restart required after changes: After modifying any transform rule, perform a stateless restart of the job for the changes to take effect.

  • Computed columns cannot reference each other: A computed column's expression cannot reference another computed column, regardless of definition order. For example, a, b AS c, c AS d is invalid.

  • Column pruning disables schema propagation: When projection explicitly lists columns without the * wildcard, the schema is static. Changes to the source schema (such as added or removed columns) are not reflected downstream.

  • Primary key columns cannot be removed by default: To declare fewer primary key columns in the sink than exist upstream, add the transform.allow.trimming.pk-columns configuration item to the pipeline block.

  • Custom primary keys must cover source primary keys: To prevent data disorder during cross-partition writes, include the source table's primary key columns in your custom primary key definition.

  • `__data_event_type__` cannot be used as a primary key: A single CDC update event bundles both the before (-U) and after (+U) states, so this field is not unique within an event.
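The primary-key trimming switch above can be sketched in a pipeline definition as follows. This is a minimal sketch: the boolean value and the exact placement inside the pipeline block are assumptions, so check your runtime's configuration reference for the precise form.

```yaml
pipeline:
  # Assumption: the item takes a boolean value and lives in the pipeline block.
  transform.allow.trimming.pk-columns: true

transform:
  - source-table: db.tbl
    # Source primary key is (order_id, region); the sink keeps only order_id.
    projection: order_id, amount
    primary-keys: order_id
    description: trims the region primary-key column, allowed by the switch above
```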

Parameters

Use the transform module to add, remove, or modify columns and filter rows during synchronization. Each transform rule applies to the source tables matched by its source-table value and supports the following parameters:

| Parameter | Required | Description |
| --- | --- | --- |
| source-table | Yes | Source table to transform. Supports regular expressions. |
| projection | No | Defines all output columns after transformation, using SQL SELECT-like syntax. If blank, no columns are added or removed. For available built-in functions, see Flink CDC built-in functions. |
| filter | No | Filters which rows are sent downstream, using SQL WHERE-like syntax. If blank, no rows are excluded. |
| primary-keys | No | Primary key columns for the sink table, as a comma-separated list. If blank, inherited from the source schema. |
| partition-keys | No | Partition key columns for the sink table, as a comma-separated list. If blank, inherited from the source schema. |
| table-options | No | Additional sink configuration as comma-separated key=value pairs, such as bucket count or comments for a Paimon sink. Example: key1=value1,key2=value2 |
| description | No | Human-readable description of the transform rule. |
| converter-after-transform | No | Post-transform converters to apply after all transform rules, as a comma-separated list. Applied sequentially in the order defined. See Post-transform converters. |
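As an illustrative sketch of how several of these parameters combine in one rule (the table name, column names, and the Paimon bucket option are hypothetical):

```yaml
transform:
  - source-table: db.order_\.*        # regular expression: matches db.order_01, db.order_02, ...
    projection: \*, UPPER(status) AS status_upper
    filter: status <> 'CANCELLED'
    primary-keys: order_id,order_date
    partition-keys: order_date
    table-options: bucket=4           # e.g. bucket count for a Paimon sink
    description: forward non-cancelled orders with an uppercase status column
```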

Quotation marks in projection and filter expressions

Projection and filter expressions do not require quotation marks in most cases:

transform:
  - projection: a, b, c
    description: equivalent to the quoted form below
  - projection: "a, b, c"
    description: quoted form — same result

If an expression starts with a special character such as * or ', enclose the entire expression in single (') or double (") quotation marks, or escape the special character with a backslash (\):

transform:
  - projection: *, 42       # Invalid — starts with *
  - projection: '*, 42'     # OK — quoted
  - projection: \*, 42      # OK — escaped

Field filtering

Use projection rules to control which columns are sent downstream. Projection rules support selecting specific columns, adding computed columns, and referencing metadata columns.

Replicate specific columns

List the column names in projection to send only those columns downstream:

transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4
    description: sync only col_1, col_3, and col_4 — col_2 is excluded
Important

Column pruning prevents source schema changes from reaching the sink. Because the projection rule lists columns explicitly, any columns added or removed in the source are not automatically reflected downstream.

Project all columns with wildcard

To forward all existing columns and automatically propagate future schema changes, include the * wildcard in projection.

If projection does not include *, the resulting schema is static. Schema changes in the source will not propagate correctly to the sink.

For example, \*, 'extras' AS extras appends an extra column after all source columns and continuously propagates schema changes:

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras
    description: append a static extras column while forwarding all source columns

Add a computed column

Use <Expression> AS <ColumnName> syntax in projection to define a computed column. The expression is evaluated for each data row.

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id
    description: add inc_id as id + 1

Given the following input:

| id |
| --- |
| 1 |
| 2 |

The output sent downstream is:

| id | inc_id |
| --- | --- |
| 1 | 2 |
| 2 | 3 |

A computed column's expression cannot reference another computed column. For example, a, b AS c, c AS d is invalid — d cannot reference c.

Metadata columns

All connectors support the following predefined metadata columns. Reference them in projection or filter expressions as if they were regular data columns.

| Metadata column | Data type | Description |
| --- | --- | --- |
| `__namespace_name__` | String | Namespace name of the source table |
| `__schema_name__` | String | Schema name of the source table |
| `__table_name__` | String | Name of the source table |
| `__data_event_type__` | String | Operation type of a data change event: +I (insert), -U (update before), +U (update after), -D (delete) |
Important

Do not define regular data columns with the same names as metadata columns.

Important

A single CDC update event bundles both -U and +U states. Do not use __data_event_type__ as a primary key.

Example: Add a fully qualified table identifier column

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier
    description: append a column that stores the fully qualified source table name

Metadata mapping by database

The meaning of namespace, schema, and table varies by database system:

| Database | Namespace | Schema | Table |
| --- | --- | --- | --- |
| MySQL | None | Database | Table |
| Apache Kafka | None | None | Topic |
| Simple Log Service (SLS) | None | Project | LogStore |
| MongoDB | None | Database | Collection |
| Paimon | None | Database | Table |
| Hologres | None | Schema | Table |
| StarRocks | None | Database | Table |
| Doris | None | Database | Table |
| PostgreSQL | Database (requires table-id.include-database) | Schema | Table |

Data filtering

Use the filter parameter to define row filtering rules. A filter rule is a SQL expression that evaluates to a BOOLEAN value. Data change events for which the expression evaluates to FALSE are not sent downstream.

Filter expressions can reference:

  • Any source table column

  • Any computed column defined in projection

  • Any metadata column (without explicitly including it in projection)

If a computed column in projection overwrites a source column (same name), any reference to that column in filter resolves to the computed column's value.

Example: Filter using a computed column

transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5
    description: cast id to VARCHAR, then keep only rows where the string length exceeds 5

Here, id in the filter expression refers to the computed column (VARCHAR), not the original source column.

Advanced examples

Reference non-pruned columns and metadata

Example 1: Filter by a pruned column

Source schema: [INT a, INT b, BOOLEAN c]. Output only columns a and b, but include only rows where c is true:

transform:
  - source-table: db.tbl
    projection: a, b
    filter: c
    description: output a and b, include only rows where c is true

Although c is excluded from the projection output, it remains available as a filter condition.

Example 2: Filter by a metadata column

Forward only INSERT change events without adding __data_event_type__ to the output:

transform:
  - source-table: db.tbl
    projection: a, b
    filter: __data_event_type__ = '+I'
    description: forward only INSERT events

Overwrite existing columns

Define a field in projection with the same name as an upstream column to overwrite its value or type. This is the correct way to enforce type changes while preserving schema evolution.

Example 3: Enforce type conversion

Source schema: [INT a, INT b, BOOLEAN c]. Convert column a to STRING while keeping its name:

transform:
  - source-table: db.tbl
    projection: \*, CAST(a AS STRING)
    description: convert column a from INT to STRING

The sink schema becomes [STRING a, INT b, BOOLEAN c]. The original INT a is replaced by the STRING definition.

Reuse computed columns in filter conditions

Filter rules can reference computed column aliases defined in projection, which improves readability.

Example 4: Filter using a computed column alias

Define column d in projection, then reference it in filter:

transform:
  - source-table: db.tbl
    projection: a, b, c, a + b + c AS d
    filter: d > 100
    description: keep rows where the sum of a, b, and c exceeds 100

This produces the same result as writing filter: a + b + c > 100, but is easier to read and maintain.

Post-transform converters

Use the converter-after-transform parameter to apply additional processing after all transform rules are evaluated. List multiple converters as a comma-separated string — Flink applies them sequentially in the order defined.

| Converter | Description | Minimum version |
| --- | --- | --- |
| SOFT_DELETE | Converts delete operations to inserts | Ververica Runtime (VVR) 8.0.11 |
| FIELD_NAME_LOWER_CASE | Converts all column names to lowercase | VVR 11.1 |
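Assuming a runtime version that supports both converters, chaining them in the listed order might look like this sketch:

```yaml
transform:
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS OP_TYPE
    # Applied sequentially: SOFT_DELETE first, then FIELD_NAME_LOWER_CASE
    # (which would rename OP_TYPE to op_type).
    converter-after-transform: SOFT_DELETE,FIELD_NAME_LOWER_CASE
    description: soft-delete rows, then normalize column names to lowercase
```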

Perform a soft delete

Soft delete (logical deletion) marks deleted rows as inactive rather than removing them from the sink. Combine __data_event_type__ with the SOFT_DELETE converter to implement this pattern: Flink converts physical delete operations into inserts, with an op_type value of -D to indicate the row was deleted.

transform:
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
    description: implement soft delete — physical deletes become inserts with op_type = -D