This topic describes how to create, register, and use a user-defined table-valued function (UDTF) in Realtime Compute for Apache Flink.

Definition

A UDTF takes zero, one, or multiple scalar values as input parameters, including variable-length parameters. UDTFs are similar to user-defined scalar functions (UDSFs), except that a UDTF can return any number of rows instead of a single value. Each returned row consists of one or more columns, and a single call to a UDTF can emit multiple rows. For more information, see User-defined Functions.

Create a UDTF

Note Realtime Compute for Apache Flink provides examples of user-defined functions (UDFs) to facilitate your business development. The examples show how to implement UDSFs, user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs). Each example is configured with the development environment of the corresponding version.
  1. Download and decompress ASI_UDX_Demo to your on-premises machine.
    Note ASI_UDX_Demo is provided on a third-party website. Access to the website may fail or be slow.
    After you decompress the package, the ASI_UDX-main folder is generated. The folder contains the following files:
    • pom.xml: the project-level Maven configuration file, which describes the Maven coordinates, dependencies, build rules, and licenses of the project.
    • \ASI_UDX-main\src\main\java\ASI_UDTF\ASI_UDTF.java: the Java code of the sample UDTF.
  2. Open IntelliJ IDEA and choose File > Open. Select the extracted ASI_UDX-main folder and click OK.
  3. Double-click the ASI_UDTF.java file in the \ASI_UDX-main\src\main\java\ASI_UDTF directory and modify the file based on your business requirements.
    In this example, ASI_UDTF.java contains code that uses vertical bars (|) to split the string in each row into multiple columns of strings.
    package ASI_UDTF;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    
    public class ASI_UDTF extends TableFunction<Tuple2<String,String>> {
        public void eval(String str){
            // Escape "|" because it is a regex metacharacter.
            String[] split = str.split("\\|");
            String name = split[0];
            String place = split[1];
            // Emit one output row with the columns (name, place).
            Tuple2<String,String> tuple2 = Tuple2.of(name,place);
            collect(tuple2);
        }
    }
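    The core of eval above is a regular-expression split. The following standalone sketch (plain Java, no Flink dependency; SplitSketch and splitLine are illustrative names, not part of the sample project) shows the same logic:

```java
// Standalone sketch of the split logic inside eval. "|" is a regex
// metacharacter, so it must be escaped as "\\|"; an unescaped "|"
// would split the string between every character instead.
public class SplitSketch {
    static String[] splitLine(String str) {
        return str.split("\\|");
    }

    public static void main(String[] args) {
        String[] parts = splitLine("Alice|Hangzhou");
        System.out.println(parts[0]); // Alice
        System.out.println(parts[1]); // Hangzhou
    }
}
```

    In the UDTF, each element of the resulting array becomes one column of the emitted row.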
    For more information about the data types and type inference mechanism supported by UDTFs, see Data Types and Type Inference.
    Note The preceding two documents are referenced from the documentation of Apache Flink 1.15. The data types and type inference mechanism supported by UDTFs may vary based on the major version of Apache Flink. For more information about the data types and type inference mechanism supported by UDTFs in a specific version, see the documentation of that Apache Flink version based on the mappings between Ververica Runtime (VVR) and Apache Flink versions. For more information about how to view the engine version of Apache Flink, see How do I query the engine version of Flink that is used by a deployment?
    The following sample UDTFs return values of the common composite types Tuple and Row:
    • Tuple type
      TableFunction<Tuple2<String,Integer>>
    • Row type
      @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
      public static class SplitFunction extends TableFunction<Row> {
      
        public void eval(String str) {
          for (String s : str.split(" ")) {
            // use collect(...) to emit a row
            collect(Row.of(s, s.length()));
          }
        }
      }
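    Unlike the Tuple example, which emits one row per call, the Row-type SplitFunction above calls collect once per word, so a single input row can fan out into multiple output rows. The following plain-Java sketch illustrates that fan-out (WordRows and its eval method are illustrative names, not Flink API; each list element stands in for one collect call):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the fan-out performed by SplitFunction: each
// element of the returned list represents one emitted row.
public class WordRows {
    static List<Object[]> eval(String str) {
        List<Object[]> rows = new ArrayList<>();
        for (String s : str.split(" ")) {
            rows.add(new Object[] { s, s.length() }); // columns: word, length
        }
        return rows;
    }

    public static void main(String[] args) {
        for (Object[] row : eval("hello flink")) {
            System.out.println(row[0] + " " + row[1]);
        }
        // prints:
        // hello 5
        // flink 5
    }
}
```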
  4. Double-click the pom.xml file in the \ASI_UDX-main\ directory and configure the file.
    In this example, pom.xml is configured with the main JAR dependencies of Apache Flink 1.11. Perform one of the following operations based on your business requirements:
    • If your business does not depend on other JAR packages, you do not need to modify the pom.xml file. Proceed to the next step.
    • If your business depends on other JAR packages, add the required dependencies to the pom.xml file.
    Apache Flink 1.11 depends on the following JAR packages:
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.11.0</version>
            <type>pom</type>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.11.0</version>
        </dependency>
    </dependencies>
  5. Go to the directory where the pom.xml file is stored. Then, run the following command to package the project:
    mvn package -Dcheckstyle.skip

    If the ASI_UDX-1.0-SNAPSHOT.jar package appears in the \ASI_UDX-main\target\ directory, the UDTF is created.

Register a UDTF

For more information about how to register a UDTF, see Manage UDFs.

Use a UDTF

After a UDTF is registered, you can perform the following steps to use the UDTF:
  1. Use Flink SQL to create a deployment. For more information, see Develop a draft.
    In this example, the vertical-bar (|) delimited string in each row of the message field of the ASI_UDTF_Source table is split into multiple columns. The following code provides an example:
    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name, place
    FROM ASI_UDTF_Source, LATERAL TABLE(ASI_UDTF(`message`)) AS T(name, place);
  2. On the Deployments page in the console of fully managed Flink, find the deployment that you want to start and click Start in the Actions column.

    After the deployment is started, the vertical-bar (|) delimited string in the message field of the ASI_UDTF_Source table is split into multiple columns, which are inserted into the ASI_UDTF_Sink table.