AnalyticDB for MySQL allows you to use user-defined functions (UDFs) in Spark SQL to meet requirements that built-in functions cannot satisfy. This topic describes how to define and use a UDF in Spark SQL.

Prerequisites

  • A UDF used in AnalyticDB for MySQL Spark SQL implements one of the following interfaces:
    • Spark UDFs (UDF0 to UDF22) provided by the org.apache.spark.sql.api.java package.
    • Hive UDFs, user-defined table-generating functions (UDTFs), or user-defined aggregate functions (UDAFs). A minimal Hive-style sketch follows this list.
    • MaxCompute UDFs or UDTFs.
  • The UDF is developed in an on-premises development tool such as IntelliJ IDEA and packaged into a JAR package. The JAR package must meet the following requirements:
    • The JAR package must contain the second-party or third-party packages on which the UDF depends.
    • The JAR package cannot contain JAR packages that already exist in the Spark computing framework, such as Hive or Spark JAR packages, to avoid conflicts.
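    A Hive-style UDF exposes an evaluate() method instead of implementing the call() method. The following minimal sketch shows what the Hive-style variant of the function in this topic can look like. The sketch assumes that the hive-exec dependency is available at compile time, and the HiveHi class name is only an example:
    package spark.udf;

    import org.apache.hadoop.hive.ql.exec.UDF;

    // A minimal Hive-style UDF sketch. Hive resolves the evaluate() method by reflection.
    public class HiveHi extends UDF {
        public String evaluate(String s) {
            return "Hi, " + s;
        }
    }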

Procedure

  1. Create a Maven project in the development tool and add the following dependency to the pom.xml file. IntelliJ IDEA is used in the example.
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.1</version>
        <scope>provided</scope>
    </dependency>
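    The provided scope makes the Spark classes available at compile time without bundling them into your JAR package. For example, if you build a fat JAR by using the Maven Shade plugin, provided-scope dependencies are excluded by default, which satisfies the packaging requirement in the prerequisites.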
  2. Define the UDF in IntelliJ IDEA and build the UDF into a JAR package. In this example, the UDF to be developed is named hi. The following code shows the definition of the function:
    package spark.udf;

    import org.apache.spark.sql.api.java.UDF1;

    // A single-argument UDF that prefixes the input string with "Hi, ".
    public class UdfHi implements UDF1<String, String> {
        @Override
        public String call(String s) throws Exception {
            return "Hi, " + s;
        }
    }
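    Before you upload the JAR package, you can optionally verify the UDF in a local Spark session. The following smoke test is a minimal sketch and is not part of the required procedure. The UdfHiLocalTest class name is only an example, and running the test locally requires the Spark dependency on the runtime classpath, for example, when you run the class from the IDE:
    package spark.udf;

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;

    // A local smoke test for UdfHi. This hypothetical helper class is not uploaded to OSS.
    public class UdfHiLocalTest {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[*]")
                    .appName("udf-smoke-test")
                    .getOrCreate();
            // Register the UDF under the same name that the _udfs file uses in Step 4.
            spark.udf().register("hi", new UdfHi(), DataTypes.StringType);
            spark.sql("SELECT hi('AnalyticDB')").show(); // prints: Hi, AnalyticDB
            spark.stop();
        }
    }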
  3. Upload the JAR package from your on-premises machine to an Object Storage Service (OSS) directory by using the AnalyticDB for MySQL console. For more information, see Manage resources.
    Note After the package is uploaded, click Copy Path in the Actions column corresponding to the desired file. You can paste the copied path into the _udfs file that you create in Step 4.
  4. Create a text file named _udfs that contains the UDF definition information. AnalyticDB for MySQL Spark uses this file to automatically register the UDF so that you can call the UDF when you execute Spark SQL jobs.
    Note
    • The UDF definition contains the following fields:
      • <udf_name>: the function name.
      • <udf_class>: the class name.
      • <jar_path>: the OSS path of the JAR package that you copied in Step 3.

      The file format is <udf_name> <udf_class> <jar_path>. Fields must be separated by spaces. Example:

      hi spark.udf.UdfHi oss://your_oss_bucket/user_uploads/adb-spark-udf-1.0.0.0.jar
    • By default, AnalyticDB for MySQL reads the UDF registration information from the file named _udfs. If the definition file has a file name extension such as .txt, delete the extension before you upload the file to OSS.
    • If you define multiple UDFs at a time, put the definition of each UDF on a separate line, as shown in the following example.
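      The following example registers two functions from the same JAR package. The second function, bye, and its UdfBye class are hypothetical and only illustrate the format:

      hi spark.udf.UdfHi oss://your_oss_bucket/user_uploads/adb-spark-udf-1.0.0.0.jar
      bye spark.udf.UdfBye oss://your_oss_bucket/user_uploads/adb-spark-udf-1.0.0.0.jar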
  5. Upload the UDF definition file from your on-premises machine to the OSS directory by using the AnalyticDB for MySQL console. For more information, see Manage resources.

What to do next

After you complete the preceding operations, you can use this UDF when you submit Spark SQL jobs in the same manner as built-in functions.

For example, you can use the UDF to rewrite the INSERT INTO target_table SELECT k, name, concat('Hi ', name) FROM source_table; statement from Step 4 of the Submit a Spark SQL job in the DMS console topic as the following statement:

INSERT INTO target_table SELECT k, name, hi(name) FROM source_table;
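
You can also run an ad-hoc query to verify that the UDF is registered, for example:

SELECT k, name, hi(name) FROM source_table LIMIT 10;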