
Realtime Compute for Apache Flink:Java UDAFs

Last Updated: Mar 26, 2026

A user-defined aggregate function (UDAF) reduces multiple input rows into a single output value — a many-to-one mapping. Use UDAFs when built-in aggregate functions like SUM or MAX don't cover your aggregation logic.

For background on user-defined functions in Flink, see User-defined Functions.

User-defined Functions and ASI_UDX_Demo are hosted on third-party websites and may be slow or unavailable at times.

How it works

A UDAF uses an accumulator to store intermediate aggregation state. For each group of rows sharing the same GROUP BY key, the runtime calls three methods in sequence:

  1. createAccumulator() — creates a fresh accumulator with initial state.

  2. accumulate(acc, ...) — called once per input row to update the accumulator.

  3. getValue(acc) — called after all rows are processed to return the final result.

For example, if a table has a numeric column and you want a cumulative sum for values 1, 2, and 3 in the same group:

  • The runtime calls createAccumulator() once to initialize the accumulator with sum = 0.

  • It calls accumulate() for each row, updating sum to 1, then 3, then 6.

  • It calls getValue() to return the final result.

The output depends on whether mini-batch is enabled:

  • Without mini-batch (default): one output per row — 1, 3, 6.

  • With mini-batch enabled: the final result only — 6. The number of intermediate outputs varies based on mini-batch settings and data distribution.

For mini-batch configuration, see Optimize Flink SQL.
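The call sequence above can be sketched in plain Java. This is a minimal stand-in for the accumulator lifecycle, written without the Flink dependency so it runs on its own; the class and method names mirror the Flink contract but are illustrative, not the Flink API itself:

```java
// Illustrative sketch of the UDAF accumulator lifecycle (not the Flink API;
// a real Flink UDAF extends org.apache.flink.table.functions.AggregateFunction).
public class UdafLifecycleSketch {
    // Accumulator holding the intermediate aggregation state.
    static class AccSum {
        long sum;
    }

    // Called once per group to initialize the state.
    static AccSum createAccumulator() {
        AccSum acc = new AccSum();
        acc.sum = 0;
        return acc;
    }

    // Called once per input row to update the state.
    static void accumulate(AccSum acc, long num) {
        acc.sum += num;
    }

    // Called to produce the final result from the state.
    static long getValue(AccSum acc) {
        return acc.sum;
    }

    public static void main(String[] args) {
        AccSum acc = createAccumulator();       // sum = 0
        for (long n : new long[] {1, 2, 3}) {
            accumulate(acc, n);                 // sum = 1, then 3, then 6
        }
        System.out.println(getValue(acc));      // prints 6
    }
}
```

Without mini-batch, the runtime would emit the running value after each `accumulate` call (1, 3, 6); with mini-batch, intermediate emissions are reduced.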

Required methods

Every AggregateFunction implementation must define these three methods:

| Method | Purpose |
| --- | --- |
| createAccumulator() | Returns a new accumulator with initial state |
| accumulate(acc, ...) | Updates the accumulator with one input row |
| getValue(acc) | Returns the final aggregated result |

Additional methods are available depending on your use case:

| Method | Purpose |
| --- | --- |
| retract(acc, ...) | Supports retracting a message generated by an upstream operator |
| merge(acc, iterable) | Supports local-global two-stage aggregate optimization |
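In local-global two-stage aggregation, each local task pre-aggregates its rows into a partial accumulator, and the global task folds those partials together via merge(). A plain-Java sketch of that merge step (illustrative only; not the Flink API, which passes the partials as an Iterable of accumulators):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the merge step in local-global aggregation.
public class MergeSketch {
    static class AccSum {
        long sum;
        AccSum(long sum) { this.sum = sum; }
    }

    // Folds partial accumulators from local tasks into the global accumulator.
    static void merge(AccSum acc, Iterable<AccSum> partials) {
        for (AccSum partial : partials) {
            if (partial != null) {
                acc.sum += partial.sum;
            }
        }
    }

    public static void main(String[] args) {
        AccSum global = new AccSum(0);
        // Suppose two local tasks pre-aggregated 1+2=3 and 3+4=7.
        List<AccSum> partials = Arrays.asList(new AccSum(3), new AccSum(7));
        merge(global, partials);
        System.out.println(global.sum); // prints 10
    }
}
```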

Create a UDAF

Realtime Compute for Apache Flink provides a UDF demo project (ASI_UDX_Demo) with a pre-configured development environment, so no environment setup is required.

The demo project includes sample implementations for user-defined scalar functions (UDSFs), UDAFs, and user-defined table-valued functions (UDTFs).

Prerequisites

Before you begin, make sure you have:

  • IntelliJ IDEA installed

  • Maven installed

  • Java development environment configured

Steps

  1. Download and decompress ASI_UDX_Demo to your local machine. The decompressed folder ASI_UDX-main contains:

    • pom.xml — Maven project configuration, including coordinates, dependencies, and build rules.

    • \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java — sample UDAF implementation.

  2. In IntelliJ IDEA, select File > Open, then select the ASI_UDX-main folder.

  3. Open pom.xml in the \ASI_UDX-main\ directory and configure dependencies. The file includes the minimum dependency for Flink 1.12:

    • If your job has no additional dependencies, skip to the next step.

    • If your job requires additional dependencies, add them to pom.xml.

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.12.7</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    Use the latest patch release of the Apache Flink minor version that matches your Ververica Runtime (VVR) version. For VVR-to-Flink version mappings, see Overview.

  4. Open \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java and implement your aggregation logic. The sample implements cumulative summation:

    package ASI_UDAF;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    public class ASI_UDAF {
        /** Accumulator that stores the intermediate sum. */
        public static class AccSum {
            public long sum;
        }
    
        public static class MySum extends AggregateFunction<Long, AccSum> {
    
            @Override
            public AccSum createAccumulator() {
                AccSum acc = new AccSum();
                acc.sum = 0;
                return acc;
            }
    
            public void accumulate(AccSum acc, long num) {
                acc.sum += num;
            }
    
            @Override
            public Long getValue(AccSum acc) {
                return acc.sum;
            }
    
            /**
             * Supports retracting a message generated by an upstream operator.
             */
            public void retract(AccSum acc, long num) {
                acc.sum -= num;
            }
    
            /**
             * Supports local-global two-stage aggregate optimization.
             */
            public void merge(AccSum acc, Iterable<AccSum> it) {
                for (AccSum accSum : it) {
                    if (accSum != null) {
                        acc.sum += accSum.sum;
                    }
                }
            }
        }
    }
  5. From the directory containing pom.xml, run:

    mvn package -Dcheckstyle.skip

    The UDAF is packaged successfully when ASI_UDX-1.0-SNAPSHOT.jar appears in \ASI_UDX-main\target\.

Use a UDAF

Two methods are available for using a UDAF in SQL deployments. The table below summarizes the key differences:

| Aspect | Method 1: Registered UDAF | Method 2: Deployment-level JAR |
| --- | --- | --- |
| Scope | Available across multiple deployments | Single deployment only |
| How to register | Register via the UDF management page | Upload the JAR under Additional dependency files in the deployment |
| How to call in SQL | Call by registered name (no CREATE TEMPORARY FUNCTION needed) | Define an alias with CREATE TEMPORARY FUNCTION … AS 'fully.qualified.ClassName' |
| Reusability | High — suitable for shared business logic | Low — tied to one deployment |

Method 1: Use a registered UDAF (recommended)

Register the UDAF once, then reuse it across deployments. For registration steps, see Manage UDFs.

Once registered as ASI_UDAF$MySum, call it directly in your SQL:

CREATE TEMPORARY TABLE ASI_UDAF_Source (
  a BIGINT NOT NULL
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE ASI_UDAF_Sink (
  sum  BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO ASI_UDAF_Sink
SELECT `ASI_UDAF$MySum`(a)
FROM ASI_UDAF_Source;

Method 2: Upload a JAR to a specific deployment

On the Flink Data Studio > ETL page, upload the JAR package using Additional dependency files under More configurations. Then define a temporary function alias in the job's SQL.

The JAR is scoped to that deployment only and cannot be shared with other deployments.

If the temporary function is named mysum:

CREATE TEMPORARY TABLE ASI_UDAF_Source (
  a   BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE ASI_UDAF_Sink (
  sum  BIGINT
) WITH (
  'connector' = 'print'
);

CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; -- Create the temporary function mysum.

INSERT INTO ASI_UDAF_Sink
SELECT `mysum`(a)
FROM ASI_UDAF_Source;

Run the job

After developing and deploying the SQL job, go to Operation Center > Job O&M. Find the target job and click Start in the Operation column.

After the job starts, the UDAF aggregates the a field from ASI_UDAF_Source and writes the cumulative sum into ASI_UDAF_Sink.

What's next

  • Manage UDFs — register and manage UDAFs for reuse across deployments.

  • Optimize Flink SQL — configure mini-batch to control how intermediate results are emitted.

  • Overview — find the Apache Flink version that matches your VVR version.