This topic describes the precautions for developing user-defined functions (UDFs) in Realtime Compute for Apache Flink, the classification and definitions of UDFs, and how to pass parameters to UDFs.

Precautions

  • To prevent conflicts between JAR package dependencies, take note of the following points when you develop UDFs:
    • Make sure that the Flink version that you select on the Draft Editor page is the same as the Flink version that is specified in the POM dependency.
    • Specify <scope>provided</scope> for Flink-related dependencies.
    • Use the Shade plug-in to package other third-party dependencies. For more information, see Apache Maven Shade plug-in.

    For more information about how to handle Flink dependency conflicts, see Reference. A sample POM configuration is shown after this list.

  • To prevent a timeout that is caused by frequent calls of a UDF in the SQL code of a deployment, we recommend that you upload the JAR package of the UDF as a dependency file and declare the UDF in the deployment by using the CREATE TEMPORARY FUNCTION syntax. Example: CREATE TEMPORARY FUNCTION GetJson AS 'com.soybean.function.udf.MyGetJsonObjectFunction';.
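
  The following pom.xml excerpt is a minimal sketch of the packaging points above. The flink-table-common artifact and all version numbers are placeholders; align them with the Flink version that you select on the Draft Editor page and with the dependencies that your UDF actually uses.
    <dependencies>
      <!-- Flink dependencies use the provided scope, so they are not packaged into the UDF JAR. -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    <build>
      <plugins>
        <!-- The Shade plug-in packages third-party dependencies into the UDF JAR. -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.4</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>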

UDF classification

Flink supports the following three types of UDFs:
  • User-defined scalar function (UDSF): A UDSF maps zero, one, or more scalar values to a new scalar value. A one-to-one mapping is established between the input and output of a UDSF. Each time a UDSF reads a row of data, it writes one output value. For more information, see UDSFs.
  • User-defined aggregate function (UDAF): A UDAF aggregates multiple values into a single value. A many-to-one mapping is established between the input and output of a UDAF: multiple input values are aggregated to generate one output value. For more information, see UDAFs.
  • User-defined table-valued function (UDTF): A UDTF takes zero, one, or more scalar values as input parameters, including variable-length parameters. UDTFs are similar to UDSFs except that a UDTF can return any number of rows instead of a single value, and each returned row can consist of one or more columns. For more information, see UDTFs.
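
These definitions map directly to base classes in the Flink API. The following minimal sketch shows a UDSF and a UDTF. The class names PrefixFunction and SplitFunction and their logic are illustrative assumptions, not part of this topic; each class goes in its own source file.
  import org.apache.flink.table.annotation.DataTypeHint;
  import org.apache.flink.table.annotation.FunctionHint;
  import org.apache.flink.table.functions.ScalarFunction;
  import org.apache.flink.table.functions.TableFunction;
  import org.apache.flink.types.Row;

  // UDSF: returns exactly one output value per input row (one-to-one mapping).
  public class PrefixFunction extends ScalarFunction {
      public String eval(String s) {
          return s == null ? null : "prefix_" + s;
      }
  }

  // UDTF: returns zero or more output rows per input row, each with one or more columns.
  @FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
  public class SplitFunction extends TableFunction<Row> {
      public void eval(String line) {
          if (line == null) {
              return;
          }
          for (String word : line.split(",")) {
              collect(Row.of(word));
          }
      }
  }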

Pass UDF parameters

If you want to modify the value of a UDF parameter, we recommend that you do not hard-code the value in the UDF code, because you must modify and repackage the code each time the value changes. Instead, configure the parameter in the Additional Configuration section of the Advanced tab in the console of fully managed Flink and read the parameter in the UDF code. This way, you can change the parameter value directly in the console of fully managed Flink.

The FunctionContext#getJobParameter method can obtain only the values that are defined in pipeline.global-job-parameters. Therefore, you must write all configuration items that are used by the UDF to pipeline.global-job-parameters.

UDFs support an optional open(FunctionContext context) method. You can use FunctionContext to pass custom configuration items. To pass UDF parameters, perform the following steps:
  1. On the Advanced tab on the right side of the Draft Editor page, add pipeline.global-job-parameters to the Additional Configuration section. Sample code:
    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'
    The following table shows the map that is defined by pipeline.global-job-parameters in the sample code.
    Key    Value
    k1     {hi,hello}
    k2     str:ing,str:ing
    k3     str"ing,str:ing
    To construct the value of pipeline.global-job-parameters, perform the following steps:
    Step 1: Redefine the key-value pairs.
      Separate each key from its value with a colon (:) and enclose each key-value pair in single quotation marks (').
      Note
      • If a key or value contains a colon (:), enclose the key or value in double quotation marks (").
      • If a key or value contains both a colon (:) and a double quotation mark ("), enclose the key or value in double quotation marks (") and escape each double quotation mark inside it with two double quotation marks ("").
      Examples:
      • If key = k1 and value = {hi,hello}, the key-value pair is defined as 'k1:{hi,hello}'.
      • If key = k2 and value = str:ing,str:ing, the key-value pair is defined as 'k2:"str:ing,str:ing"'.
      • If key = k3 and value = str"ing,str:ing, the key-value pair is defined as 'k3:"str""ing,str:ing"'.
    Step 2: Combine the key-value pairs into the value of pipeline.global-job-parameters in the YAML format.
      Place each key-value pair on a separate line and separate the key-value pairs with commas (,).
      Note
      • A multi-line string in a YAML file starts with a vertical bar (|).
      • Each line of the string must have the same indent.
      Example:
      pipeline.global-job-parameters: | 
        'k1:{hi,hello}',
        'k2:"str:ing,str:ing"',
        'k3:"str""ing,str:ing"'
  2. Use the FunctionContext#getJobParameter method in the UDF code to obtain the content of the map.
    Sample statements:
    context.getJobParameter("k", null); // Obtain the string {hi,hello}. 
    context.getJobParameter("k2", null); // Obtain the string str:ing,str:ing. 
    context.getJobParameter("k3", null); // Obtain the string str"ing,str:ing. 
    context.getJobParameter("pipeline.global-job-parameters", null); // null indicates that only the content that is defined in pipeline.global-job-parameters can be obtained. The configuration items of the job cannot be obtained. 

Register a UDF

UDFs are classified into catalog UDFs and job-level UDFs. To register a UDF, perform the following steps based on the type of the UDF:
  • Catalog UDF
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Draft Editor.
    4. Click the UDFs tab. In the UDFs list of the page, click the Create icon.
    5. In the Register UDF Artifact dialog box, upload the JAR file of the UDF that you want to register.
      You can use one of the following methods to upload the JAR file of a UDF:
      • Upload a file: In the Register UDF Artifact dialog box, click click to select next to Select a file and select the JAR file of the UDF that you want to register. If you want to upload a dependency file, click click to select next to Dependencies to upload the dependency file of the UDF that you want to register.
      • External URL: Enter an external URL.
      Note
      • The JAR file of your UDF is uploaded and stored in the sql-artifacts directory of the Object Storage Service (OSS) bucket that you select. In the console of fully managed Flink, the system parses the JAR file of the UDF and checks whether the classes of the Flink UDSF, UDAF, and UDTF interfaces are used in the file. Then, the system automatically extracts the class names and specifies the class names in the Function Name field.
      • For a Java UDF, you can package its dependencies into the JAR file of the UDF or upload the dependency files by using the Dependencies parameter.
      • If the JAR file or the dependency file of a UDF is large, we recommend that you upload the file by using an external URL. If the external URL is the endpoint of an OSS bucket, the dependency file of the UDF must be stored in the sql-artifacts/namespaces/{namespace} directory.
    6. Click OK.

      In the UDFs pane on the left side of the SQL editor, you can view all the UDFs that are registered.

  • Job-level UDF
    By default, the latest Flink version is used for data parsing when you register a catalog UDF. If you use a catalog UDF in your job and select an earlier Flink version on the Advanced tab, an incompatibility issue may occur. To resolve this issue, you can implement UDF code based on the engine version that is required by your job and use job-level UDFs. To use a job-level UDF, perform the following steps:
    1. Upload the JAR file of the UDF that you want to use.

      In the left-side navigation pane, click Artifacts. In the upper-right corner of the Artifacts page, click Upload Artifact to upload the JAR file of a UDF.

    2. Specify a job-level UDF in your job.

      On the Advanced tab of the Draft Editor page, select the JAR file of the UDF in Additional Dependencies.

    3. Register the UDF.
      Execute the CREATE TEMPORARY FUNCTION statement to register the job-level UDF.
      CREATE TEMPORARY FUNCTION MyScalarFunc AS 'com.test.MyScalarFunc';
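
      After the registration, you can call the function in the deployment in the same way as a built-in function. The following statement is a hypothetical usage example: result_table, source_table, and the column payload are assumptions.
      INSERT INTO result_table
      SELECT MyScalarFunc(payload)
      FROM source_table;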

Version mappings between Ververica Runtime (VVR) and Apache Flink

VVR version                                      Apache Flink version
VVR 3.0.4, VVR 3.0.7                             Flink 1.12
VVR 4.0.8, VVR 4.0.10, VVR 4.0.11, VVR 4.0.12    Flink 1.13

On the Advanced tab on the right side of the Draft Editor page, you can view the version mappings between VVR and Apache Flink in the drop-down list of the Engine Version parameter.