This topic describes how to build a development environment, write business logic code, and publish a user-defined scalar function (UDF) in Realtime Compute for Apache Flink.
Definition
A UDF maps zero, one, or more scalar values to a new scalar value.
Build a development environment
For more information about how to build a development environment, see Build a development environment.
Write business logic code
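A UDF must implement the eval method. The open and close methods are optional.
Note: If a UDF takes no input parameters, or if its result is nondeterministic (for example, a random number), override the isDeterministic() method so that it returns false. Otherwise, the output may not meet your expectations in some cases. For example, the UDF operator may be moved forward during optimization and folded into a constant value.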
package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class StringLengthUdf extends ScalarFunction {
    // The open method is optional.
    // To use the open method, you must add 'import org.apache.flink.table.functions.FunctionContext;' to the code.
    @Override
    public void open(FunctionContext context) {
    }

    // Returns the length of a string. A null input returns 0.
    public long eval(String a) {
        return a == null ? 0 : a.length();
    }

    // Overloaded eval: returns the combined length of two strings.
    public long eval(String b, String c) {
        return eval(b) + eval(c);
    }

    // The close method is optional.
    @Override
    public void close() {
    }
}
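Before you package the JAR, you may want to check the eval logic on its own. The following sketch copies the two eval overloads into a hypothetical standalone class named StringLengthCheck, so it compiles and runs without Flink on the classpath. It is a local sanity check only, not part of the UDF itself.

```java
// Hypothetical standalone class: mirrors the two eval overloads of
// StringLengthUdf so their logic can be checked without Flink on the classpath.
public class StringLengthCheck {

    // Same logic as StringLengthUdf.eval(String): null counts as length 0.
    static long eval(String a) {
        return a == null ? 0 : a.length();
    }

    // Same logic as StringLengthUdf.eval(String, String): sum of both lengths.
    static long eval(String b, String c) {
        return eval(b) + eval(c);
    }

    public static void main(String[] args) {
        System.out.println(eval("flink"));     // 5
        System.out.println(eval(null));        // 0
        System.out.println(eval("ab", "cde")); // 5
        System.out.println(eval("ab", null));  // 2
    }
}
```

Flink resolves the call by the number of arguments. stringLengthUdf(c) uses the one-argument eval, and stringLengthUdf(b, c) would use the two-argument overload.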
Write SQL statements
-- Register the UDF, which returns the length of a string (str.length()).
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

create table sls_stream(
  a int,
  b int,
  c varchar
) with (
  type='sls',
  endPoint='<yourEndpoint>',
  accessKeyId='<yourAccessId>',
  accessKeySecret='<yourAccessSecret>',
  startTime='2017-07-04 00:00:00',
  project='<yourProjectName>',
  logStore='<yourLogStoreName>',
  consumerGroup='consumerGroupTest1'
);

create table rds_output(
  id int,
  len bigint,
  content VARCHAR
) with (
  type='rds',
  url='<yourDatabaseURL>',
  tableName='<yourDatabaseTableName>',
  userName='<yourDatabaseUserName>',
  password='<yourDatabasePassword>'
);

insert into rds_output
select
  a,
  stringLengthUdf(c),
  c as content
from sls_stream;
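The following Flink-free sketch is a rough model of what the INSERT statement writes. It applies the same projection to two made-up rows, and columns are matched by position:

- a becomes id.
- stringLengthUdf(c) becomes len.
- c becomes content.
- Column b is not written.

InsertQuerySketch, SourceRow, OutputRow, and project are hypothetical names. The real job processes an unbounded stream rather than an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the INSERT INTO rds_output ... SELECT statement.
// It does not use Flink; it only mirrors the per-row column mapping.
public class InsertQuerySketch {

    // One row of sls_stream: (a int, b int, c varchar).
    static final class SourceRow {
        final int a;
        final int b;
        final String c;

        SourceRow(int a, int b, String c) {
            this.a = a;
            this.b = b;
            this.c = c;
        }
    }

    // One row of rds_output: (id int, len bigint, content varchar).
    static final class OutputRow {
        final int id;
        final long len;
        final String content;

        OutputRow(int id, long len, String content) {
            this.id = id;
            this.len = len;
            this.content = content;
        }

        @Override
        public String toString() {
            return id + "," + len + "," + content;
        }
    }

    // Same logic as StringLengthUdf.eval(String).
    static long stringLength(String s) {
        return s == null ? 0 : s.length();
    }

    // Mirrors: SELECT a, stringLengthUdf(c), c AS content FROM sls_stream
    static List<OutputRow> project(List<SourceRow> input) {
        List<OutputRow> out = new ArrayList<>();
        for (SourceRow row : input) {
            // Positional mapping: a -> id, stringLengthUdf(c) -> len, c -> content; b is dropped.
            out.add(new OutputRow(row.a, stringLength(row.c), row.c));
        }
        return out;
    }

    public static void main(String[] args) {
        List<SourceRow> input = List.of(
                new SourceRow(1, 10, "hello"),
                new SourceRow(2, 20, null));
        for (OutputRow row : project(input)) {
            System.out.println(row); // prints "1,5,hello" then "2,0,null"
        }
    }
}
```

The second sample row shows that a null value in c produces len = 0, because the UDF treats null as an empty string.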
Register and use resources
- Log on to the Realtime Compute development platform.
- In the top navigation bar, click Development.
- In the left-side navigation pane, click the Resources tab.
- In the upper-right corner of the Resources pane, click Create Resource.
- In the Upload Resource dialog box, configure the resource parameters.

| Parameter | Description |
| --- | --- |
| Location | You can upload only JAR packages from your on-premises machine in the Realtime Compute for Apache Flink console. Note: The maximum size of a JAR package that can be uploaded from your on-premises machine is 300 MB. If the JAR package exceeds 300 MB, upload it to the Object Storage Service (OSS) bucket that is bound to your cluster or upload it by using an API. |
| Resource | Click Upload Resource to select the resource that you want to reference. |
| Resource Name | Enter a name for the resource. |
| Resource Description | Enter a description for the resource. |
| Resource Type | Select the type of the resource. Valid values: JAR, DICTIONARY, and PYTHON. |

- In the Resources pane, find the new resource, and move the pointer over More in the Actions column.
- In the drop-down list, select Reference.
- In the code editor, declare the UDF at the beginning of the code. The following statement is an example:
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
Publish and use a UDF
On the Development page, open the job that uses the UDF and click Publish. Then, on the Administration page, find the job and click Start in the Actions column to run the job with the UDF.
FAQ
Q: Why does the random number generator always generate the same value at runtime?
A: If no input parameters are passed to a UDX and you do not declare it as nondeterministic, the UDX may be optimized during compilation to return a constant value. To avoid this issue, override the isDeterministic() method so that it returns false.
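The behavior described in this answer can be modeled without Flink. In the following sketch, a hypothetical planner handles a zero-argument function in one of two ways:

- If the function reports itself as deterministic, the planner evaluates it once and reuses the result for every row.
- If the function reports itself as nondeterministic, the planner evaluates it once per row.

ZeroArgUdf, randomUdf, and run are illustrative names, and the real optimization happens inside the Flink planner. In an actual job, the fix is still to override isDeterministic() in your ScalarFunction subclass so that it returns false.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical, Flink-free model of how a planner may treat a zero-argument function.
public class ConstantFoldingSketch {

    // Stand-in for a zero-argument UDF with an isDeterministic() flag.
    interface ZeroArgUdf {
        Integer get();

        default boolean isDeterministic() {
            return true; // treated as deterministic unless declared otherwise
        }
    }

    // Builds a random-number "UDF" that reports the given determinism flag.
    static ZeroArgUdf randomUdf(Random random, boolean declaredDeterministic) {
        return new ZeroArgUdf() {
            @Override
            public Integer get() {
                return random.nextInt(1000);
            }

            @Override
            public boolean isDeterministic() {
                return declaredDeterministic;
            }
        };
    }

    // Simulated planner: a deterministic call with no inputs is folded into one constant.
    static List<Integer> run(ZeroArgUdf udf, int rows) {
        List<Integer> out = new ArrayList<>();
        if (udf.isDeterministic()) {
            Integer folded = udf.get(); // evaluated once, as if at compile time
            for (int i = 0; i < rows; i++) {
                out.add(folded);
            }
        } else {
            for (int i = 0; i < rows; i++) {
                out.add(udf.get()); // evaluated once per row
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same value repeated on every row: the symptom described in the question.
        System.out.println(run(randomUdf(new Random(42), true), 5));
        // A fresh value on each row once the function is declared nondeterministic.
        System.out.println(run(randomUdf(new Random(42), false), 5));
    }
}
```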