This topic describes the usage notes for developing user-defined functions (UDFs) in Flink, the types of UDFs and their definitions, how to pass parameters to UDFs, and how to register UDFs.

Usage notes

To prevent conflicts between JAR package dependencies, take note of the following items when you develop UDFs:
  • Make sure that the Flink version you select on the Draft Editor page is the same as the Flink version in the POM dependency.
  • Specify <scope>provided</scope> for Flink-related dependencies.
  • Use the Shade plug-in to package other third-party dependencies. For more information, see Apache Maven Shade plug-in.

For more information about how to handle Flink dependency conflicts, see How do I resolve dependency conflicts of Flink?.

UDF classification

Flink supports the following three types of UDFs (a minimal example follows the list):
  • User-defined scalar function (UDF): Maps zero, one, or more scalar values to a new scalar value. The input and output of a UDF have a one-to-one mapping: each time a UDF reads one row of data, it writes one output value. For more information, see UDFs.
  • User-defined aggregate function (UDAF): Aggregates multiple records into a single value. The input and output of a UDAF have a many-to-one mapping: multiple input records are aggregated to generate one output value. For more information, see UDAFs.
  • User-defined table-valued function (UDTF): Takes zero, one, or more scalar values as input, including variable-length parameters. Unlike a UDF, a UDTF can return any number of rows instead of a single value, and each returned row can consist of one or more columns. For more information, see UDTFs.
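For example, a user-defined scalar function is a Java class that extends ScalarFunction from the Flink Table API and provides a public eval method. The following is a minimal sketch; the class name MyUpperUdf is illustrative:

    import org.apache.flink.table.functions.ScalarFunction;

    // One-to-one mapping: eval() is called once per input row and
    // returns exactly one output value.
    public class MyUpperUdf extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

The class name that you implement here is what the system later extracts when you register the UDF.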

Pass UDF parameters

If you want to modify a UDF parameter, we recommend that you do not hardcode the parameter in UDF code, because each change would then require you to repackage and redeploy the JAR file. Instead, configure the parameter in the Additional Configuration section of the Advanced tab in the console of fully managed Flink and read it in UDF code. This way, you can change the parameter value directly in the console of fully managed Flink.

UDFs support the optional open(FunctionContext context) method. You can use FunctionContext to obtain custom configuration items at runtime. To pass UDF parameters, perform the following steps:
  1. On the right side of the Draft Editor page, click the Advanced tab and add the required parameters and their values to the Additional Configuration section. Sample code:
    maxJobCreationAttempts: value
  2. Call the getJobParameter() method of FunctionContext in UDF code to obtain the parameter value. Sample code (a fuller, self-contained sketch follows these steps):
    public void open(FunctionContext context) throws Exception {
        // Read the value that is configured in the Additional Configuration section.
        // The second argument ("3" here is illustrative) is the default value that
        // is used when the key is not configured.
        String maxJobCreationAttempts = context.getJobParameter("maxJobCreationAttempts", "3");
        LOG.info("maxJobCreationAttempts: {}", maxJobCreationAttempts);
    }
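The preceding snippet assumes a LOG field. The following is a more complete sketch with the required imports and an SLF4J logger; the class name and the default value are illustrative:

    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyParameterizedUdf extends ScalarFunction {
        private static final Logger LOG = LoggerFactory.getLogger(MyParameterizedUdf.class);

        // Cached in open() so that eval() does not re-read the configuration for every row.
        private transient String maxJobCreationAttempts;

        @Override
        public void open(FunctionContext context) throws Exception {
            maxJobCreationAttempts = context.getJobParameter("maxJobCreationAttempts", "3");
            LOG.info("maxJobCreationAttempts: {}", maxJobCreationAttempts);
        }

        public String eval(String s) {
            return s;
        }
    }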

Register a UDF

UDFs are classified into catalog UDFs and job-level UDFs. To register a UDF, perform the following steps based on the type of the UDF:
  • Catalog UDF
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Draft Editor.
    4. Click the UDFs tab. In the upper-left corner of the page, click the New icon.
    5. Upload the JAR file of the UDF that you want to register.
      You can use one of the following methods to upload the JAR file of a UDF:
      • Upload a file: In the Register UDF Artifact dialog box, click click to select next to Select a file and select the JAR file of the UDF that you want to register. To upload a dependency file of the UDF, click click to select next to Dependencies and select the file.
      • External URL: Enter an external URL.
      Note
      • The JAR file of your UDF is uploaded to and stored in the sql-artifacts directory of the Object Storage Service (OSS) bucket that you select. In the console of fully managed Flink, the system parses the JAR file, checks whether it contains classes that implement the Flink UDF, UDAF, or UDTF interfaces, and then automatically extracts the class names and populates them in the Function Name field.
      • For a Java UDF, you can package its dependencies into the JAR file of the UDF or upload the dependency files by using the Dependencies parameter.
      • If the JAR file of a UDF or its dependency file is large, we recommend that you upload the file by using an external URL. If the external URL is the endpoint of an OSS bucket, the dependency file of the UDF must be stored in the sql-artifacts/namespaces/{namespace} directory.
    6. Click Confirm.

      In the UDFs list on the left side of the Draft Editor page, you can view all the UDFs that are registered.

  • Job-level UDF
    By default, the system uses the latest Flink version to parse data when you register a catalog UDF. If your job uses a catalog UDF but an earlier Flink version is selected on the Advanced tab, an incompatibility issue may occur. To resolve this issue, implement the UDF code against the engine version that your job requires and use it as a job-level UDF. To use a job-level UDF, perform the following steps:
    1. Upload the JAR file of the UDF that you want to use.

      In the left-side navigation pane, click Artifacts to upload the JAR file of a UDF.

    2. Specify a job-level UDF in your job.

      On the Advanced tab, select the JAR file of the UDF in Additional Dependencies.

    3. Register the UDF.
      Execute the CREATE TEMPORARY FUNCTION statement to register the job-level UDF.
      CREATE TEMPORARY FUNCTION MyScalarFunc AS 'com.test.MyScalarFunc';
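      After the statement is executed, you can call the function in the SQL of the same job, for example, SELECT MyScalarFunc(name) FROM my_table (the table and column names are illustrative). Because the function is registered as a temporary function, it applies only to the current job and does not appear in the UDFs list of catalog UDFs.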