This topic describes how to build a development environment and use user-defined extensions (UDXs) in Realtime Compute for Apache Flink.

Notice
  • Only Realtime Compute for Apache Flink in exclusive mode supports UDXs.
  • Blink is an engine that Alibaba Cloud Realtime Compute for Apache Flink developed based on Apache Flink SQL to improve computing performance. UDXs can be used only in Blink.

UDX type

Realtime Compute for Apache Flink supports three types of UDXs, as described below.
  • UDF: user-defined scalar function. The input and output of a UDF have a one-to-one mapping: the UDF returns one value each time it reads one row of data.
  • UDAF: user-defined aggregate function. The input and output of a UDAF have a many-to-one mapping: the UDAF aggregates multiple input records into one output record. A UDAF can be used with the GROUP BY clause of SQL. For more information, see Aggregate functions.
  • UDTF: user-defined table-valued function. When a UDTF is called, it can generate multiple columns or rows of data for each input row.
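To make the one-to-one versus one-to-many distinction concrete, the following sketch shows minimal UDF and UDTF skeletons. The class names and logic are illustrative assumptions, not part of the product; a UDAF additionally requires an accumulator and is more involved.

```java
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;

// Hypothetical skeletons; class names and eval logic are illustrative only.
public class UdxSketches {

    // UDF: one-to-one -- each input row yields exactly one value.
    public static class ToUpperUdf extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    // UDTF: one-to-many -- each input row can emit several rows via collect().
    public static class SplitUdtf extends TableFunction<String> {
        public void eval(String s, String separator) {
            // String.split interprets the separator as a regular expression.
            for (String part : s.split(separator)) {
                collect(part);
            }
        }
    }
}
```

In both cases the framework discovers the eval method by reflection, so the method name must be exactly eval.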

Example

Realtime Compute for Apache Flink provides an example of a UDX to facilitate your business development. This example shows how to develop a UDF, UDAF, and UDTF.
Note
  • In this example, a development environment of the required version is configured. You do not need to build another development environment.
  • The example provides Maven projects. You can use IntelliJ IDEA for development. For more information, see Develop a job.

Build a development environment

The development of UDXs depends on the following JAR packages. You can download the packages as required.

Register and use resources

  1. Log on to the Realtime Compute development platform.
  2. In the top navigation bar, click Development.
  3. In the left-side navigation pane, click the Resources tab.
  4. In the upper-right corner of the Resources pane, click Create Resource.
  5. In the Upload Resource dialog box, configure the resource parameters.
    • Location: In the Realtime Compute for Apache Flink console, you can upload JAR packages only from your on-premises machine.
      Note: The maximum size of a JAR package that can be uploaded from your on-premises machine is 300 MB. If the JAR package exceeds 300 MB, you must upload it to the Object Storage Service (OSS) bucket that is bound to your cluster, or upload it by calling an API.
    • Resource: Click Upload Resource and select the resource that you want to reference.
    • Resource Name: Enter a name for the resource.
    • Resource Description: Enter a description of the resource.
    • Resource Type: Select the type of the resource: JAR, DICTIONARY, or PYTHON.
  6. In the Resources pane, find the new resource, and move the pointer over More in the Actions column.
  7. In the drop-down list, select Reference.
  8. In the code editor, declare the UDX at the beginning. The following statement is an example:
    CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
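A Java class such a declaration could point to might look like the following sketch. The eval logic here is an illustrative assumption; only the fully qualified class name comes from the statement above.

```java
// Would live in package com.hjc.test.blink.sql.udx to match the declaration.
import org.apache.flink.table.functions.ScalarFunction;

// A scalar function that returns the length of its input string.
public class StringLengthUdf extends ScalarFunction {
    public long eval(String a) {
        // Return 0 for null input instead of throwing.
        return a == null ? 0 : a.length();
    }
}
```

After the function is registered, a query can call it like any built-in function, for example SELECT stringLengthUdf(name) FROM source_table.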

Types of parameters and return values

When you define Java UDXs in Realtime Compute for Apache Flink, you can use Java data types for parameters and return values. The following table lists the mappings between Realtime Compute for Apache Flink data types and Java data types.

Data type of Realtime Compute for Apache Flink    Java data type
TINYINT                                           java.lang.Byte
SMALLINT                                          java.lang.Short
INT                                               java.lang.Integer
BIGINT                                            java.lang.Long
FLOAT                                             java.lang.Float
DOUBLE                                            java.lang.Double
DECIMAL                                           java.math.BigDecimal
BOOLEAN                                           java.lang.Boolean
DATE                                              java.sql.Date
TIME                                              java.sql.Time
TIMESTAMP                                         java.sql.Timestamp
CHAR                                              java.lang.Character
STRING                                            java.lang.String
VARBINARY                                         byte[]
ARRAY                                             Not supported
MAP                                               Not supported
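As a sketch of how these mappings shape a UDF signature, the following hypothetical function takes BIGINT, DECIMAL, and TIMESTAMP columns and returns a STRING. The class name and logic are illustrative, not from the product documentation.

```java
import java.math.BigDecimal;
import java.sql.Timestamp;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF whose signature exercises the mappings above:
// BIGINT -> java.lang.Long, DECIMAL -> java.math.BigDecimal,
// TIMESTAMP -> java.sql.Timestamp, STRING return -> java.lang.String.
public class DescribeOrderUdf extends ScalarFunction {
    public String eval(Long orderId, BigDecimal amount, Timestamp ts) {
        if (orderId == null || amount == null || ts == null) {
            return "unknown";
        }
        return "order " + orderId + " of " + amount.toPlainString() + " at " + ts;
    }
}
```

Using the boxed wrapper types (Long rather than long) lets the function receive SQL NULL values as Java null.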

Obtain parameters of UDXs

UDXs support an optional open(FunctionContext context) method. You can use FunctionContext to pass custom configuration items.

For example, assume that you need to add the following two parameters to your job:
testKey1=lincoln
test.key2=todd
The following example shows how to use context.getJobParameter in the open method to obtain parameters of a UDTF.
public void open(FunctionContext context) throws Exception {
    String key1 = context.getJobParameter("testKey1", "empty");
    String key2 = context.getJobParameter("test.key2", "empty");
    System.err.println(String.format("end open: key1:%s, key2:%s", key1, key2));
}
Note For more information, see Job parameters.
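Placed in context, the open() snippet above sits inside a UDTF class like the following sketch. The class name, the format helper, and the eval logic are illustrative assumptions; only the getJobParameter call and the "testKey1" parameter come from the example above.

```java
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

// Hypothetical UDTF that reads a job parameter in open() and uses it in eval().
public class ParamAwareUdtf extends TableFunction<String> {

    private String key1 = "empty";

    @Override
    public void open(FunctionContext context) throws Exception {
        // The second argument is the default used when the parameter is absent.
        key1 = context.getJobParameter("testKey1", "empty");
    }

    // Kept separate from eval() so the formatting logic is easy to unit test.
    public String format(String value) {
        return key1 + ":" + value;
    }

    public void eval(String value) {
        collect(format(value));
    }
}
```

The framework calls open() once before any rows are processed, so values read there can safely be cached in fields for use in eval().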