A user-defined scalar function (UDSF) maps zero, one, or more scalar values to a new scalar value, producing a one-to-one relationship between input rows and output values. Use a UDSF when built-in Flink SQL functions cannot express your custom logic.
For an overview of all user-defined function (UDF) types, see User-defined functions.
How it works
A UDSF extends Apache Flink's ScalarFunction class and implements one or more eval() methods. Flink calls eval() once per input row and uses the return value as the output scalar.
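import org.apache.flink.table.functions.ScalarFunction;
public class SubstringFunction extends ScalarFunction {
    // Returns s from index begin (inclusive) to end (exclusive); NULL in, NULL out.
    public String eval(String s, Integer begin, Integer end) {
        if (s == null || begin == null || end == null) {
            return null;
        }
        return s.substring(begin, end);
    }
}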
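Because eval() is an ordinary public Java method, the one-row-in, one-value-out contract can be modeled without Flink. The sketch below is a conceptual model only, not how Flink's runtime is implemented: PerRowModel and its project() helper are hypothetical names, and the substring logic is copied into a static method so the file compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical conceptual model of how a UDSF is applied; not Flink runtime code.
public class PerRowModel {

    // Stand-in for SubstringFunction.eval(), copied so no Flink dependency is needed.
    static String eval(String s, Integer begin, Integer end) {
        if (s == null || begin == null || end == null) {
            return null;
        }
        return s.substring(begin, end);
    }

    // One eval() call per row and one output value per call,
    // so the output list always has the same size as the input list.
    static List<String> project(List<Object[]> rows) {
        List<String> out = new ArrayList<>();
        for (Object[] row : rows) {
            out.add(eval((String) row[0], (Integer) row[1], (Integer) row[2]));
        }
        return out;
    }

    public static void main(String[] args) {
        // Each row is {s, begin, end}.
        List<Object[]> rows = Arrays.asList(
                new Object[] {"flink", 0, 3},
                new Object[] {"scalar", 1, 4},
                new Object[] {null, 0, 1});
        System.out.println(project(rows)); // prints [fli, cal, null]
    }
}
```

Three input rows yield exactly three output values, including a NULL for the row whose string is NULL. A table-valued function (UDTF), by contrast, may emit zero or many rows per input row.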
eval() method requirements:
- Must be declared public and must not be static.
- Can be overloaded: define multiple eval() signatures for different input types.
- Supports varargs (e.g., eval(Integer...)).
- Use boxed types (e.g., Integer instead of int) for parameters that can receive NULL, because a primitive int cannot hold a SQL NULL.
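The following minimal sketch applies these rules in one class, similar to the overloading pattern shown in the Apache Flink documentation. SumFunction is a hypothetical class that is not part of the ASI_UDX demo, and the sketch assumes flink-table-common is on the classpath. Every parameter uses a boxed type so that a SQL NULL arrives as a Java null instead of causing an error.

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDSF illustrating overloading, varargs, and boxed types.
public class SumFunction extends ScalarFunction {

    // Overload 1: two INT arguments. Returns NULL if either argument is NULL.
    public Integer eval(Integer a, Integer b) {
        if (a == null || b == null) {
            return null;
        }
        return a + b;
    }

    // Overload 2: two STRING arguments that contain decimal integers.
    public Integer eval(String a, String b) {
        if (a == null || b == null) {
            return null;
        }
        return Integer.parseInt(a.trim()) + Integer.parseInt(b.trim());
    }

    // Overload 3: any number of DOUBLE arguments via varargs; NULL elements are skipped.
    public Double eval(Double... values) {
        double total = 0.0;
        for (Double v : values) {
            if (v != null) {
                total += v;
            }
        }
        return total;
    }
}
```

Flink selects the overload that matches the argument types at the call site, the same way Java overload resolution does when you call eval() directly in a unit test.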
Prerequisites
Before you begin, make sure you have:
- IntelliJ IDEA installed
- Apache Maven installed
- Access to a Realtime Compute for Apache Flink workspace
Develop a UDSF
Flink provides UDF examples that pre-configure the development environment. The examples cover UDSFs, user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs).
- Download and decompress the ASI_UDX_Demo example to your local machine. After decompression, the ASI_UDX-main folder is created with the following structure:
  - pom.xml: The Maven project configuration file. It defines the project's coordinates, dependencies, build rules, and related metadata.
  - \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java: The sample Java implementation of the UDSF.
  Note: ASI_UDX_Demo is hosted on a third-party website. You may experience access failures or delays.
- In IntelliJ IDEA, click File > Open and select the ASI_UDX-main folder.
- Open \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java and update the eval() method with your custom logic. The sample implementation returns the part of each input string from index begin (inclusive) to index end (exclusive), counting from zero:

  package ASI_UDF;

  import org.apache.flink.table.functions.ScalarFunction;

  public class ASI_UDF extends ScalarFunction {
      public String eval(String s, Integer begin, Integer end) {
          return s.substring(begin, end);
      }
  }

- Open \ASI_UDX-main\pom.xml and configure the Maven dependencies for your Flink version. The following example shows the main JAR package dependencies for Flink 1.11. If your UDSF does not depend on additional JAR packages, skip this step.

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.11.0</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table</artifactId>
      <version>1.11.0</version>
      <type>pom</type>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>1.11.0</version>
    </dependency>
  </dependencies>

- In the directory that contains the pom.xml file, run the following command to package the project:

  mvn package -Dcheckstyle.skip

  After the build completes, ASI_UDX-1.0-SNAPSHOT.jar is created in the \ASI_UDX-main\target\ directory.
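The demo's eval() passes its arguments straight to String.substring(), so a NULL argument raises a NullPointerException and an end index beyond the string's length raises a StringIndexOutOfBoundsException. An uncaught exception in a UDSF typically fails the task. If your data can contain such values, one option is a variant like the sketch below. SafeSubstringUDF is a hypothetical class name, clamping out-of-range indexes (rather than, for example, returning NULL) is an assumed design choice, and the code assumes flink-table-common is on the classpath.

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical hardened variant of the demo's ASI_UDF with the same eval() signature.
// It returns NULL for NULL inputs and clamps indexes instead of throwing.
public class SafeSubstringUDF extends ScalarFunction {

    public String eval(String s, Integer begin, Integer end) {
        if (s == null || begin == null || end == null) {
            return null;
        }
        // Clamp begin into [0, length], then clamp end into [begin, length],
        // so short strings or reversed indexes yield a (possibly empty) result.
        int from = Math.max(0, Math.min(begin, s.length()));
        int to = Math.max(from, Math.min(end, s.length()));
        return s.substring(from, to);
    }
}
```

With this variant, eval("abcdef", 2, 4) still returns "cd", while eval("abc", 2, 4) returns "c" and eval("ab", 2, 4) returns an empty string instead of throwing.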
Register a UDSF
To register the JAR package as a UDSF, see Manage user-defined functions (UDFs).
Use a UDSF
After registration, call the UDSF in a Flink SQL job.
- Create a Flink SQL job. For guidance, see Job development map. The following sample SQL calls ASI_UDSF(a, 2, 4) to extract the characters at zero-based indexes 2 and 3 (the third and fourth characters) of the a field in ASI_UDSF_Source and writes the results to ASI_UDSF_Sink:

  CREATE TEMPORARY TABLE ASI_UDSF_Source (
    a VARCHAR,
    b INT,
    c INT
  ) WITH (
    'connector' = 'datagen'
  );

  CREATE TEMPORARY TABLE ASI_UDSF_Sink (
    a VARCHAR
  ) WITH (
    'connector' = 'blackhole'
  );

  INSERT INTO ASI_UDSF_Sink
  SELECT ASI_UDSF(a, 2, 4)
  FROM ASI_UDSF_Source;

- On the Operation Center > Job O&M page, find your job and click Start in the Actions column. After the job starts, the third and fourth characters (zero-based indexes 2 and 3) of the a field in each row of ASI_UDSF_Source are written to ASI_UDSF_Sink. Because ASI_UDSF_Sink uses the blackhole connector, it discards the rows it receives.
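Java's String.substring(begin, end) counts from zero and excludes end, so ASI_UDSF(a, 2, 4) returns exactly two characters. The plain-Java check below makes this concrete and also shows what happens when a value is shorter than four characters. SubstringIndexDemo is a hypothetical class name, and its eval() copies the demo's logic so the file runs without Flink on the classpath. In open-source Flink, the datagen connector generates 100-character strings by default, so the sample job does not hit the short-string case.

```java
// Hypothetical demo class; eval() copies the logic of the demo's ASI_UDF.eval()
// so this file runs without Flink on the classpath.
public class SubstringIndexDemo {

    static String eval(String s, Integer begin, Integer end) {
        return s.substring(begin, end);
    }

    public static void main(String[] args) {
        // Zero-based: index 2 is 'c', index 3 is 'd', and index 4 is excluded.
        System.out.println(eval("abcdef", 2, 4)); // prints cd

        // A value shorter than 4 characters makes substring() throw,
        // which would fail the task if it happened inside the Flink job.
        try {
            eval("abc", 2, 4);
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("short input throws " + e.getClass().getSimpleName());
        }
    }
}
```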