This topic describes how to build a development environment, write code, and publish a user-defined table-valued function (UDTF) in Realtime Compute for Apache Flink.

Note Only Realtime Compute for Apache Flink in exclusive mode supports user-defined extensions (UDXs).

Definition

Similar to a user-defined scalar function (UDF), a UDTF takes zero, one, or more scalar values as input parameters, including variable-length parameters. Unlike a UDF, a UDTF can return any number of rows instead of a single value, and each returned row can consist of one or more columns. For example, a UDTF that splits strings can turn the single input value 'A|B' into two output rows.

Build a development environment

For more information, see Build a development environment.

Write code that implements business logic

A UDTF must extend the TableFunction class and implement one or more eval methods. The open and close methods are optional. The following Java code is an example:
package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class SplitUdtf extends TableFunction<String> {

    // The open method is optional. To use the open method, you must add 'import org.apache.flink.table.functions.FunctionContext;' to the code. 
    @Override
    public void open(FunctionContext context) {
        // ... ...
    }

    public void eval(String str) {
        String[] split = str.split("\\|");
        for (String s : split) {
            collect(s);
        }
    }

    // The close method is optional. 
    @Override
    public void close() {
        // ... ...
    }
}
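
The open method receives a FunctionContext object that provides access to runtime information such as job parameters. The following sketch is for reference only: it reads a job parameter named delimiter through FunctionContext.getJobParameter and falls back to the vertical bar if the parameter is not set. The class name and the parameter name are illustrative assumptions, not part of the preceding example.

package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class ConfigurableSplitUdtf extends TableFunction<String> {

    // The delimiter is initialized once per task in the open method.
    private String delimiter;

    @Override
    public void open(FunctionContext context) {
        // "delimiter" is a hypothetical job parameter; default to the vertical bar.
        delimiter = context.getJobParameter("delimiter", "\\|");
    }

    public void eval(String str) {
        // Emit one output row per split segment.
        for (String s : str.split(delimiter)) {
            collect(s);
        }
    }
}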

Return multiple rows

A UDTF converts a single input row into multiple output rows by calling the collect method once for each row that it emits, as the eval method of SplitUdtf in the preceding example does.

Return multiple columns

A UDTF can also convert a single output column into multiple columns. To return multiple columns, declare the return value as a tuple or a row. The following examples show both approaches.
  • Declare the return value as a tuple.
    Realtime Compute for Apache Flink supports Tuple1 to Tuple25, which define 1 to 25 fields. The following example is a UDTF that uses Tuple3 to return three fields:
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.table.functions.TableFunction;
    
    // If the return value is declared as a tuple, you must explicitly declare the generic types of the tuple, such as STRING, LONG, and INTEGER in this example. 
    public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {
    
        public void eval(String str) {
            String[] split = str.split(",");
            // The following code is used for reference only. In actual scenarios, you must add code that implements verification logic. 
            String first = split[0];
            long second = Long.parseLong(split[1]);
            int third = Integer.parseInt(split[2]);
            Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third);
            collect(tuple3);
        }
    }
    Note If the return value is declared as a tuple, column values cannot be null and a maximum of 25 columns are allowed.
  • Declare the return value as a row.
The following example declares the return value as a row to return three columns:
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.DataTypes;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    
    public class ParseUdtf extends TableFunction<Row> {
    
        public void eval(String str) {
            String[] split = str.split(",");
            String first = split[0];
            long second = Long.parseLong(split[1]);
            int third = Integer.parseInt(split[2]);
            Row row = new Row(3);
            row.setField(0, first);
            row.setField(1, second);
            row.setField(2, third);
            collect(row);
        }
    
        // If the return value is declared as a row, you must override the getResultType method to explicitly declare the data type of the return value. 
        @Override
        public DataType getResultType(Object[] arguments, Class[] argTypes) {
            return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
        }
    }
    Note If the return value is declared as a row, field values can be null. However, you must override the getResultType method.

SQL syntax

A UDTF supports CROSS JOIN and LEFT JOIN. When you use a UDTF, you must add the LATERAL and TABLE keywords. You must also register the UDTF under a function name. The following statement registers the ParseUdtf class from the preceding code as the parseUdtf function:
CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
  • CROSS JOIN
    Each row in the left table is joined with every row that the UDTF generates for it. If the UDTF does not generate any rows for an input row, that input row is not returned.
    select S.id, S.content, T.a, T.b, T.c
    from input_stream as S,
    lateral table(parseUdtf(content)) as T(a, b, c);
  • LEFT JOIN
    Each row in the left table is joined with every row that the UDTF generates for it. If the UDTF does not generate any rows for an input row, the input row is still returned, and the UDTF fields in that row are filled with null.
    Note A LEFT JOIN statement that uses a UDTF must end with on true.
    select S.id, S.content, T.a, T.b, T.c
    from input_stream as S
    left join lateral table(parseUdtf(content)) as T(a, b, c) on true;
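
For reference: if parseUdtf emits no rows for a given content value, the CROSS JOIN query drops that input row, whereas the LEFT JOIN query still returns it with T.a, T.b, and T.c set to null.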

Register and use resources

  1. Log on to the Realtime Compute development platform.
  2. In the top navigation bar, click Development.
  3. In the left-side navigation pane, click the Resources tab.
  4. In the upper-right corner of the Resources pane, click Create Resource.
  5. In the Upload Resource dialog box, configure resource parameters.
    • Location: You can upload JAR packages only from your on-premises machine in the Realtime Compute for Apache Flink console.
      Note The maximum size of a JAR package that can be uploaded from your on-premises machine is 300 MB. If the JAR package exceeds 300 MB, you must upload it to the Object Storage Service (OSS) bucket that is bound to your cluster or use an API to upload it.
    • Resource: Click Upload Resource to select the resource that you want to reference.
    • Resource Name: Enter a name for the resource.
    • Resource Description: Enter a description of the resource.
    • Resource Type: Select the type of the resource. Valid values: JAR, DICTIONARY, and PYTHON.
  6. In the Resources pane, find the new resource, and move the pointer over More in the Actions column.
  7. In the drop-down list, select Reference.
  8. In the code editor, declare the UDX at the beginning. The following statement is an example:
    CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

Publish and use a UDTF

For more information about how to publish and use a UDTF, see Publish a job and Start a job.

Example

-- Register the UDTF, which splits the input string by the vertical bar (|) delimiter.
create function splitUdtf as 'com.hjc.test.blink.sql.udx.SplitUdtf';

create table sls_stream (
    a INT,
    b BIGINT,
    c VARCHAR
) with (
    type='sls',
    endPoint='yourEndpoint',
    accessKeyId='yourAccessKeyId',
    accessKeySecret='yourAccessSecret',
    startTime='2017-07-04 00:00:00',
    project='yourProjectName',
    logStore='yourLogStoreName',
    consumerGroup='consumerGroupTest2'
);

-- Use the splitUdtf function to extract data from the c field. Table T(s) with multiple rows and one column is returned. s is the name of the column. 
create view v1 as
select a, b, c, s
from sls_stream,
lateral table(splitUdtf(c)) as T(s);

create table rds_output (
    id INT,
    len BIGINT,
    content VARCHAR
) with (
    type='rds',
    url='yourDatabaseURL',
    tableName='yourDatabaseTableName',
    userName='yourDatabaseUserName',
    password='yourDatabasePassword'
);

insert into rds_output
select a, b, s
from v1;
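
For reference only: assuming a single input row with a=1, b=2, and c='x|y|z', the splitUdtf function emits the three rows 'x', 'y', and 'z'. View v1 then contains the rows (1, 2, 'x|y|z', 'x'), (1, 2, 'x|y|z', 'y'), and (1, 2, 'x|y|z', 'z'), and rds_output receives (1, 2, 'x'), (1, 2, 'y'), and (1, 2, 'z').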