This topic describes how to build a development environment, write code, and publish a user-defined table-valued function (UDTF) in Realtime Compute for Apache Flink.
Definition
Similar to a user-defined scalar function (UDF), a UDTF takes zero, one, or more scalar values as input parameters, including variable-length parameters. Unlike a UDF, however, a UDTF can return any number of rows rather than a single value, and each returned row can consist of one or more columns. For example, a UDTF can split the string 'A|B|C' into three rows: A, B, and C.
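To make the contrast concrete, the following minimal sketch places a scalar UDF next to a UDTF. The class and method bodies are illustrative placeholders, not part of Realtime Compute: the scalar UDF returns exactly one value per call, whereas the UDTF may emit any number of rows by calling collect.
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;

// A scalar UDF: each call produces exactly one value.
class StrLenUdf extends ScalarFunction {
    public long eval(String s) {
        return s == null ? 0 : s.length();
    }
}

// A UDTF: each call may emit zero, one, or many rows by calling collect().
class WordsUdtf extends TableFunction<String> {
    public void eval(String s) {
        for (String word : s.split(" ")) {
            collect(word);
        }
    }
}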
Build a development environment
For more information, see Build a development environment.
Write code that implements business logic
package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class SplitUdtf extends TableFunction<String> {

    // The open method is optional. To use it, you must import
    // org.apache.flink.table.functions.FunctionContext.
    @Override
    public void open(FunctionContext context) {
        // ... ...
    }

    // Splits the input string on '|' and emits one row per segment.
    public void eval(String str) {
        String[] split = str.split("\\|");
        for (String s : split) {
            collect(s);
        }
    }

    // The close method is optional.
    @Override
    public void close() {
        // ... ...
    }
}
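If you need initialization logic, the open method gives you access to the runtime context. The following variant is a sketch that reads a delimiter from a job parameter; the parameter name split.delimiter is hypothetical and chosen only for illustration, not defined by Realtime Compute:
import java.util.regex.Pattern;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class ConfigurableSplitUdtf extends TableFunction<String> {

    private String delimiter;

    @Override
    public void open(FunctionContext context) {
        // "split.delimiter" is a hypothetical job parameter name for this
        // sketch. Fall back to '|' when the parameter is absent.
        this.delimiter = context.getJobParameter("split.delimiter", "|");
    }

    public void eval(String str) {
        // Quote the delimiter so regex metacharacters such as '|' are
        // treated literally by String.split.
        for (String s : str.split(Pattern.quote(delimiter))) {
            collect(s);
        }
    }
}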
Return multiple rows
A UDTF converts a single input row into multiple output rows by calling the collect method multiple times.
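To observe this behavior outside a running job, you can set a collector manually and call eval directly. The following sketch assumes the SplitUdtf class shown above and that your Flink dependency exposes the public TableFunction#setCollector method:
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.util.Collector;

public class SplitUdtfDemo {
    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        SplitUdtf udtf = new SplitUdtf();
        // Capture everything that eval() emits through collect().
        udtf.setCollector(new Collector<String>() {
            @Override
            public void collect(String record) {
                rows.add(record);
            }

            @Override
            public void close() {
            }
        });
        udtf.eval("I|love|Flink");
        // One input row has produced three output rows: [I, love, Flink]
        System.out.println(rows);
    }
}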
Return multiple columns
- Declare the return value as a tuple.
Realtime Compute for Apache Flink supports Tuple1 to Tuple25, which can hold 1 to 25 fields. The following example shows a UDTF that uses Tuple3 to return three fields:
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;

// If the return value is declared as a tuple, you must explicitly declare the
// generic types of the tuple, such as String, Long, and Integer in this example.
public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {

    public void eval(String str) {
        String[] split = str.split(",");
        // The following code is for reference only. In actual scenarios, you
        // must add verification logic.
        String first = split[0];
        long second = Long.parseLong(split[1]);
        int third = Integer.parseInt(split[2]);
        Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third);
        collect(tuple3);
    }
}
Note If the return value is declared as a tuple, column values cannot be null, and a maximum of 25 columns is allowed.
- Declare the return value as a row.
For example, the following UDTF declares its return value as a row and returns three columns:
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class ParseUdtf extends TableFunction<Row> {

    public void eval(String str) {
        String[] split = str.split(",");
        String first = split[0];
        long second = Long.parseLong(split[1]);
        int third = Integer.parseInt(split[2]);
        Row row = new Row(3);
        row.setField(0, first);
        row.setField(1, second);
        row.setField(2, third);
        collect(row);
    }

    // If the return value is declared as a row, you must override the
    // getResultType method to explicitly declare the data types of the
    // returned fields.
    @Override
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
        return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
    }
}
Note If the return value is declared as a row, field values can be null. However, you must override the getResultType method.
SQL syntax
In SQL statements, a UDTF is used together with the LATERAL and TABLE keywords. You must also register the UDTF under an alias, such as parseUdtf for the ParseUdtf class in the preceding code:
CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
- cross join
Each row in the left table is joined with each row that the UDTF generates for it. If the UDTF does not generate any rows for a left-table row, that row is not returned.
select S.id, S.content, T.a, T.b, T.c from input_stream as S, lateral table(parseUdtf(content)) as T(a, b, c);
- left join
Each row in the left table is joined with each row that the UDTF generates for it. If the UDTF does not generate any rows for a left-table row, the UDTF fields of that output row are filled with null.
Note A LEFT JOIN statement that uses a UDTF must end with on true.
select S.id, S.content, T.a, T.b, T.c from input_stream as S left join lateral table(parseUdtf(content)) as T(a, b, c) on true;
Register and use resources
- Log on to the Realtime Compute development platform.
- In the top navigation bar, click Development.
- In the left-side navigation pane, click the Resources tab.
- In the upper-right corner of the Resources pane, click Create Resource.
- In the Upload Resource dialog box, configure resource parameters.
  - Location: You can upload JAR packages only from your on-premises machine in the Realtime Compute for Apache Flink console.
    Note The maximum size of a JAR package that can be uploaded from your on-premises machine is 300 MB. If the JAR package exceeds 300 MB, you must upload it to the Object Storage Service (OSS) bucket that is bound to your cluster or upload it by calling an API operation.
  - Resource: Click Upload Resource and select the resource that you want to reference.
  - Resource Name: Enter a name for the resource.
  - Resource Description: Enter a description of the resource.
  - Resource Type: Select the type of the resource. Valid values: JAR, DICTIONARY, and PYTHON.
- In the Resources pane, find the new resource, and move the pointer over More in the Actions column.
- In the drop-down list, select Reference.
- In the code editor, declare the UDX at the beginning. The following statement is an example:
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
Publish and use a UDTF
For more information about how to publish and use a UDTF, see Publish a job and Start a job.
Example
-- The UDTF splits the input string on '|' (str.split("\\|")).
create function splitUdtf as 'com.hjc.test.blink.sql.udx.SplitUdtf';
create table sls_stream(
a INT,
b BIGINT,
c VARCHAR
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessKeyId',
accessKeySecret='yourAccessSecret',
  startTime='2017-07-04 00:00:00',
project='yourProjectName',
logStore='yourLogStoreName',
consumerGroup='consumerGroupTest2'
);
-- Use the splitUdtf function to split the c field. A table T(s) with multiple rows and a single column is returned. s is the name of the column.
create view v1 as
select a,b,c,s
from sls_stream,
lateral table(splitUdtf(c)) as T(s);
create table rds_output(
id INT,
len BIGINT,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);
insert into rds_output
select
a,b,s
from v1;