A user-defined aggregate function (UDAF) reduces multiple input rows into a single output value — a many-to-one mapping. Use UDAFs when built-in aggregate functions like SUM or MAX don't cover your aggregation logic.
For background on user-defined functions in Flink, see User-defined Functions.
User-defined Functions and ASI_UDX_Demo are hosted on third-party websites and may be slow or unavailable at times.
How it works
A UDAF uses an accumulator to store intermediate aggregation state. For each group of rows sharing the same GROUP BY key, the runtime calls three methods in sequence:
- createAccumulator(): creates a fresh accumulator with initial state.
- accumulate(acc, ...): called once per input row to update the accumulator.
- getValue(acc): called after all rows are processed to return the final result.
For example, if a table has a numeric column and you want a cumulative sum for values 1, 2, and 3 in the same group:
- The runtime calls createAccumulator() once to initialize the accumulator with sum = 0.
- It calls accumulate() for each row, updating sum to 1, then 3, then 6.
- It calls getValue() to return the final result.
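The sequence above can be traced in plain Java. This is a minimal sketch that only mimics the call order for one group. It does not use the Flink API, and the names LifecycleSketch, SumAcc, and trace are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the UDAF lifecycle (assumption: not the Flink runtime).
class LifecycleSketch {

    // Accumulator: holds the intermediate state for one GROUP BY key.
    static class SumAcc {
        long sum;
    }

    static SumAcc createAccumulator() {
        return new SumAcc(); // sum starts at 0
    }

    static void accumulate(SumAcc acc, long value) {
        acc.sum += value;
    }

    static long getValue(SumAcc acc) {
        return acc.sum;
    }

    // Drives the three calls for one group and records the result after each row.
    static List<Long> trace(long[] rows) {
        SumAcc acc = createAccumulator();
        List<Long> states = new ArrayList<>();
        for (long row : rows) {
            accumulate(acc, row);
            states.add(getValue(acc));
        }
        return states;
    }

    public static void main(String[] args) {
        System.out.println(trace(new long[] {1, 2, 3})); // prints [1, 3, 6]
    }
}
```

The last element of the trace, 6, is the final result for the group. The earlier elements are the intermediate states that a streaming job may or may not emit.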
The output depends on whether mini-batch is enabled:
- Without mini-batch (default): one output per row (1, 3, 6).
- With mini-batch enabled: only the final result, 6, if all three rows fall into the same mini-batch. The number of intermediate outputs varies based on mini-batch settings and data distribution.
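The difference between the two modes can be approximated with a toy model. The sketch below assumes that a result is emitted every batchSize rows, which is a simplification. Real mini-batch behavior also depends on the configured settings and on how the data is distributed. The names EmissionSketch and emitted are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of result emission (assumption: batches are cut by row count only).
class EmissionSketch {

    // Returns the values emitted downstream when results are flushed every batchSize rows.
    static List<Long> emitted(long[] rows, int batchSize) {
        List<Long> out = new ArrayList<>();
        long sum = 0;
        int buffered = 0;
        for (long row : rows) {
            sum += row;
            buffered++;
            if (buffered == batchSize) {
                out.add(sum); // one output per full batch
                buffered = 0;
            }
        }
        if (buffered > 0) {
            out.add(sum); // flush the last partial batch
        }
        return out;
    }

    public static void main(String[] args) {
        long[] rows = {1, 2, 3};
        System.out.println(emitted(rows, 1)); // prints [1, 3, 6]
        System.out.println(emitted(rows, 3)); // prints [6]
        System.out.println(emitted(rows, 2)); // prints [3, 6]
    }
}
```

A batch size of 1 corresponds to the default per-row output. A batch that covers all three rows yields only the final result.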
For mini-batch configuration, see Optimize Flink SQL.
Required methods
Every AggregateFunction implementation must define these three methods:
| Method | Purpose |
|---|---|
| createAccumulator() | Returns a new accumulator with initial state |
| accumulate(acc, ...) | Updates the accumulator with one input row |
| getValue(acc) | Returns the final aggregated result |
Additional methods are available depending on your use case:
| Method | Purpose |
|---|---|
| retract(acc, ...) | Supports retracting a message generated by an upstream operator |
| merge(acc, iterable) | Supports local-global two-stage aggregate optimization |
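To illustrate what the two optional methods do to the accumulator, here is a minimal plain-Java sketch for a sum. It assumes that a retraction simply undoes an earlier accumulate call and that a merge folds partial (local) accumulators into a global one. It does not use the Flink API, and the names RetractMergeSketch, retractDemo, and mergeDemo are hypothetical.

```java
import java.util.List;

// Plain-Java illustration of retract and merge semantics for a sum accumulator.
class RetractMergeSketch {

    static class SumAcc {
        long sum;
    }

    static void accumulate(SumAcc acc, long value) {
        acc.sum += value;
    }

    // Undoes the effect of a value that was accumulated earlier.
    static void retract(SumAcc acc, long value) {
        acc.sum -= value;
    }

    // Folds partial accumulators into acc, skipping null entries.
    static void merge(SumAcc acc, Iterable<SumAcc> others) {
        for (SumAcc other : others) {
            if (other != null) {
                acc.sum += other.sum;
            }
        }
    }

    // Accumulates 1, 2, 3 and then retracts 2.
    static long retractDemo() {
        SumAcc acc = new SumAcc();
        accumulate(acc, 1);
        accumulate(acc, 2);
        accumulate(acc, 3);
        retract(acc, 2);
        return acc.sum;
    }

    // Two local accumulators pre-aggregate (1, 2) and (3); a global one merges them.
    static long mergeDemo() {
        SumAcc local1 = new SumAcc();
        accumulate(local1, 1);
        accumulate(local1, 2);
        SumAcc local2 = new SumAcc();
        accumulate(local2, 3);
        SumAcc global = new SumAcc();
        merge(global, List.of(local1, local2));
        return global.sum;
    }

    public static void main(String[] args) {
        System.out.println(retractDemo()); // prints 4
        System.out.println(mergeDemo());   // prints 6
    }
}
```

For a sum, both operations are simple arithmetic. Aggregates such as a maximum need more state in the accumulator to support retraction, because the removed value may have been the current result.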
Create a UDAF
Realtime Compute for Apache Flink provides a UDF demo project (ASI_UDX_Demo) with a pre-configured development environment, so no environment setup is required.
The demo project includes sample implementations for user-defined scalar functions (UDSFs), UDAFs, and user-defined table-valued functions (UDTFs).
Prerequisites
Before you begin, make sure you have:
- IntelliJ IDEA installed
- Maven installed
- Java development environment configured
Steps
- Download and decompress ASI_UDX_Demo to your local machine. The decompressed folder ASI_UDX-main contains:
  - pom.xml: Maven project configuration, including coordinates, dependencies, and build rules.
  - \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java: sample UDAF implementation.
- In IntelliJ IDEA, select File > Open, then select the ASI_UDX-main folder.
- Open pom.xml in the \ASI_UDX-main\ directory and configure dependencies. The file includes the minimum dependency for Flink 1.12:

      <dependencies>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-common</artifactId>
              <version>1.12.7</version>
              <scope>provided</scope>
          </dependency>
      </dependencies>

  - If your job has no additional dependencies, skip to the next step.
  - If your job requires additional dependencies, add them to pom.xml.

  Use the latest minor version for the Apache Flink major version that matches your Ververica Runtime (VVR) version. For VVR-to-Flink version mappings, see Overview.
- Open \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java and implement your aggregation logic. The sample implements cumulative summation:

      package ASI_UDAF;

      import org.apache.flink.table.functions.AggregateFunction;

      import java.util.Iterator;

      public class ASI_UDAF {

          // Accumulator that holds the running sum for one group.
          public static class AccSum {
              public long sum;
          }

          public static class MySum extends AggregateFunction<Long, AccSum> {

              // Returns the current aggregated result.
              @Override
              public Long getValue(AccSum acSum) {
                  return acSum.sum;
              }

              // Creates an accumulator with the sum initialized to 0.
              @Override
              public AccSum createAccumulator() {
                  AccSum acCount = new AccSum();
                  acCount.sum = 0;
                  return acCount;
              }

              // Adds one input value to the running sum.
              public void accumulate(AccSum acc, long num) {
                  acc.sum += num;
              }

              /**
               * Supports retracting a message generated by an upstream operator.
               */
              public void retract(AccSum acc, long num) {
                  acc.sum -= num;
              }

              /**
               * Supports local-global two-stage aggregate optimization.
               */
              public void merge(AccSum acc, Iterable<AccSum> it) {
                  Iterator<AccSum> iter = it.iterator();
                  while (iter.hasNext()) {
                      AccSum accSum = iter.next();
                      if (null != accSum) {
                          acc.sum += accSum.sum;
                      }
                  }
              }
          }
      }

- From the directory containing pom.xml, run:

      mvn package -Dcheckstyle.skip

  The UDAF is packaged successfully when ASI_UDX-1.0-SNAPSHOT.jar appears in \ASI_UDX-main\target\.
Use a UDAF
Two methods are available for using a UDAF in SQL deployments. The table below summarizes the key differences:
| Aspect | Method 1: Registered UDAF | Method 2: Deployment-level JAR |
|---|---|---|
| Scope | Available across multiple deployments | Single deployment only |
| How to register | Register via the UDF management page | Upload JAR under Additional dependency files in the deployment |
| How to call in SQL | Call by registered name (no CREATE TEMPORARY FUNCTION needed) | Define an alias with CREATE TEMPORARY FUNCTION … AS 'fully.qualified.ClassName' |
| Reusability | High — suitable for shared business logic | Low — tied to one deployment |
Method 1: Use a registered UDAF (recommended)
Register the UDAF once, then reuse it across deployments. For registration steps, see Manage UDFs.
Once registered as ASI_UDAF$MySum, call it directly in your SQL:
CREATE TEMPORARY TABLE ASI_UDAF_Source (
a BIGINT NOT NULL
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE ASI_UDAF_Sink (
sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO ASI_UDAF_Sink
SELECT `ASI_UDAF$MySum`(a)
FROM ASI_UDAF_Source;
Method 2: Upload a JAR to a specific deployment
On the Flink Data Studio > ETL page, upload the JAR package using Additional dependency files under More configurations. Then define a temporary function alias in the job's SQL.
The JAR is scoped to that deployment only and cannot be shared with other deployments.
If the temporary function is named mysum:
CREATE TEMPORARY TABLE ASI_UDAF_Source (
a BIGINT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE ASI_UDAF_Sink (
sum BIGINT
) WITH (
'connector' = 'print'
);
CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; -- Create the temporary function mysum.
INSERT INTO ASI_UDAF_Sink
SELECT `mysum`(a)
FROM ASI_UDAF_Source;
Run the job
After developing and deploying the SQL job, go to Operation Center > Job O&M. Find the target job and click Start in the Operation column.
After the job starts, the UDAF aggregates the a field from ASI_UDAF_Source and writes the cumulative sum into ASI_UDAF_Sink.
What's next
- Manage UDFs: register and manage UDAFs for reuse across deployments.
- Optimize Flink SQL: configure mini-batch to control how intermediate results are emitted.
- Overview: find the Apache Flink version that matches your VVR version.