Realtime Compute for Apache Flink supports Java user-defined functions (UDFs) in Flink SQL jobs. This topic describes the types of Java UDFs, how to pass parameters, and provides important usage notes.
Important notes
To avoid JAR package dependency conflicts when you develop UDFs, consider the following:
Ensure that the Flink version selected on the SQL development page matches the Flink version in the Pom dependency.
For Flink-related dependencies, set the scope to `provided` by adding `<scope>provided</scope>` to each dependency.
Package other third-party dependencies by using the shade method. For more information, see Apache Maven Shade Plugin.
For more information about Flink dependency conflicts, see How do I resolve Flink dependency conflicts?.
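For reference, the two points above map to pom.xml sections like the following. This is a minimal sketch: the `flink-table-common` dependency is an example, and `${flink.version}` is a placeholder that you must align with the Flink version selected on the SQL development page.

```xml
<!-- Flink dependency with provided scope, so it is not packaged into the UDF JAR. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Shade third-party dependencies into the UDF JAR at package time. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```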
To prevent timeouts caused by frequent calls to a UDF in an SQL job, upload the UDF JAR package as a dependency file and declare the function in the job by using the `CREATE TEMPORARY FUNCTION` syntax. For example:

```sql
CREATE TEMPORARY FUNCTION GetJson AS 'com.soybean.function.udf.MyGetJsonObjectFunction';
```
UDF classifications
Flink supports the following three types of UDFs.
| Classification | Description |
| --- | --- |
| User-defined scalar function (UDSF) | Maps zero, one, or more scalar values to a new scalar value. Input and output have a one-to-one relationship: the function reads one row of data and writes one output value. For more information, see User-defined scalar functions (UDSFs). |
| User-defined aggregate function (UDAF) | Aggregates multiple records into a single record. Input and output have a many-to-one relationship: the function aggregates multiple input records into a single output value. For more information, see User-defined aggregate functions (UDAFs). |
| User-defined table-valued function (UDTF) | Takes zero, one, or more scalar values as input, including variable-length parameters. Similar to a UDSF, but can return any number of rows instead of a single value, and each returned row can consist of one or more columns. For more information, see User-defined table-valued functions (UDTFs). |
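The three input/output shapes can be illustrated with plain Java collections, without any Flink dependency: `map`, a reduction, and `flatMap` correspond loosely to a UDSF, a UDAF, and a UDTF. This is an analogy only, not Flink API code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class UdfShapes {
    // UDSF shape: one input row -> one output value (map).
    static List<Integer> scalarShape(List<String> rows) {
        return rows.stream().map(String::length).collect(Collectors.toList());
    }

    // UDAF shape: many input rows -> one output value (reduce).
    static int aggregateShape(List<String> rows) {
        return rows.stream().mapToInt(String::length).sum();
    }

    // UDTF shape: one input row -> zero or more output rows (flatMap).
    static List<String> tableShape(List<String> rows) {
        return rows.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a b", "c", "d e f");
        System.out.println(scalarShape(rows));    // one value per input row
        System.out.println(aggregateShape(rows)); // one value for all rows
        System.out.println(tableShape(rows));     // several rows per input row
    }
}
```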
UDF registration
For information about how to register a global UDF, see Global UDFs.
For information about how to register a job-level UDF, see Job-level UDFs.
Pass parameters to a UDF
You can configure parameters for a UDF in the Flink development console and use them in the UDF code. This lets you quickly modify UDF parameter values directly in the console.
UDFs provide an optional `open(FunctionContext context)` method. The `FunctionContext` object is used to pass custom configuration items as parameters. The process is as follows:
On the Deployment Details tab in the Flink development console, add the pipeline.global-job-parameters configuration item in the Other Configurations section of Running Parameter Settings. For example:

```yaml
pipeline.global-job-parameters: |
  'k1:{hi,hello}',
  'k2:"str:ing,str:ing"',
  'k3:"str""ing,str:ing"'
```

`FunctionContext#getJobParameter` can retrieve only the value of the `pipeline.global-job-parameters` configuration item. Therefore, you must write all configuration items that the UDF uses into `pipeline.global-job-parameters`. The following steps describe how to construct the value of this item.
Step 1: Define key-value pairs.

Separate the key and value with a colon (:) and enclose each key-value pair in single quotation marks (').

Note:
- If a key or value contains a colon (:), enclose the key or value in double quotation marks (").
- If a key or value contains a double quotation mark ("), escape it with two consecutive double quotation marks ("") and enclose the key or value in double quotation marks (").

Examples:
- If key = k1 and value = {hi,hello}, define the pair as 'k1:{hi,hello}'.
- If key = k2 and value = str:ing,str:ing, define the pair as 'k2:"str:ing,str:ing"'.
- If key = k3 and value = str"ing,str:ing, define the pair as 'k3:"str""ing,str:ing"'.

Step 2: Format the final pipeline.global-job-parameters value as a YAML multi-line string.

Place each key-value pair on its own line and separate the pairs with commas (,).

Note:
- A multi-line string in a YAML file starts with a vertical bar (|).
- Each line of a multi-line string in a YAML file must have the same indentation.

Example:

```yaml
pipeline.global-job-parameters: |
  'k1:{hi,hello}',
  'k2:"str:ing,str:ing"',
  'k3:"str""ing,str:ing"'
```

In the UDF code, use `FunctionContext#getJobParameter` to retrieve the value of each configuration item.
The following sample code shows how to retrieve the values:

```java
context.getJobParameter("k1", null); // Returns the string {hi,hello}.
context.getJobParameter("k2", null); // Returns the string str:ing,str:ing.
context.getJobParameter("k3", null); // Returns the string str"ing,str:ing.
// Returns null. You can obtain only the content defined in
// pipeline.global-job-parameters, not other job configuration items.
context.getJobParameter("pipeline.global-job-parameters", null);
```
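The quoting rules from Step 1 can be sketched as a small helper. `JobParameterEscaper` is a hypothetical class for illustration only; it is not part of the Flink API.

```java
// Hypothetical helper that applies the quoting rules for building the
// pipeline.global-job-parameters value. Not part of the Flink API.
public class JobParameterEscaper {

    // Wrap a key or value in double quotation marks if it contains a colon
    // or a double quotation mark, and double any embedded double quotation marks.
    static String escapePart(String part) {
        boolean needsQuotes = part.contains(":") || part.contains("\"");
        String escaped = part.replace("\"", "\"\"");
        return needsQuotes ? "\"" + escaped + "\"" : part;
    }

    // Produce one 'key:value' entry enclosed in single quotation marks.
    static String entry(String key, String value) {
        return "'" + escapePart(key) + ":" + escapePart(value) + "'";
    }

    public static void main(String[] args) {
        System.out.println(entry("k1", "{hi,hello}"));       // 'k1:{hi,hello}'
        System.out.println(entry("k2", "str:ing,str:ing"));  // 'k2:"str:ing,str:ing"'
        System.out.println(entry("k3", "str\"ing,str:ing")); // 'k3:"str""ing,str:ing"'
    }
}
```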
Named parameters
Named parameters are supported only in Ververica Runtime (VVR) 8.0.7 and later.
When you call a function in SQL, you must specify all parameters in the correct order. If a function has many parameters, it is easy to make mistakes with the parameter count or order. You also cannot omit optional parameters. Named parameters allow you to specify only the parameters you need, which reduces the chance of errors and is more convenient. The following example of a user-defined scalar function (ScalarFunction) shows how to use named parameters.
```java
// Implement a user-defined scalar function. The last two input parameters are optional (isOptional = true).
public class MyFuncWithNamedArgs extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
                       @ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
                       @ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("BIGINT")) Long l3) {
        if (i2 != null) {
            return "i2#" + i2;
        }
        if (l3 != null) {
            return "l3#" + l3;
        }
        return "default#" + f1;
    }
}
```

When you use this UDF in SQL, you can specify only the first required parameter or selectively specify optional parameters. The following code is an example.
```sql
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';

CREATE TEMPORARY TABLE s1 (
  a INT,
  b BIGINT,
  c VARCHAR,
  d VARCHAR,
  PRIMARY KEY(a) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

CREATE TEMPORARY TABLE sink (
  a INT,
  b VARCHAR,
  c VARCHAR,
  d VARCHAR
) WITH (
  'connector' = 'print'
);

INSERT INTO sink
SELECT a,
  -- Specify only the first required parameter.
  MyNamedUdf(f1 => c) AS arg1_res,
  -- Specify the first required parameter and the second optional parameter.
  MyNamedUdf(f1 => c, f2 => a) AS arg2_res,
  -- Specify the first required parameter and the third optional parameter.
  MyNamedUdf(f1 => c, f3 => d) AS arg3_res
FROM s1;
```

References
For examples of how to develop and use Java UDFs, see User-defined aggregate functions (UDAFs), User-defined scalar functions (UDSFs), and User-defined table-valued functions (UDTFs).
For information about how to debug and tune Python UDFs, see Overview.
For examples of how to develop and use Python UDFs, see User-defined aggregate functions (UDAFs), User-defined scalar functions (UDSFs), and User-defined table-valued functions (UDTFs).