This topic describes how to build a development environment, write code, and publish a user-defined table-valued function (UDTF) in Realtime Compute for Apache Flink.
Description
Similar to a user-defined scalar function (UDF), a UDTF uses zero, one, or multiple scalar values as input parameters (including variable-length parameters). Different from a UDF, a UDTF returns any number of rows, rather than a single value. The returned rows can consist of one or more columns.
Build a development environment
For more information, see Build a development environment.
Write business logic code
package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class SplitUdtf extends TableFunction<String> {

    // The open method is optional. To use it, you must import
    // org.apache.flink.table.functions.FunctionContext.
    @Override
    public void open(FunctionContext context) {
        // ...
    }

    // Split the input string by "|" and emit one row per segment.
    public void eval(String str) {
        String[] split = str.split("\\|");
        for (String s : split) {
            collect(s);
        }
    }

    // The close method is optional.
    @Override
    public void close() {
        // ...
    }
}
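The open method gives the UDTF access to a FunctionContext, which can supply runtime information such as job parameters. The following is a minimal sketch, not part of the original example: it assumes a job parameter named delimiter (a hypothetical name) that makes the split delimiter configurable.
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class ConfigurableSplitUdtf extends TableFunction<String> {

    // Default delimiter. "delimiter" below is a hypothetical job parameter name.
    private String delimiter = "\\|";

    @Override
    public void open(FunctionContext context) {
        // getJobParameter returns the configured value, or the default value
        // if the parameter is not set.
        delimiter = context.getJobParameter("delimiter", "\\|");
    }

    public void eval(String str) {
        for (String s : str.split(delimiter)) {
            collect(s);
        }
    }
}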
Return multiple rows
A UDTF can convert a single input row into multiple output rows by calling the collect method multiple times.
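Each call to eval may invoke collect zero or more times. As an illustration only (RangeUdtf is a hypothetical example, not from the original code), the following sketch fans one input row out into a variable number of output rows:
import org.apache.flink.table.functions.TableFunction;

public class RangeUdtf extends TableFunction<Long> {

    // Emit one row per value in [0, n). If n <= 0, nothing is emitted, and the
    // input row produces no output under a CROSS JOIN (see the SQL syntax below).
    public void eval(long n) {
        for (long i = 0; i < n; i++) {
            collect(i);
        }
    }
}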
Return multiple columns
- Declare the return value as a tuple.
Realtime Compute for Apache Flink supports Tuple1 to Tuple25, which define 1 to 25 fields. The following example is a UDTF that uses Tuple3 to return three fields:
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;

// If the return value is declared as a tuple, you must explicitly declare the
// generic types of the tuple, such as String, Long, and Integer in this example.
public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {

    public void eval(String str) {
        String[] split = str.split(",");
        // The code is for demonstration only. In practice, you may need to add
        // more verification logic.
        String first = split[0];
        long second = Long.parseLong(split[1]);
        int third = Integer.parseInt(split[2]);
        Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third);
        collect(tuple3);
    }
}
Note: If the return value is declared as a tuple, it can contain a maximum of 25 fields, and the field value cannot be null.
- Declare the return value as a row.
The following example is a UDTF that uses a row to return three fields:
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class ParseUdtf extends TableFunction<Row> {

    public void eval(String str) {
        String[] split = str.split(",");
        String first = split[0];
        long second = Long.parseLong(split[1]);
        int third = Integer.parseInt(split[2]);
        Row row = new Row(3);
        row.setField(0, first);
        row.setField(1, second);
        row.setField(2, third);
        collect(row);
    }

    // If the return value is declared as a row, you must override the getResultType
    // method to explicitly declare the types of the fields to be returned.
    @Override
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
        return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
    }
}
Note: If the return value is declared as a row, the field value can be null. However, you must override the getResultType method.
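To sanity-check a UDTF locally before packaging it, you can route its output into a list. The following is a minimal sketch under stated assumptions: it presumes that your Flink distribution exposes the public TableFunction.setCollector method and the ListCollector utility class, and it uses the row-based ParseUdtf above.
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.types.Row;

public class ParseUdtfTest {

    public static void main(String[] args) {
        ParseUdtf udtf = new ParseUdtf();

        // Route collect(...) calls into a plain list so the output can be inspected.
        List<Row> results = new ArrayList<>();
        udtf.setCollector(new ListCollector<>(results));

        udtf.eval("hello,100,3");
        System.out.println(results); // Expected: one row with fields (hello, 100, 3)
    }
}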
SQL syntax
When you call a UDTF in an SQL statement, you must use it together with the keywords LATERAL and TABLE. You must also register the UDTF under an alias, for example, parseUdtf for the ParseUdtf class in the preceding code:
CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
- CROSS JOIN
Each row in the left table is joined with each row that the UDTF generates for it. If the UDTF does not generate any rows for a row in the left table, that row is not returned.
select S.id, S.content, T.a, T.b, T.c from input_stream as S, lateral table(parseUdtf(content)) as T(a, b, c);
- LEFT JOIN
Each row in the left table is joined with each row that the UDTF generates for it. If the UDTF does not generate any rows for a row in the left table, the UDTF fields in the output row are filled with null.
Note: A LEFT JOIN statement that uses a UDTF must end with on true.
select S.id, S.content, T.a, T.b, T.c from input_stream as S left join lateral table(parseUdtf(content)) as T(a, b, c) on true;
Register and use resources
- Log on to the Realtime Compute for Apache Flink console.
- In the top navigation bar, click Development.
- In the left-side navigation pane, click the Resources tab.
- In the upper-right corner of the Resources pane, click Create Resource.
- In the Upload Resource dialog box, configure resource parameters.
Location: You can only upload JAR packages from your machine in the Realtime Compute for Apache Flink console. Note: The maximum size of a JAR package that can be uploaded from your machine is 300 MB. If the JAR package exceeds 300 MB, you must upload it to the Object Storage Service (OSS) bucket that is bound to your cluster or use APIs to upload it.
Resource: Click Upload Resource and select the resource that you want to reference.
Resource Name: Enter a name for the resource.
Resource Description: Enter a resource description.
Resource Type: Select the type of the resource. The type can be JAR, DICTIONARY, or PYTHON.
- In the Resources pane, find the new resource, and move the pointer over More in the Actions column.
- In the drop-down list, select Reference.
- In the code editor, declare the UDX at the beginning. The following code is an example:
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
Publish and use a UDTF
For more information about how to publish and use a UDTF, see Publish a job and Start a job.
Example
-- The splitUdtf function splits the input string by "|" (str.split("\\|")).
create function splitUdtf as 'com.hjc.test.blink.sql.udx.SplitUdtf';
create table sls_stream(
a INT,
b BIGINT,
c VARCHAR
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessKeyId',
accessKeySecret='yourAccessSecret',
startTime='2017-07-04 00:00:00',
project='yourProjectName',
logStore='yourLogStoreName',
consumerGroup='consumerGroupTest2'
);
-- Pass the c field to splitUdtf to generate table T(s) that consists of one column and multiple rows after splitting. In the table name T(s), s indicates the field name.
create view v1 as
select a,b,c,s
from sls_stream,
lateral table(splitUdtf(c)) as T(s);
create table rds_output(
id INT,
len BIGINT,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);
insert into rds_output
select
a,b,s
from v1;