Realtime Compute for Apache Flink: Java UDTFs

Last Updated: Mar 26, 2026

A user-defined table-valued function (UDTF) takes zero or more scalar values as input and returns any number of rows as output. Each returned row can have one or more columns. This differs from a scalar function, which maps its input to a single output value.

Flink distinguishes between several kinds of functions:

Function type                  Behavior
Scalar function                Maps scalar values to a new scalar value
Table-valued function (UDTF)   Maps scalar values to new rows
Aggregate function             Maps scalar values from multiple rows to a new scalar value
Table aggregate function       Maps scalar values from multiple rows to new rows

Use a UDTF when your logic must produce multiple output rows from a single input row — for example, splitting a delimited string into separate rows.
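
The four function kinds listed above differ mainly in how many rows go in and how many come out. The following plain-Java sketch illustrates those shapes on ordinary collections. It does not use the Flink API, and the class and method names (FunctionKindsSketch, splitWords, top2) are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustration only: plain Java stand-ins for the four function shapes, not Flink classes.
final class FunctionKindsSketch {

    // Scalar shape: one input value -> one output value.
    static int length(String s) {
        return s.length();
    }

    // Table-valued shape: one input value -> zero or more output rows.
    static List<String> splitWords(String s) {
        List<String> rows = new ArrayList<>();
        for (String word : s.split(" ")) {
            if (!word.isEmpty()) {
                rows.add(word);
            }
        }
        return rows;
    }

    // Aggregate shape: values from many rows -> one output value.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Table aggregate shape: values from many rows -> several output rows (here, the two largest).
    static List<Integer> top2(List<Integer> values) {
        List<Integer> sorted = new ArrayList<>(values);
        sorted.sort(Comparator.reverseOrder());
        return new ArrayList<>(sorted.subList(0, Math.min(2, sorted.size())));
    }
}
```

In this sketch, splitWords is the only method whose output size depends on the content of a single input value, which is the situation a UDTF is meant for.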

For background on Flink user-defined functions, see User-defined functions.

Prerequisites

Before you begin, ensure that you have:

  • Apache Maven installed on your local machine

  • IntelliJ IDEA installed on your local machine

  • A Flink SQL job environment set up in Realtime Compute for Apache Flink

Develop a UDTF

Note

Realtime Compute for Apache Flink provides a sample project (ASI_UDX_Demo) with pre-configured development environments and example implementations for user-defined scalar functions, user-defined aggregate functions (UDAFs), and UDTFs. Access to this third-party GitHub repository may occasionally be slow or unavailable.

  1. Download and decompress the ASI_UDX_Demo sample project to your local machine. After decompression, an ASI_UDX-main folder is created with the following files:

    • pom.xml: The Maven project configuration file, which describes the project's coordinates, dependencies, and build settings.

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: The sample Java code for the UDTF.

  2. In IntelliJ IDEA, click File > Open and open the decompressed ASI_UDX-main folder.

  3. Open \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java and implement your UDTF logic.

    A UDTF extends TableFunction<T>, where T is the output type. The eval() method must return void: instead of using a return statement, call collect() once for each output row. This differs from scalar functions, whose eval() method returns its result. The following example splits a string into two columns using a vertical bar (|) as the delimiter:

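    package ASI_UDTF;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    
    public class ASI_UDTF extends TableFunction<Tuple2<String, String>> {
        public void eval(String str) {
            // Emit nothing for NULL input instead of failing with a NullPointerException.
            if (str == null) {
                return;
            }
            // The vertical bar is a regex metacharacter, so it must be escaped.
            String[] split = str.split("\\|");
            // Emit nothing if the delimiter is missing; reading split[1] would otherwise throw.
            if (split.length < 2) {
                return;
            }
            String name = split[0];
            String place = split[1];
            // Emit one row with two columns.
            collect(Tuple2.of(name, place));
        }
    }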

    Declaring output types

    Flink supports two approaches for declaring the output schema of a UDTF:

    • Row type with `@FunctionHint` and `@DataTypeHint` (recommended): Explicitly declares field names and types, making the output schema clear to Flink and to readers.

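      import org.apache.flink.table.annotation.DataTypeHint;
      import org.apache.flink.table.annotation.FunctionHint;
      import org.apache.flink.table.functions.TableFunction;
      import org.apache.flink.types.Row;

      // Shown as a nested class; remove "static" if you declare it as a top-level class.
      @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
      public static class SplitFunction extends TableFunction<Row> {
          public void eval(String str) {
              for (String s : str.split(" ")) {
                  // Emit one row per word: the word and its length.
                  collect(Row.of(s, s.length()));
              }
          }
      }

    • Tuple type (type inferred from generics): Simpler to write, but column names are not explicit.

      TableFunction<Tuple2<String, Integer>>

    Note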

    The data types and type inference mechanism supported by TableFunction vary across major Flink versions. The examples above follow the Flink 1.15 documentation (Data types and Type inference). Check the Flink documentation that matches your Ververica Runtime (VVR) version. To find your Flink version, see Storage management and operations.

  4. Open pom.xml in the ASI_UDX-main folder and configure your dependencies. The sample project includes the following Maven dependencies for Flink 1.11:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.11.0</version>
            <type>pom</type>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.11.0</version>
        </dependency>
    </dependencies>

    If your UDTF has no additional dependencies beyond those listed, skip to the next step. Otherwise, add the required dependency entries to pom.xml.

  5. In the directory that contains pom.xml, run the following command to package the project:

    mvn package -Dcheckstyle.skip

    If the build succeeds, ASI_UDX-1.0-SNAPSHOT.jar is generated in the \ASI_UDX-main\target\ directory. The UDTF development is complete.
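
The eval() and collect() contract described in step 3 can be tried out without a Flink dependency. The following is a minimal sketch, assuming a hypothetical MiniTableFunction class that only buffers emitted rows; it is not Flink's TableFunction, and the names MiniTableFunction, SplitWordsSketch, and emittedRows are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's TableFunction<T>; only the collect() idea is modeled.
abstract class MiniTableFunction<T> {
    private final List<T> emitted = new ArrayList<>();

    // Buffers one output row. In Flink, collect() passes the row on to the runtime instead.
    protected final void collect(T row) {
        emitted.add(row);
    }

    // Exposes the buffered rows so the behavior can be inspected.
    List<T> emittedRows() {
        return emitted;
    }
}

final class SplitWordsSketch extends MiniTableFunction<String> {
    // Returns void: every output row goes through collect(), so one call can emit 0..n rows.
    public void eval(String str) {
        if (str == null) {
            return; // emit nothing for NULL input
        }
        for (String word : str.split(" ")) {
            if (!word.isEmpty()) {
                collect(word);
            }
        }
    }
}
```

Calling eval("hello flink udtf") on a SplitWordsSketch instance buffers three rows, while eval(null) buffers none. Because eval() has no return value, the number of output rows is decided entirely by how many times collect() is called.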

Register a UDTF

For instructions on how to register a UDTF, see Manage user-defined functions (UDFs).

Use a UDTF

After registering the UDTF, use it in a Flink SQL job.

  1. Develop a Flink SQL job. For more information, see Job development map.

    The following SQL splits the message field in ASI_UDTF_Source into two columns using the | delimiter, then inserts the results into ASI_UDTF_Sink. The LATERAL TABLE syntax cross-joins each input row with the rows that the UDTF emits for it, so an input row for which the UDTF emits nothing produces no output row.

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message` VARCHAR
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name, place
    FROM ASI_UDTF_Source, LATERAL TABLE(ASI_UDTF(`message`)) AS T(name, place);

  2. On the Operation Center > Job O&M page, find the target job and click Start in the Actions column. After the job starts, the strings in the message field of ASI_UDTF_Source are split by the | delimiter, and the resulting columns are inserted into ASI_UDTF_Sink.
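
The cross-join behavior of LATERAL TABLE can be pictured as a nested loop over input rows and emitted rows. The following plain-Java sketch emulates that pairing on in-memory lists. It is an illustration under stated assumptions, not how Flink executes the query: LateralJoinSketch, splitOnBar, and lateralJoin are hypothetical names, and splitOnBar stands in for the registered UDTF.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustration only: emulates FROM source, LATERAL TABLE(udtf(message)) on in-memory lists.
final class LateralJoinSketch {

    // Hypothetical stand-in for the UDTF: emits at most one (name, place) row per message.
    static List<List<String>> splitOnBar(String message) {
        List<List<String>> rows = new ArrayList<>();
        if (message != null) {
            String[] parts = message.split("\\|");
            if (parts.length >= 2) {
                rows.add(List.of(parts[0], parts[1]));
            }
        }
        return rows;
    }

    // Pairs each input row with every row the function emits for it.
    // Input rows with no emitted rows contribute nothing to the result.
    static List<List<String>> lateralJoin(List<String> messages,
                                          Function<String, List<List<String>>> udtf) {
        List<List<String>> result = new ArrayList<>();
        for (String message : messages) {
            for (List<String> emitted : udtf.apply(message)) {
                List<String> joined = new ArrayList<>();
                joined.add(message);    // column from the source table
                joined.addAll(emitted); // columns from the table function
                result.add(joined);
            }
        }
        return result;
    }
}
```

With the inputs "Tom|Paris" and "no-delimiter", only the first message yields a joined row, because the stand-in function emits nothing for the second.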

What's next