
MaxCompute: Java UDAFs

Last Updated: Mar 26, 2026

A user-defined aggregate function (UDAF) reduces multiple input rows into a single output value — the same behavior as built-in functions like SUM or AVG, but with custom logic you define in Java.

How it works

MaxCompute processes UDAFs in a distributed pipeline across three phases:

Phase | Method called | What happens
------|---------------|-------------
Map (iterate) | iterate() | Each worker reads its partition of input rows and calls iterate() once per row, accumulating results into a local buffer.
Combine/shuffle (merge) | merge() | Workers exchange partial buffers. merge() folds a partial buffer from one worker into the main buffer, collapsing results across map tasks.
Reduce (terminate) | terminate() | After all buffers are merged, terminate() extracts the final output value from the buffer.
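
For example, an average over four rows split across two workers flows through the phases like this (a schematic walk-through, not actual engine output):

Worker A: iterate(1.0), iterate(2.0)  ->  buffer A = {sum=3.0, count=2}
Worker B: iterate(3.0), iterate(4.0)  ->  buffer B = {sum=7.0, count=2}
merge(buffer A, buffer B)             ->  buffer A = {sum=10.0, count=4}
terminate(buffer A)                   ->  10.0 / 4 = 2.5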

Because data moves between distributed workers, the aggregation buffer must be serializable. Implement the Writable interface to convert in-memory objects to byte sequences for network transmission and disk storage.

UDAF code structure

A Java UDAF consists of the following components:

Component | Required | Description
----------|----------|------------
Java package | No | Packages your classes into a JAR file
Base classes | Yes | com.aliyun.odps.udf.Aggregator and com.aliyun.odps.udf.annotation.Resolve
com.aliyun.odps.udf.UDFException | No | Used in initialization and termination methods
@Resolve annotation | Yes | Declares the input and output data types
Custom Java class | Yes | Extends Aggregator; defines your buffer and logic

Required methods

Extend com.aliyun.odps.udf.Aggregator and implement the following methods:

import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.ContextFunction;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;

public abstract class Aggregator implements ContextFunction {
    // Called once before processing starts. Use for initialization.
    @Override
    public void setup(ExecutionContext ctx) throws UDFException {}

    // Called once after processing ends. Use for cleanup.
    @Override
    public void close() throws UDFException {}

    // Creates a new, empty aggregation buffer for each group.
    abstract public Writable newBuffer();

    // Map phase: called once per input row. Accumulate data into buffer.
    // args cannot be null, but individual values in args can be null (indicating SQL NULL input).
    abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;

    // Combine/reduce phase: merges a partial buffer (partial) into the main buffer.
    abstract public void merge(Writable buffer, Writable partial) throws UDFException;

    // Reduce phase: extracts the final output value from the fully merged buffer.
    abstract public Writable terminate(Writable buffer) throws UDFException;
}

Mandatory vs. optional methods:

Method | Required | When it runs
-------|----------|-------------
newBuffer() | Always | Before processing each group
iterate() | Always | Map phase, once per input row
merge() | Always | Combine/shuffle phase, combines partial buffers from different workers
terminate() | Always | Reduce phase, produces the final output
setup() | Only when you need initialization | Before the task starts
close() | Only when you need cleanup | After the task ends

Note: iterate() and merge() share the same buffer instance within a group. Design your buffer so it can accumulate data from both without conflict.

Aggregation buffer

The aggregation buffer holds intermediate results as data flows through the distributed pipeline. Implement the Writable interface to make it serializable:

import com.aliyun.odps.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

private static class MyBuffer implements Writable {
    // Serializes buffer fields to a byte stream for network transmission.
    @Override
    public void write(DataOutput out) throws IOException {
        // Write each field in a fixed order.
    }

    // Deserializes fields back from the byte stream.
    @Override
    public void readFields(DataInput in) throws IOException {
        // Read each field in the same order as write().
    }
}

Read and write fields in the same order. A mismatch between write() and readFields() causes silent data corruption that is difficult to debug.
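
For example, suppose write() emits sum (a double) followed by count (a long). The following readFields() compiles and runs, but silently swaps the bytes of the two fields (an illustrative sketch of the bug, not code to copy):

@Override
public void readFields(DataInput in) throws IOException {
    count = in.readLong();   // BROKEN: consumes the 8 bytes that encoded sum
    sum = in.readDouble();   // BROKEN: consumes the 8 bytes that encoded count
}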

@Resolve annotation

Declare input and return types using the @Resolve annotation. MaxCompute checks type consistency during semantic parsing — a mismatch returns an error before the job runs.

Format:

@Resolve('<arg_type_list> -> <return_type>')

Part | Description
-----|------------
arg_type_list | Comma-separated input types. Set to * to accept any number of arguments, or leave blank to accept none.
return_type | A single return type. UDAFs always return one column.

Supported types include: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, DECIMAL(precision,scale), CHAR, VARCHAR, ARRAY, MAP, STRUCT, and nested complex types.

Select data types based on the data type edition of your MaxCompute project. For details, see Data type editions.
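
For example, if your project uses the MaxCompute V2.0 data type edition, you can enable the new types for the current session before running SQL that references them (session flag per the MaxCompute documentation):

SET odps.sql.type.system.odps2=true;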

Examples:

Annotation | Input types | Return type
-----------|-------------|------------
@Resolve('bigint,double->string') | BIGINT, DOUBLE | STRING
@Resolve('*->string') | Any | STRING
@Resolve('->double') | None | DOUBLE
@Resolve('array<bigint>->struct<x:string, y:int>') | ARRAY<BIGINT> | STRUCT<x:STRING, y:INT>

Data types

Write UDAFs using Java object types or Java Writable types. The first letter of Java type names must be uppercase (for example, String, not string). Do not use primitive types — they cannot represent SQL NULL values, which MaxCompute maps to Java null.

Java Writable types as input or return types require MaxCompute V2.0 data type edition or later.
MaxCompute type | Java type | Java Writable type
----------------|-----------|-------------------
TINYINT | java.lang.Byte | ByteWritable
SMALLINT | java.lang.Short | ShortWritable
INT | java.lang.Integer | IntWritable
BIGINT | java.lang.Long | LongWritable
FLOAT | java.lang.Float | FloatWritable
DOUBLE | java.lang.Double | DoubleWritable
DECIMAL | java.math.BigDecimal | BigDecimalWritable
BOOLEAN | java.lang.Boolean | BooleanWritable
STRING | java.lang.String | Text
VARCHAR | com.aliyun.odps.data.Varchar | VarcharWritable
BINARY | com.aliyun.odps.data.Binary | BytesWritable
DATE | java.sql.Date | DateWritable
DATETIME | java.util.Date | DatetimeWritable
TIMESTAMP | java.sql.Timestamp | TimestampWritable
INTERVAL_YEAR_MONTH | N/A | IntervalYearMonthWritable
INTERVAL_DAY_TIME | N/A | IntervalDayTimeWritable
ARRAY | java.util.List | N/A
MAP | java.util.Map | N/A
STRUCT | com.aliyun.odps.data.Struct | N/A

Write a UDAF

Example 1: average (AggrAvg)

This example develops AggrAvg, a UDAF that calculates the average of a DOUBLE column, using MaxCompute Studio.

The buffer stores two fields: a running sum and a row count. iterate() accumulates each input row during the map phase, merge() combines partial buffers from different workers during the combine phase, and terminate() divides the total sum by the total count during the reduce phase.


The input data is sliced and distributed across workers; you can set the odps.stage.mapper.split.size parameter to adjust the size of each slice. Each worker computes the record count and running sum for its slice (the intermediate result), the partial buffers are merged across workers, and the final output divides the total sum by the total count to produce the average.
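
For example, to process the input in smaller slices (and therefore with more map workers), set the parameter at the session level before your query. The value is in MB; the number below is illustrative:

SET odps.stage.mapper.split.size=128;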

Prerequisites

Before you begin, make sure you have:

  • Installed MaxCompute Studio and connected it to your MaxCompute project.

  • Created a MaxCompute Java module for your UDF code.

Create the UDAF class

  1. In the Project tab, navigate to src > main > java, right-click java, and choose New > MaxCompute Java.


  2. In the Create new MaxCompute java class dialog box, click UDAF, enter a class name in the Name field, and press Enter. This example uses AggrAvg. If you have not created a package, specify the Name field in packagename.classname format. The system generates the package automatically.


  3. Write the UDAF code in the editor.


    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import com.aliyun.odps.io.DoubleWritable;
    import com.aliyun.odps.io.Writable;
    import com.aliyun.odps.udf.Aggregator;
    import com.aliyun.odps.udf.UDFException;
    import com.aliyun.odps.udf.annotation.Resolve;
    
    // Input: one DOUBLE column. Output: DOUBLE average.
    @Resolve("double->double")
    public class AggrAvg extends Aggregator {
    
      // Buffer roles: iterate() accumulates (IN -> BUF); terminate() extracts (BUF -> OUT).
      private static class AvgBuffer implements Writable {
        private double sum = 0;   // running total (BUF)
        private long count = 0;   // row count (BUF)
    
        @Override
        public void write(DataOutput out) throws IOException {
          out.writeDouble(sum);
          out.writeLong(count);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
          sum = in.readDouble();
          count = in.readLong();
        }
      }
    
      private DoubleWritable ret = new DoubleWritable();
    
      @Override
      public Writable newBuffer() {
        return new AvgBuffer();
      }
    
      // Map phase: accumulate each non-null input row into the buffer.
      @Override
      public void iterate(Writable buffer, Writable[] args) throws UDFException {
        DoubleWritable arg = (DoubleWritable) args[0];
        AvgBuffer buf = (AvgBuffer) buffer;
        if (arg != null) {
          buf.count += 1;
          buf.sum += arg.get();
        }
      }
    
      // Combine phase: merge a partial buffer from another worker into this buffer.
      @Override
      public void merge(Writable buffer, Writable partial) throws UDFException {
        AvgBuffer buf = (AvgBuffer) buffer;
        AvgBuffer p = (AvgBuffer) partial;
        buf.sum += p.sum;
        buf.count += p.count;
      }
    
      // Reduce phase: compute the average from the fully merged buffer.
      @Override
      public Writable terminate(Writable buffer) throws UDFException {
        AvgBuffer buf = (AvgBuffer) buffer;
        if (buf.count == 0) {
          ret.set(0);
        } else {
          ret.set(buf.sum / buf.count);
        }
        return ret;
      }
    }

Debug locally

Run the UDAF on your local machine to verify the logic before deploying. For debugging steps, see the "Perform a local run to debug the UDF" section in Develop a UDF.
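
Before setting up the Studio local run, you can also sanity-check the logic by driving the lifecycle methods by hand. A minimal sketch (illustrative only, assuming the AggrAvg class above is on the classpath; not a substitute for the supported debugging path):

import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;

public class AggrAvgSmokeTest {
  public static void main(String[] args) throws Exception {
    AggrAvg udaf = new AggrAvg();

    // Simulate two map workers, each with its own partial buffer.
    Writable bufA = udaf.newBuffer();
    udaf.iterate(bufA, new Writable[] { new DoubleWritable(1.2) });
    Writable bufB = udaf.newBuffer();
    udaf.iterate(bufB, new Writable[] { new DoubleWritable(1.6) });

    // In production the partial buffer arrives serialized; passing the
    // in-memory object directly is fine for a quick local check.
    udaf.merge(bufA, bufB);
    System.out.println(udaf.terminate(bufA));  // expected: 1.4
  }
}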


Package and register the UDAF

Package the UDAF code into a JAR file, upload it to your MaxCompute project, and create the UDAF. For packaging steps, see the "Procedure" section in Package a Java program, upload the package, and create a MaxCompute UDF.

This example registers the UDAF as user_udaf.
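
A minimal registration sketch on the MaxCompute client (the JAR file name is hypothetical, and AggrAvg is assumed to be in the default package; see the packaging topic for the full workflow):

ADD JAR aggr_avg.jar;
CREATE FUNCTION user_udaf AS 'AggrAvg' USING 'aggr_avg.jar';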


Call the UDAF

In the Project Explorer, right-click your MaxCompute project to open the MaxCompute client, then run a SQL statement to call the UDAF.

Given this input table (my_table):

+------------+------------+
| col0       | col1       |
+------------+------------+
| 1.2        | 2.0        |
| 1.6        | 2.1        |
+------------+------------+

Run the following statement:

SELECT user_udaf(col0) AS c0 FROM my_table;

Result:

+------------+
| c0         |
+------------+
| 1.4        |
+------------+
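
Like a built-in aggregate, the UDAF also composes with GROUP BY. A usage sketch (the group_col column is hypothetical):

SELECT group_col, user_udaf(col0) AS c0
FROM my_table
GROUP BY group_col;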

Example 2: string concatenation (AggrConcat)

AggrAvg uses a simple numeric buffer. This example shows a more realistic pattern: collecting string values from multiple rows and concatenating them with a separator.

The buffer stores a StringBuilder and a separator string. Because StringBuilder cannot be written directly to a DataOutput, the buffer serializes it as a plain String. Follow this pattern whenever your buffer holds a type without a direct wire representation: convert it to a wire-safe form in write() and reconstruct the in-memory form in readFields().

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;

// Input: STRING column, STRING separator. Output: concatenated STRING.
@Resolve("string,string->string")
public class AggrConcat extends Aggregator {

  // Buffer: accumulated text and the separator to use between values.
  private static class ConcatBuffer implements Writable {
    private StringBuilder sb = new StringBuilder();
    private String separator = ",";

    @Override
    public void write(DataOutput out) throws IOException {
      // Serialize StringBuilder as a String so it survives network transfer.
      out.writeUTF(sb.toString());
      out.writeUTF(separator);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      sb = new StringBuilder(in.readUTF());
      separator = in.readUTF();
    }
  }

  private Text ret = new Text();

  @Override
  public Writable newBuffer() {
    return new ConcatBuffer();
  }

  // Map phase: append each non-null value to the buffer.
  @Override
  public void iterate(Writable buffer, Writable[] args) throws UDFException {
    Text value = (Text) args[0];
    Text sep   = (Text) args[1];
    ConcatBuffer buf = (ConcatBuffer) buffer;
    if (value != null) {
      if (sep != null) {
        buf.separator = sep.toString();
      }
      if (buf.sb.length() > 0) {
        buf.sb.append(buf.separator);
      }
      buf.sb.append(value.toString());
    }
  }

  // Combine phase: merge a partial buffer from another worker.
  @Override
  public void merge(Writable buffer, Writable partial) throws UDFException {
    ConcatBuffer buf = (ConcatBuffer) buffer;
    ConcatBuffer p   = (ConcatBuffer) partial;
    if (p.sb.length() > 0) {
      if (buf.sb.length() > 0) {
        buf.sb.append(buf.separator);
      }
      buf.sb.append(p.sb);
    }
  }

  // Reduce phase: return the concatenated result.
  @Override
  public Writable terminate(Writable buffer) throws UDFException {
    ConcatBuffer buf = (ConcatBuffer) buffer;
    ret.set(buf.sb.toString());
    return ret;
  }
}
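
Assuming the class is packaged and registered as my_concat (a hypothetical function name, following the same registration steps as Example 1), a call looks like this, where my_strings and name are placeholder table and column names:

SELECT my_concat(name, ';') AS joined_names FROM my_strings;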

Key design decisions:

Decision | Explanation
---------|------------
Serialize StringBuilder as String | StringBuilder is not serializable. Convert to String in write() and rebuild in readFields().
Carry separator in the buffer | The separator is passed as an argument to iterate() but must survive serialization to merge(). Store it in the buffer.
Guard against empty partial in merge() | If a worker's partition had no matching rows, its buffer is empty. Check before appending to avoid a leading separator.
String concatenation order is not guaranteed across workers because the map-phase partition assignment is determined by the execution engine, not by row order. If order matters, use an ORDER BY clause in your SQL statement and a single-reducer strategy, or sort the concatenated output afterward.

Call a UDAF in SQL

After developing and registering a Java UDAF, call it in MaxCompute SQL. For the full development process, see the "Development process" section in Overview.

Two calling methods are available:

  • Call the UDAF in the project where it is registered, in the same way as a built-in aggregate function (as in the examples above).

  • Call the UDAF from another project after it has been shared with that project through a package.

Limitations

Internet access

By default, MaxCompute does not allow UDAFs to access the Internet. To enable Internet access, submit a network connection application form based on your business requirements. After approval, the MaxCompute technical support team will help you establish the connection. For instructions on filling in the form, see Network connection process.

VPC access

By default, MaxCompute does not allow UDAFs to access resources in a virtual private cloud (VPC). To enable VPC access, establish a network connection between MaxCompute and the VPC. See Use UDFs to access resources in VPCs.

Table read restrictions

UDFs, UDAFs, and user-defined table-valued functions (UDTFs) cannot read data from the following table types:

  • Tables on which schema evolution has been performed

  • Tables containing complex data types

  • Tables containing JSON data types

  • Transactional tables

Usage notes

  • Do not package classes with the same name but different logic into JAR files for different UDAFs. If two UDAFs share a class name (for example, com.aliyun.UserFunction.class) across separate JAR files and are called in the same SQL statement, MaxCompute loads the class from one file unpredictably — causing incorrect results or compilation failures.

  • Java data types in UDAFs are object types. The first letter must be uppercase (for example, String, not string).

  • NULL values in MaxCompute SQL map to null in Java. Do not use Java primitive types — they cannot represent null.

What's next

  • To add more complex UDFs such as table-valued functions, see Overview.

  • To learn about all supported data types and editions, see Data type editions.