Learn how to create, register, and use Java user-defined functions (UDFs) in Flink CDC jobs. When built-in functions don't meet your needs, custom UDFs provide flexible ways to extend data processing capabilities.
User-defined functions (UDFs)
When built-in Flink CDC functions don't cover your data transformation logic—such as encrypting or decrypting fields, parsing custom data formats, or calling a third-party library—write a Java UDF and call it just like any built-in function.
The overall workflow has three steps:
- Define: Write a Java class that implements the UDF interface.
- Register: Declare the UDF in the pipeline section of your Flink CDC job configuration.
- Use: Call the UDF in projection and filter expressions inside a transform block.
Define a UDF
Add the following dependency to your pom.xml:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-common</artifactId>
    <version>${Apache Flink CDC version}</version>
    <scope>provided</scope>
</dependency>
```
Replace ${Apache Flink CDC version} with the Apache Flink CDC version that matches your Ververica Runtime (VVR) engine version:
| VVR engine version | Apache Flink CDC version |
|---|---|
| 11.3 and later | 3.5.0 |
| 11.0 to 11.2 | 3.4.0 |
| 8.0.11 | 3.3.0 |
| 8.0.10 and earlier | 3.2.1 |
A Java class must satisfy the following requirements to work as a Flink CDC UDF:
- Implements the org.apache.flink.cdc.common.udf.UserDefinedFunction interface.
- Has a public, parameterless constructor.
- Has at least one public method named eval.
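A minimal class that satisfies only these three requirements might look like the following sketch. The class name MaskFunction and its masking rule (keep the last four characters) are hypothetical, chosen to echo the field-protection use case mentioned earlier. Because eval returns String, which maps unambiguously to STRING, the sketch assumes no getReturnType override is needed.

```java
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

// Hypothetical example: masks every character of a string except the last four.
public class MaskFunction implements UserDefinedFunction {

    // No constructor is declared, so Java supplies the required
    // public, parameterless constructor.

    // eval returns String (CDC type STRING), so the return type is not ambiguous.
    public String eval(String value) {
        if (value == null) {
            return null; // propagate SQL NULL
        }
        int visible = Math.min(4, value.length());
        int masked = value.length() - visible;
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < masked; i++) {
            sb.append('*');
        }
        // Append the unmasked tail unchanged.
        sb.append(value, masked, value.length());
        return sb.toString();
    }
}
```

Registered under an alias such as mask, it could be called as mask(phone), where phone is a hypothetical STRING column.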
Optionally, override the following methods for more control:
- getReturnType: Explicitly specify the return type when eval has an ambiguous return type, such as Object.
- open and close: Add initialization and cleanup logic for lifecycle management.
The following example defines a UDF that increments an integer by 1:
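```java
package org.apache.flink.cdc.udf.examples.java;

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

public class AddOneFunctionClass implements UserDefinedFunction {

    public Object eval(Integer num) {
        // Propagate SQL NULL instead of throwing a NullPointerException.
        return num == null ? null : num + 1;
    }

    @Override
    public DataType getReturnType() {
        // The return type of eval is ambiguous (Object).
        // Use getReturnType to declare the exact CDC type.
        return DataTypes.INT();
    }

    @Override
    public void open() throws Exception {
        // Initialize resources here, for example, open a database connection.
    }

    @Override
    public void close() throws Exception {
        // Release resources here, for example, close a database connection.
    }
}
```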
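The example above leaves open and close empty. The following sketch shows a UDF that actually uses them: it creates a reusable, non-serializable resource in open and drops it in close. The class name Sha256Function and the choice of SHA-256 hex hashing are hypothetical illustrations, and the sketch assumes open runs before the first eval call, as the lifecycle description above implies.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

import org.apache.flink.cdc.common.udf.UserDefinedFunction;

// Hypothetical example: returns the hex-encoded SHA-256 digest of a string.
public class Sha256Function implements UserDefinedFunction {

    // MessageDigest is not serializable, so keep it transient and create it in open().
    private transient MessageDigest digest;

    @Override
    public void open() throws Exception {
        // Initialization: set up the resource that eval reuses for every row.
        digest = MessageDigest.getInstance("SHA-256");
    }

    public String eval(String value) {
        if (value == null) {
            return null; // propagate SQL NULL
        }
        // digest(byte[]) completes the hash and resets the MessageDigest for the next call.
        byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte b : hash) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    @Override
    public void close() throws Exception {
        // Cleanup: a real connection or client would be closed here.
        digest = null;
    }
}
```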
Type mappings
The following table maps CDC column types to the Java classes used for eval parameters and return values. When you override getReturnType, declare the CDC type from the left column:
| CDC column type | Java class |
|---|---|
| BOOLEAN | java.lang.Boolean |
| TINYINT | java.lang.Byte |
| SMALLINT | java.lang.Short |
| INTEGER | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| DECIMAL | java.math.BigDecimal |
| DATE | java.time.LocalDate |
| TIME | java.time.LocalTime |
| TIMESTAMP | java.time.LocalDateTime |
| TIMESTAMP_TZ | java.time.ZonedDateTime |
| TIMESTAMP_LTZ | java.time.Instant |
| CHAR / VARCHAR / STRING | java.lang.String |
| BINARY / VARBINARY / BYTES | byte[] |
| ARRAY | java.util.List (element types map to the List generic parameter) |
| MAP | java.util.Map (key and value types map to the Map generic parameters) |
| ROW | java.util.List |
| VARIANT | org.apache.flink.cdc.common.types.variant.Variant |
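The VARIANT type maps to org.apache.flink.cdc.common.types.variant.Variant, which is a different class from the Variant class used in Flink SQL. Make sure your UDF imports the CDC class.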
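As an illustration of these mappings, the following hypothetical ToDateFunction accepts a TIMESTAMP column as java.time.LocalDateTime or a TIMESTAMP_LTZ column as java.time.Instant, and returns java.time.LocalDate (DATE). It assumes that the eval overload matching the argument's Java class is selected, and that UTC is the right zone for the Instant case; both are assumptions, not documented behavior. getReturnType is declared explicitly to remove any doubt when several eval overloads exist.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

// Hypothetical example: extracts the calendar date from a timestamp column.
public class ToDateFunction implements UserDefinedFunction {

    // TIMESTAMP column -> LocalDateTime parameter; LocalDate return -> DATE.
    public LocalDate eval(LocalDateTime timestamp) {
        return timestamp == null ? null : timestamp.toLocalDate();
    }

    // TIMESTAMP_LTZ column -> Instant parameter.
    // Assumption: the date is taken in UTC; use the zone your pipeline needs.
    public LocalDate eval(Instant instant) {
        return instant == null ? null : instant.atOffset(ZoneOffset.UTC).toLocalDate();
    }

    @Override
    public DataType getReturnType() {
        // Both overloads return LocalDate, so declare DATE explicitly.
        return DataTypes.DATE();
    }
}
```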
Register a UDF
Add a user-defined-function entry to the pipeline section of your Flink CDC job configuration:
```yaml
pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
```
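The classpath field takes the fully qualified name of the UDF class. Package your UDF classes into a JAR file, then, on the Configurations tab of your job draft, upload that JAR as an additional dependency in the More Configurations section so the classes named in classpath can be loaded. The name field sets the alias used in expressions and doesn't need to match the class name.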
Use a UDF
Call the registered UDF in projection and filter expressions exactly as you would a built-in function:
```yaml
transform:
  - source-table: db\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100
```
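The format alias above points at FormatFunctionClass, which this page doesn't define. The following is a hypothetical sketch whose argument order matches the call format(id, 'id -> %d'). It assumes id is an INTEGER or BIGINT column and provides one eval overload for each; to match the registered classpath, the class would live in the org.apache.flink.cdc.udf.examples.java package (omitted here). Because each eval returns String, no getReturnType override is included.

```java
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

// Hypothetical sketch of the class registered under the alias "format".
// The first argument is the column value; the second is a java.util.Formatter pattern.
public class FormatFunctionClass implements UserDefinedFunction {

    // Assumed to be used when id is an INTEGER column.
    public String eval(Integer value, String pattern) {
        return value == null ? null : String.format(pattern, value);
    }

    // Assumed to be used when id is a BIGINT column.
    public String eval(Long value, String pattern) {
        return value == null ? null : String.format(pattern, value);
    }
}
```

With this sketch, a row whose id is 42 would get formatted_id = 'id -> 42'.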
Compatibility with Flink SQL UDFs
Flink SQL UDFs that extend ScalarFunction can be registered and called directly as Flink CDC UDFs. The following limitations apply:
- Parameterization: Parameterized ScalarFunction classes are not supported.
- Type annotations: Flink-style TypeInformation annotations are ignored.
- Lifecycle hooks: The open and close lifecycle hooks are not called.
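A Flink SQL UDF that stays inside these limitations could look like the following hypothetical UpperCaseFunction. It has no constructor arguments, carries no type annotations, and does no work in open or close, so it assumes nothing that the CDC runtime skips. The sketch assumes flink-table-common (which provides org.apache.flink.table.functions.ScalarFunction) is on the compile classpath with provided scope, and that the CDC runtime infers the STRING return type from the Java signature of eval.

```java
import java.util.Locale;

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical Flink SQL UDF that is also usable as a Flink CDC UDF:
// no constructor arguments, no type annotations, no open/close logic.
public class UpperCaseFunction extends ScalarFunction {

    public String eval(String value) {
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        return value == null ? null : value.toUpperCase(Locale.ROOT);
    }
}
```

It is registered in the pipeline section like any other UDF, with its fully qualified class name as the classpath.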