A user-defined table-valued function (UDTF) takes zero or more scalar values as input and returns multiple rows as output. Each returned row can have one or more columns. This differs from a scalar function, which maps input to a single output value.
Flink distinguishes between several kinds of functions:
| Function type | Behavior |
|---|---|
| Scalar function | Maps scalar values to a new scalar value |
| Table-valued function (UDTF) | Maps scalar values to new rows |
| Aggregate function | Maps scalar values from multiple rows to a new scalar value |
| Table aggregate function | Maps scalar values from multiple rows to new rows |
Use a UDTF when your logic must produce multiple output rows from a single input row — for example, splitting a delimited string into separate rows.
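The one-row-to-many fan-out that a UDTF performs can be sketched in plain Java, with no Flink dependency. The class and method names below are hypothetical, used only to illustrate the idea: one scalar input produces one output row per token.

```java
import java.util.Arrays;
import java.util.List;

public class UdtfSketch {
    // Mimics what a UDTF's eval() does: one scalar input,
    // a list of output rows (here, one row per delimited token).
    static List<String> splitToRows(String input, String delimiterRegex) {
        return Arrays.asList(input.split(delimiterRegex));
    }

    public static void main(String[] args) {
        // A single input value "a|b|c" becomes three output rows.
        List<String> rows = splitToRows("a|b|c", "\\|");
        System.out.println(rows); // prints [a, b, c]
    }
}
```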
For background on Flink user-defined functions, see User-defined functions.
Prerequisites
Before you begin, ensure that you have:
- Apache Maven installed on your local machine
- IntelliJ IDEA installed on your local machine
- A Flink SQL job environment set up in Realtime Compute for Apache Flink
Develop a UDTF
Realtime Compute for Apache Flink provides a sample project (ASI_UDX_Demo) with pre-configured development environments and example implementations for user-defined scalar functions, user-defined aggregate functions (UDAFs), and UDTFs. Access to this third-party GitHub repository may occasionally be slow or unavailable.
- Download and decompress the ASI_UDX_Demo sample project to your local machine. After decompression, an `ASI_UDX-main` folder is created with the following files:
  - `pom.xml`: The Maven project configuration file, which describes the project's coordinates, dependencies, and build settings.
  - `\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java`: The sample Java code for the UDTF.
- In IntelliJ IDEA, click File > Open and open the decompressed `ASI_UDX-main` folder.
- Open `\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java` and implement your UDTF logic. A UDTF extends `TableFunction<T>`, where `T` is the output type. The `eval()` method must return `void`; instead of a `return` statement, call `collect()` to emit output rows. This differs from scalar functions, which use `return`. The following example splits a string into two columns using a vertical bar (`|`) as the delimiter:

  ```java
  package ASI_UDTF;

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.table.functions.TableFunction;

  public class ASI_UDTF extends TableFunction<Tuple2<String, String>> {
      public void eval(String str) {
          String[] split = str.split("\\|");
          String name = split[0];
          String place = split[1];
          // emit one row with two columns
          collect(Tuple2.of(name, place));
      }
  }
  ```

  **Declaring output types**

  Flink supports two approaches for declaring the output schema of a UDTF:
  - Row type with `@FunctionHint` and `@DataTypeHint` (recommended): Explicitly declares field names and types, making the output schema clear to Flink and to readers.

    ```java
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {
        public void eval(String str) {
            for (String s : str.split(" ")) {
                // use collect(...) to emit a row
                collect(Row.of(s, s.length()));
            }
        }
    }
    ```
  - Tuple type (type inferred from generics): Simpler to write, but column names are not explicit. For example: `TableFunction<Tuple2<String, Integer>>`.
  **Note** The data types and type inference mechanism supported by `TableFunction` vary across major Flink versions. The examples above follow the Flink 1.15 documentation (Data types and Type inference). Check the Flink documentation that matches your Ververica Runtime (VVR) version. To find your Flink version, see Storage management and operations.
- Open `pom.xml` in the `ASI_UDX-main` folder and configure your dependencies. The sample project includes the following Maven dependencies for Flink 1.11:

  ```xml
  <dependencies>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.12</artifactId>
          <version>1.11.0</version>
          <!--<scope>provided</scope>-->
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table</artifactId>
          <version>1.11.0</version>
          <type>pom</type>
          <!--<scope>provided</scope>-->
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-core</artifactId>
          <version>1.11.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-common</artifactId>
          <version>1.11.0</version>
      </dependency>
  </dependencies>
  ```

  If your UDTF has no additional dependencies beyond those listed, skip to the next step. Otherwise, add the required dependency entries to `pom.xml`.
- In the directory that contains `pom.xml`, run the following command to package the project:

  ```shell
  mvn package -Dcheckstyle.skip
  ```

  If the build succeeds, `ASI_UDX-1.0-SNAPSHOT.jar` is generated in the `\ASI_UDX-main\target\` directory. The UDTF development is complete.
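A detail worth checking in the sample `eval()` above: `String.split` takes a regular expression, so the vertical bar must be escaped as `\\|` (an unescaped `|` is the regex alternation metacharacter). The following plain-Java check of the delimiter logic runs without any Flink dependency; the input values are hypothetical sample data.

```java
public class SplitCheck {
    public static void main(String[] args) {
        // "\\|" escapes the vertical bar so the string is split
        // on the literal | character, matching the sample UDTF.
        String[] parts = "Alice|Hangzhou".split("\\|");
        System.out.println(parts.length); // prints 2
        System.out.println(parts[0]);     // prints Alice
        System.out.println(parts[1]);     // prints Hangzhou
    }
}
```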
Register a UDTF
For instructions on how to register a UDTF, see Manage user-defined functions (UDFs).
Use a UDTF
After registering the UDTF, use it in a Flink SQL job.
- Develop a Flink SQL job. For more information, see Job development map. The following SQL splits the `message` field in `ASI_UDTF_Source` into two columns using the `|` delimiter, then inserts the results into `ASI_UDTF_Sink`. The `LATERAL TABLE` syntax cross-joins each input row with the rows emitted by the UDTF.

  ```sql
  CREATE TEMPORARY TABLE ASI_UDTF_Source (
    `message` VARCHAR
  ) WITH (
    'connector' = 'datagen'
  );

  CREATE TEMPORARY TABLE ASI_UDTF_Sink (
    name VARCHAR,
    place VARCHAR
  ) WITH (
    'connector' = 'blackhole'
  );

  INSERT INTO ASI_UDTF_Sink
  SELECT name, place
  FROM ASI_UDTF_Source,
  LATERAL TABLE(ASI_UDTF(`message`)) AS T(name, place);
  ```
- On the Operation Center > Job O&M page, find the target job and click Start in the Actions column. After the job starts, the strings in the `message` field of `ASI_UDTF_Source` are split by the `|` delimiter, and the resulting columns are inserted into `ASI_UDTF_Sink`.
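One caveat when running the sample against real data: the sample `eval()` indexes `split[1]` directly, so an input string that does not contain the `|` delimiter throws an `ArrayIndexOutOfBoundsException` at runtime and fails the job. The guarded variant below is a plain-Java sketch of one way to handle this (the method name and null-return convention are hypothetical, not part of the sample project); in a UDTF, the caller would simply skip `collect()` when the result is null.

```java
public class GuardedSplit {
    // Hypothetical guarded version of the sample's split logic:
    // returns null for null input or input without the delimiter,
    // so a row can be skipped instead of throwing an exception.
    static String[] splitNamePlace(String str) {
        if (str == null) {
            return null;
        }
        String[] split = str.split("\\|");
        return split.length >= 2 ? split : null;
    }

    public static void main(String[] args) {
        System.out.println(splitNamePlace("Alice|Hangzhou")[1]); // prints Hangzhou
        System.out.println(splitNamePlace("no-delimiter"));      // prints null
    }
}
```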