This topic describes how to build a development environment and use user-defined extensions (UDXs) in Realtime Compute for Apache Flink.
Notice
- Only Realtime Compute for Apache Flink in exclusive mode supports UDXs.
- Blink is the engine that Alibaba Cloud Realtime Compute for Apache Flink developed on top of Apache Flink SQL to improve computing performance. UDXs can be used only in Blink.
UDX type
Realtime Compute for Apache Flink supports three types of UDXs, as described in the
following table.
UDX type | Description |
---|---|
UDF | User-defined scalar function. A UDF maps input to output one-to-one: it returns one value for each row of data that it reads. |
UDAF | User-defined aggregation function. A UDAF maps input to output many-to-one: it aggregates multiple input records into one output record. A UDAF can be used with the GROUP BY clause of SQL. For more information, see Aggregate functions. |
UDTF | User-defined table-valued function. When a UDTF is called, it generates multiple columns or rows of data. |
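The three input/output relationships in the table can be illustrated with a minimal plain-Java sketch. It deliberately uses no Flink classes so that it compiles on its own; the class `UdxShapes` and its method names are hypothetical. In an actual UDX project, the corresponding base classes would be `ScalarFunction`, `AggregateFunction`, and `TableFunction` from the `org.apache.flink.table.functions` package.

```java
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Plain-Java sketch of the three UDX shapes; no Flink classes are used here.
class UdxShapes {

    // UDF shape: one input value yields exactly one output value.
    static String toUpper(String value) {
        return value == null ? null : value.toUpperCase();
    }

    // UDAF shape: many input rows are folded into one accumulator,
    // and a single result is read from it at the end.
    static final class SumAccumulator {
        long sum = 0L;
    }

    static SumAccumulator createAccumulator() {
        return new SumAccumulator();
    }

    static void accumulate(SumAccumulator acc, Long value) {
        if (value != null) {
            acc.sum += value; // null inputs are skipped (an assumption of this sketch)
        }
    }

    static long getValue(SumAccumulator acc) {
        return acc.sum;
    }

    // UDTF shape: one input row can emit zero or more output rows
    // through a collector callback.
    static void split(String line, String separator, Consumer<String> collector) {
        if (line == null) {
            return;
        }
        // Pattern.quote treats the separator as a literal, not as a regex.
        for (String part : line.split(Pattern.quote(separator))) {
            collector.accept(part);
        }
    }
}
```

The accumulator is kept separate from the function logic because an aggregation has to carry intermediate state between rows, whereas the scalar and table-valued shapes need no state.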
Example
Realtime Compute for Apache Flink provides an example of a UDX to facilitate your
business development. This example shows how to develop a UDF, UDAF, and UDTF.
Note
- In this example, a development environment of the required version is configured. You do not need to build another development environment.
- The example provides Maven projects. You can use IntelliJ IDEA for development. For more information, see Develop a job.
- Realtime Compute for Apache Flink V3.0
- Realtime Compute for Apache Flink V2.0
- Realtime Compute for Apache Flink V1.0
Build a development environment
The development of UDXs depends on the following JAR packages. You can download the
packages as required.
- Realtime Compute for Apache Flink versions earlier than V3.2.1
- Realtime Compute for Apache Flink V3.2.1 and later
Add a POM dependency based on your open source software version. Download and view the example of a complete dependency.
Note
- If you want to use Snapshot, you can add a POM dependency based on your Snapshot version.
Register and use resources
- Log on to the Realtime Compute development platform.
- In the top navigation bar, click Development.
- In the left-side navigation pane, click the Resources tab.
- In the upper-right corner of the Resources pane, click Create Resource.
- In the Upload Resource dialog box, configure the resource parameters.
Parameter | Description |
---|---|
Location | You can upload JAR packages only from your on-premises machine in the Realtime Compute for Apache Flink console. Note: The maximum size of a JAR package that can be uploaded from your on-premises machine is 300 MB. If the JAR package exceeds 300 MB, you must upload it to the Object Storage Service (OSS) bucket that is bound to your cluster or use an API to upload it. |
Resource | Click Upload Resource to select the resource that you want to reference. |
Resource Name | Enter a name for the resource. |
Resource Description | Enter a description for the resource. |
Authorization type | Select the type of the resource: JAR, DICTIONARY, or PYTHON. |
- In the Resources pane, find the new resource, and move the pointer over More in the Actions column.
- In the drop-down list, select Reference.
- In the code editor, declare the UDX at the beginning. The following statement is an
example:
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
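The class name in the CREATE FUNCTION statement must point to a class inside the uploaded JAR package. The following is a hedged sketch of what such a class might look like. The package declaration and the `extends ScalarFunction` clause are omitted so that the snippet compiles without the Flink JAR packages, and returning 0 for a null input is an assumption of this sketch.

```java
// Hypothetical sketch of the class named in the CREATE FUNCTION statement.
// In a real project, the class would declare the package
// com.hjc.test.blink.sql.udx and extend
// org.apache.flink.table.functions.ScalarFunction.
class StringLengthUdf {

    // Scalar functions expose their logic through public methods named eval.
    public long eval(String value) {
        // Assumption: a null input is reported as length 0.
        return value == null ? 0L : value.length();
    }
}
```

After the function is declared, a query can call it by its registered name, for example `stringLengthUdf(name)` on a hypothetical column `name`.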
Types of parameters and return values
When you define Java UDXs in Realtime Compute for Apache Flink, you can use Java data
types in parameters and return values. The following table lists the mappings between
Realtime Compute for Apache Flink and Java data types.
Data type of Realtime Compute for Apache Flink | Java data type |
---|---|
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
DECIMAL | java.math.BigDecimal |
BOOLEAN | java.lang.Boolean |
DATE | java.sql.Date |
TIME | java.sql.Time |
TIMESTAMP | java.sql.Timestamp |
CHAR | java.lang.Character |
STRING | java.lang.String |
VARBINARY | byte[] |
ARRAY | Not supported |
MAP | Not supported |
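To make the mapping concrete, the following sketch shows method signatures that use the Java types from the table. The class `TypeMappingSketch` and its methods are hypothetical and are written as plain static methods so that the snippet runs without the Flink JAR packages. The null handling is an assumption of this sketch, not documented behavior.

```java
import java.math.BigDecimal;
import java.sql.Timestamp;

// Illustrative signatures that use the Java types listed in the table.
class TypeMappingSketch {

    // BIGINT and INT map to java.lang.Long and java.lang.Integer.
    // Boxed types allow a missing value to be represented as null.
    static Long addOffset(Long base, Integer offset) {
        if (base == null || offset == null) {
            return null;
        }
        return base + offset;
    }

    // DECIMAL maps to java.math.BigDecimal, which avoids the rounding
    // errors of floating-point arithmetic.
    static BigDecimal withRate(BigDecimal amount, BigDecimal rate) {
        if (amount == null || rate == null) {
            return null;
        }
        return amount.add(amount.multiply(rate));
    }

    // TIMESTAMP maps to java.sql.Timestamp; BOOLEAN maps to java.lang.Boolean.
    static Boolean isAfter(Timestamp left, Timestamp right) {
        if (left == null || right == null) {
            return null;
        }
        return left.after(right);
    }
}
```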
Obtain parameters of UDXs
UDXs support an optional open(FunctionContext context) method. You can use FunctionContext to pass custom configuration items.
For example, assume that you add the following two parameters to your job:
testKey1=lincoln
test.key2=todd
The following example shows how to use context.getJobParameter in the open method to obtain parameters of a UDTF.
public void open(FunctionContext context) throws Exception {
    // The second argument is the default value that is returned if the parameter is not set.
    String key1 = context.getJobParameter("testKey1", "empty");
    String key2 = context.getJobParameter("test.key2", "empty");
    System.err.println(String.format("end open: key1:%s, key2:%s", key1, key2));
}
Note For more information, see Job parameters.
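The behavior of the default value can be tried out with a small self-contained sketch. The interface `JobParameterSource` is a hypothetical stand-in that copies only the `getJobParameter(key, defaultValue)` call used above, and `MapJobParameters` and `ConfiguredFunction` are illustrative names. None of them is part of the Realtime Compute for Apache Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the getJobParameter call of FunctionContext,
// so that the sketch runs without the Flink JAR packages.
interface JobParameterSource {
    String getJobParameter(String key, String defaultValue);
}

// Map-backed implementation that is used only to simulate job parameters.
class MapJobParameters implements JobParameterSource {
    private final Map<String, String> parameters = new HashMap<>();

    MapJobParameters with(String key, String value) {
        parameters.put(key, value);
        return this;
    }

    @Override
    public String getJobParameter(String key, String defaultValue) {
        return parameters.getOrDefault(key, defaultValue);
    }
}

// UDTF-like class that reads its configuration once in open
// and keeps the values in fields for later calls.
class ConfiguredFunction {
    private String key1;
    private String key2;

    public void open(JobParameterSource context) {
        key1 = context.getJobParameter("testKey1", "empty");
        key2 = context.getJobParameter("test.key2", "empty");
    }

    String describe() {
        return String.format("key1:%s, key2:%s", key1, key2);
    }
}
```

Reading the parameters in open rather than on every call keeps the per-row logic free of configuration lookups, which is the usual reason to use an initialization hook of this kind.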