This topic describes how to write a user-defined aggregate function (UDAF) in Java.

UDAF code structure

You can use Maven in IntelliJ IDEA or MaxCompute Studio to write a UDAF in Java. The UDAF code can contain the following information:
  • Java package: optional.

    You can package the Java classes that you define into a JAR file for future use.

  • Base UDAF classes: required.

    The required UDAF classes are com.aliyun.odps.udf.Aggregator and com.aliyun.odps.udf.annotation.Resolve. The com.aliyun.odps.udf.annotation.Resolve class corresponds to the @Resolve annotation. The com.aliyun.odps.udf.UDFException class is optional and corresponds to the methods that are used to initialize and terminate Java classes. If you want to use other UDAF classes or complex data types, add the required classes by following the instructions provided in MaxCompute SDK.

  • @Resolve annotation: required.

    The annotation is in the @Resolve(<signature>) format. The signature is a function signature that defines the data types of input parameters and the return values of a UDAF. UDAFs cannot obtain function signatures by using the reflection feature. You can obtain a function signature only by using a @Resolve annotation, such as @Resolve("smallint->varchar(10)"). For more information about the @Resolve annotation, see @Resolve annotations.

  • Custom Java class: required.

    A custom Java class is the organizational unit of UDAF code. This class defines the variables and methods that are used to meet your business requirements.

  • Methods to implement the custom Java class: required.
    Java UDAFs must inherit the com.aliyun.odps.udf.Aggregator class and implement the following methods:
    import com.aliyun.odps.io.Writable;
    import com.aliyun.odps.udf.ContextFunction;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.UDFException;
    public abstract class Aggregator implements ContextFunction {
        // The initialization method. 
        @Override
        public void setup(ExecutionContext ctx) throws UDFException {
        }
        // The cleanup method, called after processing ends. 
        @Override
        public void close() throws UDFException {
        }
        // Create an aggregation buffer. 
        abstract public Writable newBuffer();
        // The iterate method. 
        // The buffer is an aggregation buffer, which stores the data that is aggregated in a specific phase. The aggregated data refers to the dataset that is obtained after GROUP BY is performed for different Map tasks. One buffer is created for each row of data that is aggregated. 
        // Writable[] args represents one row of input data. Each element is one input column: args[0] is the first column, args[1] is the second column, and so on. 
        // args itself is never NULL, but its elements can be NULL, which indicates that the corresponding input value is NULL. 
        abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
        // The terminate method. 
        abstract public Writable terminate(Writable buffer) throws UDFException;
        // The merge method. 
        abstract public void merge(Writable buffer, Writable partial) throws UDFException;
    }
    The most important methods are iterate, merge, and terminate, which are used to implement the main logic of the UDAF. In addition, you must implement a user-defined writable buffer.

    A user-defined writable buffer converts in-memory objects into byte sequences (or another transmission format) so that they can be persisted to disk or transmitted over the network. MaxCompute processes UDAFs in a distributed manner. Therefore, MaxCompute must serialize and deserialize data before the data can be transmitted between different devices.

    When you write a Java UDAF, you can use Java data types or Java writable data types. For more information about the mappings between the data types that are supported by MaxCompute projects, Java data types, and Java writable data types, see Data types.
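To illustrate the serialization contract that a writable buffer must satisfy, the following self-contained sketch round-trips an AvgBuffer-like object through plain JDK streams. The com.aliyun.odps.io.Writable interface that the real buffer implements is omitted here so the example runs without the MaxCompute SDK:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class AvgBufferRoundTrip {
    // Mirrors the AvgBuffer fields from the sample code; the real class
    // additionally implements com.aliyun.odps.io.Writable.
    static class AvgBuffer {
        double sum = 0;
        long count = 0;

        void write(DataOutputStream out) throws IOException {
            out.writeDouble(sum);   // 8 bytes
            out.writeLong(count);   // 8 bytes
        }

        void readFields(DataInputStream in) throws IOException {
            sum = in.readDouble();  // must read in the same order as write
            count = in.readLong();
        }
    }

    public static void main(String[] args) throws IOException {
        AvgBuffer original = new AvgBuffer();
        original.sum = 2.8;
        original.count = 2;

        // Serialize to a byte array, as the framework would before moving
        // the buffer between workers.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh buffer on the "receiving" side.
        AvgBuffer restored = new AvgBuffer();
        restored.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored.sum + "," + restored.count); // prints 2.8,2
    }
}
```

The key point is that readFields must consume the fields in exactly the order that write emitted them.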

Sample code:
// Place the defined Java classes in a package named org.alidata.odps.udaf.examples. 
package org.alidata.odps.udaf.examples;
// The base UDAF classes. 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
// The custom Java class. 
// The @Resolve annotation. 
@Resolve("double->double")
public class AggrAvg extends Aggregator {
// The methods that are used to implement the custom Java class. 
  private static class AvgBuffer implements Writable {
    private double sum = 0;
    private long count = 0;
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeDouble(sum);
      out.writeLong(count);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
      sum = in.readDouble();
      count = in.readLong();
    }
  }
  private DoubleWritable ret = new DoubleWritable();
  @Override
  public Writable newBuffer() {
    return new AvgBuffer();
  }
  @Override
  public void iterate(Writable buffer, Writable[] args) throws UDFException {
    DoubleWritable arg = (DoubleWritable) args[0];
    AvgBuffer buf = (AvgBuffer) buffer;
    if (arg != null) {
      buf.count += 1;
      buf.sum += arg.get();
    }
  }
  @Override
  public Writable terminate(Writable buffer) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    if (buf.count == 0) {
      ret.set(0);
    } else {
      ret.set(buf.sum / buf.count);
    }
    return ret;
  }
  @Override
  public void merge(Writable buffer, Writable partial) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    AvgBuffer p = (AvgBuffer) partial;
    buf.sum += p.sum;
    buf.count += p.count;
  }
}

Limits

You cannot access the Internet by using UDFs. If you want to access the Internet by using UDFs, fill in the network connection application form based on your business requirements and submit the application. After the application is approved, the MaxCompute technical support team will contact you and help you establish network connections. For more information about how to fill in the network connection application form, see Network connection process.

Usage notes

Before you write a Java UDAF, take note of the following points:
  • We recommend that you do not package classes that have the same name but different logic into the JAR files of different UDAFs. For example, the JAR file of UDAF 1 is named udaf1.jar and the JAR file of UDAF 2 is named udaf2.jar. Both JAR files contain a class named com.aliyun.UserFunction.class, but the class implements different logic in each file. If UDAF 1 and UDAF 2 are called in the same SQL statement, MaxCompute loads com.aliyun.UserFunction.class from only one of the two files. As a result, the UDAFs cannot run as expected and a compilation error may occur.
  • The input parameters and return values of a Java UDAF are objects. The data type names that you specify in Java UDAF code must start with an uppercase letter, such as String.
  • NULL values in MaxCompute SQL are represented by NULL in Java. Primitive data types in Java cannot represent NULL values in MaxCompute SQL. Therefore, these data types cannot be used.
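The point about primitive types can be seen in plain Java: a boxed java.lang.Double can hold null (which maps to SQL NULL), whereas a primitive double cannot. A minimal illustration (the describe method is hypothetical and exists only for this sketch):

```java
public class NullMapping {
    // A boxed Double can represent SQL NULL; a primitive double cannot,
    // which is why UDAF parameters and return values use object types.
    static String describe(Double value) {
        return value == null ? "SQL NULL" : "value=" + value;
    }

    public static void main(String[] args) {
        System.out.println(describe(null));   // prints SQL NULL
        System.out.println(describe(1.5));    // prints value=1.5
        // double d = null;  // would not compile: primitives cannot be null
    }
}
```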

@Resolve annotations

@Resolve annotation format:
@Resolve(<signature>)
The signature parameter is a string that specifies the data types of input parameters and return values. When you run a UDAF, the data types of input parameters and return values of the UDAF must be consistent with the data types specified in the function signature. The data type consistency is checked during semantic parsing. If the data types are inconsistent, an error is returned. Format of a signature:
'arg_type_list -> type'
  • arg_type_list: indicates the data types of input parameters. If multiple input parameters are used, specify multiple data types and separate them with commas (,). The following data types are supported: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, DECIMAL(precision,scale), CHAR, VARCHAR, complex data types (ARRAY, MAP, and STRUCT), and nested complex data types.
    arg_type_list can also be set to an asterisk (*) or left empty.
    • If arg_type_list is set to an asterisk (*), the UDAF accepts any number of input parameters.
    • If arg_type_list is left empty, the UDAF takes no input parameters.
  • type: specifies the data type of return values. For a UDAF, only one column of values is returned. The following data types are supported: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, and DECIMAL(precision, scale). Complex data types, such as ARRAY, MAP, and STRUCT, and nested complex data types are also supported.
Note When you write UDAF code, you can select a data type based on the data type edition used by your MaxCompute project. For more information about data type editions and data types supported by each edition, see Data type editions.

The following examples show common @Resolve annotations.

  • @Resolve('bigint,double->string'): The input parameters are of the BIGINT and DOUBLE types, and the return value is of the STRING type.
  • @Resolve('*->string'): Any number of input parameters are accepted, and the return value is of the STRING type.
  • @Resolve('->double'): No input parameters are used, and the return value is of the DOUBLE type.
  • @Resolve('array<bigint>->struct<x:string, y:int>'): The input parameter is of the ARRAY<BIGINT> type, and the return value is of the STRUCT<x:STRING, y:INT> type.
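The signature string is ordinary annotation metadata, so the framework can read it from the class at load time instead of inferring it by reflection over method signatures. The following self-contained sketch shows the mechanism with a local stand-in for the real com.aliyun.odps.udf.annotation.Resolve annotation, defined here only so the example compiles without the MaxCompute SDK:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class ResolveDemo {
    // Hypothetical stand-in for com.aliyun.odps.udf.annotation.Resolve.
    @Retention(RetentionPolicy.RUNTIME)
    @interface Resolve {
        String value();
    }

    // A UDAF-like class carrying its function signature as metadata.
    @Resolve("bigint,double->string")
    static class MyUdaf {
    }

    public static void main(String[] args) {
        // Read the signature back the way a framework could at load time.
        Resolve r = MyUdaf.class.getAnnotation(Resolve.class);
        System.out.println(r.value()); // prints bigint,double->string
    }
}
```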

Data types

In MaxCompute, different data type editions support different data types. In MaxCompute V2.0 and later, more data types and complex data types, such as ARRAY, MAP, and STRUCT, are supported. For more information about MaxCompute data type editions, see Data type editions.

The following table describes the mappings among the data types that are supported by MaxCompute projects, Java data types, and Java writable data types. You must write Java UDAFs based on the mappings to ensure data type consistency.

MaxCompute data type    Java data type                  Java writable data type
TINYINT                 java.lang.Byte                  ByteWritable
SMALLINT                java.lang.Short                 ShortWritable
INT                     java.lang.Integer               IntWritable
BIGINT                  java.lang.Long                  LongWritable
FLOAT                   java.lang.Float                 FloatWritable
DOUBLE                  java.lang.Double                DoubleWritable
DECIMAL                 java.math.BigDecimal            BigDecimalWritable
BOOLEAN                 java.lang.Boolean               BooleanWritable
STRING                  java.lang.String                Text
VARCHAR                 com.aliyun.odps.data.Varchar    VarcharWritable
BINARY                  com.aliyun.odps.data.Binary     BytesWritable
DATETIME                java.util.Date                  DatetimeWritable
TIMESTAMP               java.sql.Timestamp              TimestampWritable
INTERVAL_YEAR_MONTH     N/A                             IntervalYearMonthWritable
INTERVAL_DAY_TIME       N/A                             IntervalDayTimeWritable
ARRAY                   java.util.List                  N/A
MAP                     java.util.Map                   N/A
STRUCT                  com.aliyun.odps.data.Struct     N/A
Note The input parameters or return values of a UDAF can be of Java writable data types only if your MaxCompute project uses the MaxCompute V2.0 data type edition.

Instructions

After you develop a Java UDAF, you can use MaxCompute SQL to call this UDAF. For more information about how to develop a Java UDAF, see Development process. You can use one of the following methods to call the Java UDAF:
  • Use a UDF in a MaxCompute project: The method is similar to that of using built-in functions.
  • Use a UDF across projects: Use a UDF of Project B in Project A. The following statement shows an example: select B:udf_in_other_project(arg0, arg1) as res from table_t;. For more information about resource sharing across projects, see Package-based resource sharing across projects.

For more information about how to use MaxCompute Studio to develop and call a Java UDAF, see Example.

Example

This example describes how to develop a UDAF named AggrAvg by using MaxCompute Studio. The AggrAvg UDAF is used to calculate average values. The following figure shows the logic of the AggrAvg UDAF.

Function logic
  1. Slice the input data. MaxCompute slices the input data based on the MapReduce processing workflow. The size of each slice is set so that a worker can finish the calculation on the slice within the expected period of time.

    You can configure the odps.stage.mapper.split.size parameter to adjust the size of the slices. For more information about the logic of data slicing, see Process.

  2. Each worker computes the number of data records and the sum of the values in its slice. The (sum, count) pair of each slice serves as an intermediate result.
  3. The intermediate results of all slices generated in Step 2 are merged into a single result r.
  4. In the final output, r.sum/r.count is the average value of all input data.
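The steps above can be sketched in plain Java without the MaxCompute SDK: each slice produces a partial (sum, count) pair, the partials are merged, and the final average is sum/count. The slice boundaries and class names below are illustrative only:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class AggrAvgFlow {
    // Partial result for one slice; mirrors the AvgBuffer fields.
    static class Partial {
        double sum = 0;
        long count = 0;
    }

    // Step 2: each worker aggregates its own slice (the iterate phase).
    static Partial iterateSlice(double[] slice) {
        Partial p = new Partial();
        for (double v : slice) {
            p.sum += v;
            p.count += 1;
        }
        return p;
    }

    // Step 3: partials from all slices are combined (the merge phase).
    static Partial merge(List<Partial> partials) {
        Partial r = new Partial();
        for (Partial p : partials) {
            r.sum += p.sum;
            r.count += p.count;
        }
        return r;
    }

    public static void main(String[] args) {
        // Step 1: the input is sliced (boundaries here are illustrative).
        double[] slice1 = {1.2, 1.6};
        double[] slice2 = {2.0, 2.1};

        Partial r = merge(Arrays.asList(iterateSlice(slice1), iterateSlice(slice2)));

        // Step 4: r.sum / r.count is the average (the terminate phase).
        System.out.println(String.format(Locale.ROOT, "%.3f", r.sum / r.count)); // prints 1.725
    }
}
```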

To use MaxCompute Studio to develop and call a Java UDAF, perform the following steps:

  1. Make the following preparations on IntelliJ IDEA:
    1. Install MaxCompute Studio.
    2. Establish a connection to a MaxCompute project.
    3. Create a MaxCompute Java module.
  2. Write UDAF code.
    1. In the left-side navigation pane of the Project tab, choose src > main > java, right-click java, and then choose New > MaxCompute Java.
    2. In the Create new MaxCompute java class dialog box, click UDAF, enter a class name in the Name field, and then press Enter. In this example, the Java class is named AggrAvg.

      Name: the name of the MaxCompute Java class. If no package is created, specify this parameter in the Package name.Class name format, and the system automatically creates the package.

    3. Write code in the code editor. Sample UDAF code:
      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      import com.aliyun.odps.io.DoubleWritable;
      import com.aliyun.odps.io.Writable;
      import com.aliyun.odps.udf.Aggregator;
      import com.aliyun.odps.udf.UDFException;
      import com.aliyun.odps.udf.annotation.Resolve;
      @Resolve("double->double")
      public class AggrAvg extends Aggregator {
        private static class AvgBuffer implements Writable {
          private double sum = 0;
          private long count = 0;
          @Override
          public void write(DataOutput out) throws IOException {
            out.writeDouble(sum);
            out.writeLong(count);
          }
          @Override
          public void readFields(DataInput in) throws IOException {
            sum = in.readDouble();
            count = in.readLong();
          }
        }
        private DoubleWritable ret = new DoubleWritable();
        @Override
        public Writable newBuffer() {
          return new AvgBuffer();
        }
        @Override
        public void iterate(Writable buffer, Writable[] args) throws UDFException {
          DoubleWritable arg = (DoubleWritable) args[0];
          AvgBuffer buf = (AvgBuffer) buffer;
          if (arg != null) {
            buf.count += 1;
            buf.sum += arg.get();
          }
        }
        @Override
        public Writable terminate(Writable buffer) throws UDFException {
          AvgBuffer buf = (AvgBuffer) buffer;
          if (buf.count == 0) {
            ret.set(0);
          } else {
            ret.set(buf.sum / buf.count);
          }
          return ret;
        }
        @Override
        public void merge(Writable buffer, Writable partial) throws UDFException {
          AvgBuffer buf = (AvgBuffer) buffer;
          AvgBuffer p = (AvgBuffer) partial;
          buf.sum += p.sum;
          buf.count += p.count;
        }
      }
  3. Debug the UDAF on your on-premises machine and verify that the code runs as expected.

    For more information about how to debug a UDAF, see Perform a local run to debug the UDF.

    Debug a UDAF
    Note The parameter settings in the preceding figure are for reference.
  4. Package the UDAF code into a JAR file, upload the file to your MaxCompute project, and then create the UDAF. In this example, a UDAF named user_udaf is created.

    For more information about how to package a UDAF, see Package the code.

  5. In the left-side navigation pane of MaxCompute Studio, click Project Explorer. Right-click your MaxCompute project to start the MaxCompute client, and execute an SQL statement to call the created UDAF.
    The following example shows the data structure of the my_table table that you want to query.
    +------------+------------+
    | col0       | col1       |
    +------------+------------+
    | 1.2        | 2.0        |
    | 1.6        | 2.1        |
    +------------+------------+
    Execute the following statement to call the UDAF:
    select user_udaf(col0) as c0 from my_table;
    The following result is returned:
    +----+
    | c0 |
    +----+
    | 1.4|
    +----+