This topic describes how to build a development environment, write code, and publish a user-defined table function (UDTF) in Realtime Compute.

Note Currently, Realtime Compute in shared mode does not support UDXs. UDXs are supported only in exclusive mode.

Definition

Similar to a user-defined scalar function (UDSF), a UDTF takes zero, one, or multiple scalar values as input parameters, including variable-length parameters. Unlike a UDSF, however, a UDTF returns any number of rows instead of a single value. The returned rows can consist of one or more columns.
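
For example, the eval method of a UDTF can declare a variable-length parameter list. The following is a minimal sketch; the class name EmitAllUdtf is hypothetical and used only for illustration:

import org.apache.flink.table.functions.TableFunction;

public class EmitAllUdtf extends TableFunction<String> {

    // The eval method accepts zero, one, or multiple scalar values.
    public void eval(String... values) {
        // Emit one output row per input value.
        for (String value : values) {
            collect(value);
        }
    }
}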

Build a development environment

For more information, see Build a development environment.

Write code that implements business logic

A UDTF must extend the TableFunction class and implement an eval method. The open and close methods are optional. The following sample code is written in Java:

package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class SplitUdtf extends TableFunction<String> {

    // The open method is optional. To use the open method, you must add 'import org.apache.flink.table.functions.FunctionContext;'.
    @Override
    public void open(FunctionContext context) {
        // ... ...
    }

    // Splits the input string by the vertical bar (|) delimiter and emits one row per substring.
    public void eval(String str) {
        String[] split = str.split("\\|");
        for (String s : split) {
            collect(s);
        }
    }

    // The close method is optional.
    @Override
    public void close() {
        // ... ...
    }
}

Return multiple rows

A UDTF can convert the output result from a single row to multiple rows by calling the collect method multiple times.
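
The following minimal sketch (the class name DuplicateUdtf is hypothetical) calls collect twice for each input row and therefore emits two output rows per input row:

import org.apache.flink.table.functions.TableFunction;

public class DuplicateUdtf extends TableFunction<String> {

    public void eval(String str) {
        // Each call to collect emits one output row.
        collect(str);
        collect(str.toUpperCase());
    }
}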

Return multiple columns

A UDTF can also convert the output result from a single column to multiple columns. If you want a UDTF to return multiple columns, declare the return value as a tuple or row.

  • Declare the return value as a tuple.
    Realtime Compute supports Tuple1 to Tuple25, which define 1 to 25 fields. The following example is a UDTF that uses Tuple3 to return three fields:
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.table.functions.TableFunction;
    
    // If the return value is declared as a tuple, you must explicitly declare the generic types of the tuple, such as String, Long, and Integer in this example.
    public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {
    
        public void eval(String str) {
            String[] split = str.split(",");
            // The code is for demonstration only. In practice, you may need to add more verification logic.
            String first = split[0];
            long second = Long.parseLong(split[1]);
            int third = Integer.parseInt(split[2]);
            Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third);
            collect(tuple3);
        }
    }
    Note If the return value is declared as a tuple, it can contain a maximum of 25 fields, and the field values cannot be null.
  • Declare the return value as a row.
    The following example is a UDTF that uses a row to return three fields:
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.DataTypes;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    
    public class ParseUdtf extends TableFunction<Row> {
    
        public void eval(String str) {
            String[] split = str.split(",");
            String first = split[0];
            long second = Long.parseLong(split[1]);
            int third = Integer.parseInt(split[2]);
            Row row = new Row(3);
            row.setField(0, first);
            row.setField(1, second);
            row.setField(2, third);
            collect(row);
        }
    
        // If the return value is declared as a row, you must override the getResultType method to explicitly declare the types of the returned fields.
        @Override
        public DataType getResultType(Object[] arguments, Class[] argTypes) {
            return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
        }
    }
    Note If the return value is declared as a row, the field values can be null. However, you must override the getResultType method.

Syntax

A UDTF supports CROSS JOIN and LEFT JOIN. When you use a UDTF, you must add the LATERAL and TABLE keywords. The following examples use the ParseUdtf class from the preceding code. First, register a function name for it:

CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';

CROSS JOIN: Each row of data in the left table is joined with each row of data generated by the UDTF. If the UDTF does not generate any data for a row, the row is not returned.

select S.id, S.content, T.a, T.b, T.c
from input_stream as S,
lateral table(parseUdtf(content)) as T(a, b, c);
LEFT JOIN: Each row of data in the left table is joined with each row of data generated by the UDTF. If the UDTF does not generate any data for a row, the row is still returned and its UDTF fields are null.
Note A LEFT JOIN statement that uses a UDTF must end with on true.
select S.id, S.content, T.a, T.b, T.c
from input_stream as S
left join lateral table(parseUdtf(content)) as T(a, b, c) on true;
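
The two joins differ only when the UDTF emits zero rows for an input row, which it does by simply not calling collect. The following minimal sketch (the class name SkipEmptyUdtf is hypothetical) emits no rows for null or empty input; a CROSS JOIN drops such input rows, whereas a LEFT JOIN with on true keeps them with null UDTF fields:

import org.apache.flink.table.functions.TableFunction;

public class SkipEmptyUdtf extends TableFunction<String> {

    public void eval(String str) {
        // Returning without calling collect emits zero rows for this input row.
        if (str == null || str.isEmpty()) {
            return;
        }
        for (String s : str.split("\\|")) {
            collect(s);
        }
    }
}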

Publish and use a UDTF

For information about how to publish and use a UDTF, see Develop a job and Publish a job.

Example

-- The splitUdtf function splits the input string by the vertical bar (|) delimiter, that is, str.split("\\|").
create function splitUdtf as 'com.hjc.test.blink.sql.udx.SplitUdtf';

create table sls_stream(
a INT,
b BIGINT,
c VARCHAR
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessKeyId',
accessKeySecret='yourAccessKeySecret',
startTime='2017-07-04 00:00:00',
project='yourProjectName',
logStore='yourLogStoreName',
consumerGroup='consumerGroupTest2'
);

-- Pass the c field to splitUdtf. After splitting, the UDTF generates table T(s), which consists of one column and multiple rows. In the table name T(s), s is the field name.
create view v1 as
select a,b,c,s
from sls_stream,
lateral table(splitUdtf(c)) as T(s);

create table rds_output(
id INT,
len BIGINT,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);

insert into rds_output
select
a,b,s
from v1;