
Realtime Compute for Apache Flink:Overview

Last Updated:Jan 25, 2024

Realtime Compute for Apache Flink allows you to use Java user-defined functions (UDFs) in Flink SQL deployments. This topic describes the classification of Java UDFs in Flink, how to pass parameters to them, and the related precautions.

Precautions

  • To prevent conflicts between JAR file dependencies, take note of the following points when you develop UDFs:

    • Make sure that the Flink version that you select on the SQL Editor page is the same as the Flink version that is specified in the POM dependency.

    • Specify <scope>provided</scope> for Flink-related dependencies.

    • Use the Shade plug-in to package other third-party dependencies. For more information, see Apache Maven Shade plug-in.

    For more information about how to handle Flink dependency conflicts between JAR files, see How do I troubleshoot dependency conflicts of Flink?

  • To prevent a timeout that is caused by frequent calls of a UDF in the SQL script of a deployment, we recommend that you upload the JAR file of the UDF as a dependency file and declare the UDF in the deployment by using the CREATE TEMPORARY FUNCTION syntax. For example, you can use CREATE TEMPORARY FUNCTION GetJson AS 'com.soybean.function.udf.MyGetJsonObjectFunction';.
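The dependency guidance above can be sketched as a Maven POM fragment. The flink-table-common artifact and the flink.version property are illustrative placeholders; use the dependencies that your UDF actually requires, and make sure the version matches the Flink version selected on the SQL Editor page.

```xml
<!-- Flink API dependencies: scope "provided" keeps them out of the UDF JAR. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version> <!-- match the Flink version on the SQL Editor page -->
    <scope>provided</scope>
</dependency>

<!-- Bundle third-party dependencies into the UDF JAR with the Shade plug-in. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
        </execution>
    </executions>
</plugin>
```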

UDF classification

Flink supports the following three types of UDFs:

  • User-defined scalar function (UDSF): A UDSF maps zero, one, or more scalar values to a new scalar value. A one-to-one mapping is established between the input and output: each time a UDSF reads one row of data, it writes one output value. For more information, see UDSFs.

  • User-defined aggregate function (UDAF): A UDAF aggregates multiple values into a single value. A many-to-one mapping is established between the input and output: multiple input records are aggregated to generate one output value. For more information, see UDAFs.

  • User-defined table-valued function (UDTF): A UDTF takes zero, one, or more scalar values as input parameters, including variable-length parameters. UDTFs are similar to UDSFs except that a UDTF can return any number of rows instead of a single value, and each returned row can consist of one or more columns. For more information, see UDTFs.
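As a concrete illustration of the scalar case, a minimal UDSF might look like the following sketch. The class name and package are hypothetical; extending ScalarFunction and providing an eval method is the convention that the Flink Table API expects. Compiling this class requires the Flink Table API on the classpath.

```java
package com.example.udf; // hypothetical package

import org.apache.flink.table.functions.ScalarFunction;

// One-to-one mapping: each input row produces exactly one output value.
public class UpperCaseFunction extends ScalarFunction {

    // Flink discovers eval methods by reflection; the Java signature
    // determines the SQL input and result types.
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}
```

After the class is packaged into a JAR and registered, the function can be called in SQL, for example SELECT UpperCaseFunction(name) FROM MyTable.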

Register a UDF

Pass UDF parameters

You can configure parameters of a UDF in the console of fully managed Flink and use the parameters in UDF code. This way, you can directly change the parameter values of the UDF in the console of fully managed Flink.

UDFs support an optional open(FunctionContext context) method. You can use FunctionContext to pass custom configuration items. To pass UDF parameters, perform the following steps:

  1. On the Deployments page in the console of fully managed Flink, click the name of the desired deployment. On the Configuration tab, click Edit in the upper-right corner of the Parameters section. Then, add pipeline.global-job-parameters to the Other Configuration field. Sample code:

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    The FunctionContext#getJobParameter method can obtain only the value of pipeline.global-job-parameters. Therefore, you must write all configuration items that are used by the UDF to pipeline.global-job-parameters. Configure pipeline.global-job-parameters by performing the following substeps:

    a. Define key-value pairs. Separate each key from its value with a colon (:) and enclose each key-value pair in single quotation marks (').

       Note
       • If a key or value contains a colon (:), you must enclose the key or value in double quotation marks (").

       • If a key or value contains both a colon (:) and a double quotation mark ("), you must escape each double quotation mark in the key or value by writing it twice ("").

       Examples:
       • If key = k1 and value = {hi,hello}, the key-value pair is defined as 'k1:{hi,hello}'.

       • If key = k2 and value = str:ing,str:ing, the key-value pair is defined as 'k2:"str:ing,str:ing"'.

       • If key = k3 and value = str"ing,str:ing, the key-value pair is defined as 'k3:"str""ing,str:ing"'.

    b. Write pipeline.global-job-parameters in the YAML format. Place each key-value pair on a separate line and separate the key-value pairs with commas (,).

       Note
       • A multi-line string in a YAML file starts with a vertical bar (|).

       • Each line of the string must have the same indentation.

       pipeline.global-job-parameters: | 
         'k1:{hi,hello}',
         'k2:"str:ing,str:ing"',
         'k3:"str""ing,str:ing"'
  2. Use the FunctionContext#getJobParameter method in the UDF code to obtain the content of the map.

    Sample code:

    context.getJobParameter("k1", null); // Returns the string {hi,hello}. 
    context.getJobParameter("k2", null); // Returns the string str:ing,str:ing. 
    context.getJobParameter("k3", null); // Returns the string str"ing,str:ing. 
    context.getJobParameter("pipeline.global-job-parameters", null); // Returns null. Only the key-value pairs that are defined inside pipeline.global-job-parameters can be obtained; the configuration items of the deployment itself cannot.
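Putting the two steps together, a UDF can read a parameter once in the open method and reuse it in eval. The following sketch assumes the sample pipeline.global-job-parameters configuration above; the class name is hypothetical, while FunctionContext#getJobParameter is the Flink API described in this topic. Compiling this class requires the Flink Table API on the classpath.

```java
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF that reads a configuration item from
// pipeline.global-job-parameters once, at open time.
public class GetParamFunction extends ScalarFunction {

    private String k1Value;

    @Override
    public void open(FunctionContext context) throws Exception {
        // With the sample configuration above, this returns the string {hi,hello}.
        // The second argument is the default that is used when the key is absent.
        k1Value = context.getJobParameter("k1", "defaultValue");
    }

    public String eval(String name) {
        // Reuse the cached parameter value. Changing the value in the console
        // takes effect after the deployment restarts, without code changes.
        return k1Value + ":" + name;
    }
}
```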

References