This topic describes how to build a development environment and use user-defined extensions (UDXs) in Realtime Compute for Apache Flink.
Notice
- Only Realtime Compute for Apache Flink in exclusive mode supports UDXs.
- Blink is a computing engine that Alibaba Cloud developed on top of open source Flink SQL to improve computing performance. UDXs can be used only in Blink.
UDX types
Realtime Compute for Apache Flink supports three types of UDXs, as described in the
following table.
UDX type | Description |
---|---|
UDF | User-defined scalar function. The relationship between the input and output of a UDF is one-to-one mapping: the UDF returns one value each time it reads one row of data. |
UDAF | User-defined aggregation function. The relationship between the input and output of UDAFs is many-to-one mapping. A UDAF aggregates multiple input records into one output record. A UDAF can be used with the GROUP BY clause of SQL. For more information, see Aggregate functions. |
UDTF | User-defined table-valued function. When a UDTF is called, it generates multiple columns or rows of data. |
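The following minimal sketches show what each UDX type looks like in Java, assuming the open source Flink table API. Each class belongs in its own source file. StringLengthUdf matches the class name that is registered later in this topic; the other two classes are hypothetical:

import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;

// UDF: returns one value for each input row.
public class StringLengthUdf extends ScalarFunction {
    public long eval(String a) {
        return a == null ? 0 : a.length();
    }
}

// UDAF: aggregates many input rows into one value through an accumulator.
public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    public static class CountAccum {
        public long total;
    }

    @Override
    public CountAccum createAccumulator() {
        return new CountAccum();
    }

    @Override
    public Long getValue(CountAccum acc) {
        return acc.total;
    }

    public void accumulate(CountAccum acc, Object value) {
        acc.total++;
    }
}

// UDTF: emits zero or more rows for each input row through collect().
public class SplitUdtf extends TableFunction<String> {
    public void eval(String str) {
        for (String s : str.split("\\|")) {
            collect(s);
        }
    }
}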
Example
Realtime Compute for Apache Flink provides an example of a UDX to facilitate your
business development. This example demonstrates the implementation of a UDF, UDAF,
and UDTF.
Note
- In this example, a development environment of the required version is configured. You do not need to build another development environment.
- The example provides Maven projects. You can use IntelliJ IDEA for development. For more information, see Develop a job.
- Realtime Compute for Apache Flink V3.0
- Realtime Compute for Apache Flink V2.0
- Realtime Compute for Apache Flink V1.0
Build a development environment
The development of UDXs depends on the following JAR packages. You can download the
packages as needed.
- Realtime Compute for Apache Flink versions earlier than V3.2.1
- Realtime Compute for Apache Flink V3.2.1 and later
Add a POM dependency based on your open source software version. Download and view the example of a complete dependency. Note: If you want to use a Snapshot version, add a POM dependency based on that Snapshot version.
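For reference, the following is a minimal sketch of such a dependency for the open source Flink table API. The groupId, artifactId, and version here are illustrative and must match your Realtime Compute version, so use the coordinates from the downloaded dependency example.

<!-- Sketch only: align the version with your Realtime Compute release. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.5.1</version>
    <!-- The runtime provides these classes, so do not package them into your JAR. -->
    <scope>provided</scope>
</dependency>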
Register and use resources
- Log on to the Realtime Compute for Apache Flink console.
- In the top navigation bar, click Development.
- In the left-side navigation pane, click the Resources tab.
- In the upper-right corner of the Resources pane, click Create Resource.
- In the Upload Resource dialog box, configure resource parameters.
Parameter | Description |
---|---|
Location | You can upload JAR packages only from your machine in the Realtime Compute for Apache Flink console. Note: The maximum size of a JAR package that can be uploaded from your machine is 300 MB. If the JAR package exceeds 300 MB, you must upload it to the Object Storage Service (OSS) bucket that is bound to your cluster or use APIs to upload it. |
Resource | Click Upload Resource and select the resource that you want to reference. |
Resource Name | Enter a name for the resource. |
Resource Description | Enter a description of the resource. |
Resource Type | Select the type of the resource. The type can be JAR, DICTIONARY, or PYTHON. |
- In the Resources pane, find the new resource, and move the pointer over More in the Actions column.
- In the drop-down list, select Reference.
- In the code editor, declare the UDX at the beginning. The following statement is an
example:
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
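After the function is registered, you can call it in SQL in the same way as a built-in function. For example, assuming a source table T with a STRING field named name:
SELECT stringLengthUdf(name) FROM T;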
Types of parameters and return values
When you define Java UDXs in Realtime Compute for Apache Flink, you can use Java data
types in parameters and return values. The following table lists the mappings between
Realtime Compute for Apache Flink and Java data types.
Data type of Realtime Compute for Apache Flink | Java data type |
---|---|
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
DECIMAL | java.math.BigDecimal |
BOOLEAN | java.lang.Boolean |
DATE | java.sql.Date |
TIME | java.sql.Time |
TIMESTAMP | java.sql.Timestamp |
CHAR | java.lang.Character |
STRING | java.lang.String |
VARBINARY | byte[] |
ARRAY | Not supported |
MAP | Not supported |
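For example, a UDF whose SQL signature takes BIGINT and TIMESTAMP parameters and returns a STRING value declares the corresponding Java types in its eval method. The class below is a hypothetical sketch, assuming the open source ScalarFunction API:

import java.sql.Timestamp;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF: BIGINT maps to java.lang.Long, TIMESTAMP to java.sql.Timestamp,
// and the java.lang.String return value maps back to STRING.
public class FormatEventUdf extends ScalarFunction {
    public String eval(Long id, Timestamp ts) {
        return id + "@" + ts;
    }
}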
Obtain parameters of UDXs
UDXs support an optional open(FunctionContext context) method. You can use FunctionContext to pass custom configuration items of the job to the UDX.
Assume that you must add the following two parameters to your job:
testKey1=lincoln
test.key2=todd
Take a UDTF as an example. You can call context.getJobParameter in the open method to obtain the parameters. The following code shows an example:
public void open(FunctionContext context) throws Exception {
    String key1 = context.getJobParameter("testKey1", "empty");
    String key2 = context.getJobParameter("test.key2", "empty");
    System.err.println(String.format("end open: key1:%s, key2:%s", key1, key2));
}
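Putting it together, the following is a hypothetical, self-contained UDTF that reads one of these parameters once in open and uses it in every eval call (a sketch, assuming the open source TableFunction API):

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class PrefixSplitUdtf extends TableFunction<String> {
    private String prefix;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Read the job parameter once, when the function instance is initialized.
        prefix = context.getJobParameter("testKey1", "empty");
    }

    public void eval(String str) {
        // Emit one row per segment, tagged with the configured prefix.
        for (String s : str.split("\\|")) {
            collect(prefix + ":" + s);
        }
    }
}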
Note: For more information, see Job parameters.