This topic describes how to build a development environment, write business logic code, and publish a user-defined aggregation function (UDAF) in Realtime Compute.

Notice Currently, only Realtime Compute in exclusive mode supports user-defined extensions (UDXs).

Definition

A UDAF aggregates multiple values into a single value.

Methods of the UDAF abstract class

The following code shows the core methods of the AggregateFunction class.

Note A UDAF can be implemented in Java or Scala. However, we recommend that you use Java because Scala data types may cause unnecessary performance overhead.
  • createAccumulator and getValue methods
    /**
    * @param <T> The type of the output returned by a UDAF.
    * @param <ACC> The accumulator type of a UDAF. An accumulator stores the intermediate aggregation results of a UDAF. You can design an accumulator for each UDAF as required.
    */
    public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
        /*
        * Initialize the accumulator in AggregateFunction.
        * The system calls this method before it aggregates data for the first time.
        */
        public ACC createAccumulator();

        /*
        * The system calls this method after each aggregation is complete.
        */
        public T getValue(ACC accumulator);
    }
    Note
    • The createAccumulator and getValue methods are defined in the AggregateFunction abstract class.
    • A UDAF must contain at least one accumulate method.
  • accumulate method
    public void accumulate(ACC accumulator, ...[user input]...);
    Note
    • You must implement an accumulate method that describes how the input data is processed and how the accumulator that stores the aggregation result is updated.
    • The first parameter of the accumulate method must be an accumulator of the ACC type defined in AggregateFunction. During code execution, the underlying runtime code sends the previous accumulator and the specified upstream data to the accumulate method for aggregation. The upstream data can be of any type and contain any number of data records.
  • retract and merge methods

    The createAccumulator, getValue, and accumulate methods can be used together to design a basic UDAF. However, Realtime Compute also requires the retract and merge methods in some special scenarios.

    In most scenarios, aggregation over an infinite stream fires early results before all of the data arrives. To refine these early fired results, you can implement a retract method. The SQL optimizer automatically determines when data needs to be retracted and which operations are required to process data that is marked with retract tags. You must implement a retract method to retract the input data. A sketch of a retract method for a count-style UDAF is provided after this list.
    public void retract(ACC accumulator, ...[user input]...);
    Note
    • The retract method is a reverse operation of the accumulate method. For example, in a count UDAF, the computing result increases by one each time the accumulate method is called to process a data record, whereas the result decreases by one each time the retract method is called to process a data record.
    • Similar to the accumulate method, the first parameter of the retract method must be an accumulator of the ACC type defined in AggregateFunction. During the code execution, the underlying runtime code sends the previous accumulator and the specified upstream data to the retract method for aggregation. The upstream data can be of any type and contain any number of data records.
    Realtime Compute requires the merge method in some scenarios. For example, when you use a session window to aggregate data, you must implement the merge method. Realtime Compute can process out-of-order data, and newly arrived data may fill the gap between two separate sessions, which causes the two sessions to be merged. In this case, you must use the merge method to combine multiple accumulators into one accumulator.
    public void merge(ACC accumulator, Iterable<ACC> its);
    Note
    • The first parameter of the merge method must be an accumulator of the ACC type defined in AggregateFunction. After the merge method is executed, the state data of AggregateFunction is stored in the first accumulator.
    • The second parameter of the merge method is an iterator of one or more accumulators of the ACC type.
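
The following sketch shows how a retract method can be implemented for a count-style UDAF. This is a minimal example for illustration only; the class name RetractableCountUdaf is not part of Realtime Compute, and the CountAccum accumulator mirrors the one used in the full example later in this topic.

import org.apache.flink.table.functions.AggregateFunction;

// Minimal count-style UDAF sketch that implements retract as the reverse of accumulate.
public class RetractableCountUdaf extends AggregateFunction<Long, RetractableCountUdaf.CountAccum> {
    // The accumulator that stores the intermediate count.
    public static class CountAccum {
        public long total;
    }

    public CountAccum createAccumulator() {
        return new CountAccum();
    }

    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // Each input record increases the count by one.
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

    // The reverse operation of accumulate: each retracted record decreases the count by one.
    public void retract(CountAccum accumulator, Object iValue) {
        accumulator.total--;
    }
}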

Build a development environment

For more information, see Build the development environment.

Write business logic code

An example of Java code is as follows:

import org.apache.flink.table.functions.AggregateFunction;

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    // Define the data schema of the accumulator that stores the state data of a count UDAF.
    public static class CountAccum {
        public long total;
    }

    // Initialize the accumulator of the count UDAF.
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    // Call the getValue method to obtain the result of the count UDAF from the accumulator that stores the UDAF state data.
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // Call the accumulate method to update the accumulator that stores the state data of the count UDAF based on the input data.
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

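    // Call the merge method to merge multiple accumulators that store the state data of the count UDAF into one accumulator.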
    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}
Note The open and close methods are optional for a subclass of AggregateFunction. For more information, see the examples of a user-defined function (UDF) or user-defined table function (UDTF).
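
If initialization or cleanup logic is required, you can override the optional open and close methods inherited from UserDefinedFunction. The following is a minimal sketch; the class name CountUdafWithLifecycle is only illustrative and extends the CountUdaf class from the preceding example.

import org.apache.flink.table.functions.FunctionContext;

// Sketch: optional lifecycle methods added to the CountUdaf class shown above.
public class CountUdafWithLifecycle extends CountUdaf {
    @Override
    public void open(FunctionContext context) throws Exception {
        // Called once before the UDAF processes any data, for example, to read job parameters
        // or initialize external resources.
    }

    @Override
    public void close() throws Exception {
        // Called once after the UDAF finishes, for example, to release external resources.
    }
}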

Publish and run a UDAF

For more information, see Develop a job and Publish a job.

Sample code of a UDAF

-- Uses a UDAF to calculate the count.
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';
create table sls_stream(
a int,
b bigint,
c varchar
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessId',
accessKeySecret='yourAccessSecret',
startTime='2017-07-04 00:00:00',
project='<yourProjectName>',
logStore='stream-test2',
consumerGroup='consumerGroupTest3'
);

create table rds_output(
len1 bigint,
len2 bigint
) with (
type='rds',
url='yourDatabaseURL',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);

insert into rds_output
select
count(a),
countUdaf(a)
from sls_stream;