This topic describes how to create and use user-defined functions (UDFs).
Flink CDC UDFs
If Flink CDC built-in functions do not meet your requirements, create custom Java UDFs for your Data Ingestion jobs. You can call these UDFs just like built-in functions.
To make a UDF available to a job, upload the JAR file that contains your UDF classes as an additional dependency on the job draft's Configurations tab.
Define a UDF
To create a UDF class, add the following dependency to your pom.xml file:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-common</artifactId>
    <version>${Apache Flink CDC version}</version>
    <scope>provided</scope>
</dependency>
Use the following table to select the correct Apache Flink CDC version based on your Ververica Runtime (VVR) engine version:
VVR engine version | Apache Flink CDC version |
VVR 11.0+ | 3.4.0 |
VVR 8.0.11 | 3.3.0 |
VVR 8.0.10 and earlier | 3.2.1 |
A Java class must meet the following requirements to be used as a UDF in a Flink CDC Data Ingestion job:
Implements the org.apache.flink.cdc.common.udf.UserDefinedFunction interface.
Has a public, parameterless constructor.
Contains at least one public method named eval.
You can override the following methods in your UDF class for more granular control:
Override the getReturnType method to explicitly specify the function's return type.
Override the open and close methods to implement lifecycle functions.
For example, the following UDF definition increments an integer parameter by 1 and returns the result.
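import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

public class AddOneFunctionClass implements UserDefinedFunction {
    // Returns the input incremented by 1.
    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public DataType getReturnType() {
        // eval is declared to return Object, so its return type is ambiguous.
        // You must explicitly specify the type with getReturnType.
        return DataTypes.INT();
    }

    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}
Type mappings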
The following table maps each Flink CDC type, as returned by the getReturnType method, to the Java class used for the corresponding parameter or return value of the eval method.
Flink CDC type | Java class | Notes |
BOOLEAN | java.lang.Boolean | |
TINYINT | java.lang.Byte | |
SMALLINT | java.lang.Short | |
INTEGER | java.lang.Integer | |
BIGINT | java.lang.Long | |
FLOAT | java.lang.Float | |
DOUBLE | java.lang.Double | |
DECIMAL | java.math.BigDecimal | |
DATE | java.time.LocalDate | |
TIME | java.time.LocalTime | |
TIMESTAMP | java.time.LocalDateTime | |
TIMESTAMP_TZ | java.time.ZonedDateTime | |
TIMESTAMP_LTZ | java.time.Instant | |
CHAR, VARCHAR, STRING | java.lang.String | |
BINARY, VARBINARY, BYTES | byte[] | |
ARRAY | java.util.List | The element types within the ARRAY are mapped to the generic parameters of the List. |
MAP | java.util.Map | The key and value types of the MAP are mapped to the generic parameters of the Map. |
ROW | java.util.List | |
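As an illustration of these mappings, the following minimal sketch shows a UDF whose eval method takes an INTEGER and a STRING argument and returns a STRING. The class name FormatIdFunction is hypothetical, and the null handling is an assumption rather than documented behavior. The UserDefinedFunction interface is declared locally as a stand-in so that the snippet compiles without the flink-cdc-common dependency. In a real project, import org.apache.flink.cdc.common.udf.UserDefinedFunction instead and declare the class public.

```java
// Stand-in for org.apache.flink.cdc.common.udf.UserDefinedFunction, declared
// here only so the sketch compiles without the flink-cdc-common dependency.
interface UserDefinedFunction {}

// Hypothetical UDF: formats an INTEGER value with a STRING pattern.
// Kept package-private so the sketch fits in one file; a real UDF class
// would be public and live in its own source file.
class FormatIdFunction implements UserDefinedFunction {
    // INTEGER maps to java.lang.Integer and STRING maps to java.lang.String.
    // The declared return type is String, so this sketch assumes the type
    // can be inferred and does not override getReturnType.
    public String eval(Integer id, String pattern) {
        // Assumption: a null argument yields a null result.
        if (id == null || pattern == null) {
            return null;
        }
        return String.format(pattern, id);
    }
}
```

For a row where id is 7, calling eval(7, "id -> %d") on this class returns the string id -> 7.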
Register a Flink CDC UDF
To register a UDF, add a definition to the pipeline module of your Data Ingestion job draft:
pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
To use a UDF, upload the JAR file that contains the class specified in classpath to the job's additional dependencies: go to the Development Console, open your job draft, click the Configurations tab on the right, and upload the JAR in the Additional Dependencies field.
You can specify any custom name for a UDF in the name field. It does not need to match the UDF class name.
Use a Flink CDC UDF
After registering the UDF, you can call it directly in projection and filter expressions, just like a built-in function:
transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100
Compatibility with Flink SQL UDFs
Flink SQL UDFs that extend ScalarFunction can also be registered and used directly as Flink CDC UDFs, with the following limitations:
Parameterized ScalarFunction classes are not supported.
Flink-style TypeInformation annotations are ignored.
The open and close lifecycle hooks are not called.
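The following minimal sketch shows one way to write a Flink SQL-style function that stays within these limitations. It reads "parameterized" as "constructed with arguments", which is an assumption. The class name TruncateFunction and the constant MAX_LENGTH are hypothetical. ScalarFunction is declared locally as a stand-in so that the snippet compiles without the Flink table dependency. In a real project, extend org.apache.flink.table.functions.ScalarFunction instead and declare the class public.

```java
// Stand-in for org.apache.flink.table.functions.ScalarFunction, declared here
// only so the sketch compiles without the Flink table dependency.
abstract class ScalarFunction {}

// Hypothetical Flink SQL-style UDF that truncates a string to a fixed length.
class TruncateFunction extends ScalarFunction {
    // A constant takes the place of a constructor argument, so the class
    // keeps its implicit no-argument constructor.
    private static final int MAX_LENGTH = 8;

    // All state is available without open(), because that hook is not
    // called when the function runs as a Flink CDC UDF.
    public String eval(String value) {
        // Assumption: a null input yields a null result.
        if (value == null) {
            return null;
        }
        return value.length() <= MAX_LENGTH ? value : value.substring(0, MAX_LENGTH);
    }
}
```

With this design, eval("abcdefghijkl") returns abcdefgh, and inputs of eight characters or fewer are returned unchanged.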