Standard SQL aggregate functions cover most grouping scenarios, but they cannot sort aggregated values by a column that is not a group key. This tutorial shows you how to build a user-defined aggregate function (UDAF) in Realtime Compute for Apache Flink that collects status values per user and returns them sorted in ascending order by event time. Using residential power grid terminal data as the example, the tutorial covers every step from setting up a data source to querying the final result.
How it works
A UDAF maps scalar values from multiple rows to a single scalar value. The aggregation logic is built around an accumulator — an intermediate data structure that stores aggregated values until the final result is computed.
For each group of rows to aggregate, Flink calls three core methods in sequence:
| Method | When Flink calls it | What it does |
|---|---|---|
| `createAccumulator()` | Once per group, before processing any row | Creates and returns an empty accumulator |
| `accumulate(acc, ...)` | Once per input row | Updates the accumulator with the row's values |
| `getValue(acc)` | Whenever Flink needs to emit the group's result. In a streaming group aggregation, this can happen after each update. | Computes and returns the result from the accumulator |
Depending on your use case, you may also need to implement these optional methods:
| Method | When to implement |
|---|---|
| `retract(acc, ...)` | Required when the input to the aggregation can contain retraction messages, for example updates or deletes from a changelog source, or the output of another group aggregation or an outer join. Implement this even when your initial use case does not need it, so the UDAF works in any context. |
| `merge(acc, ...)` | Required for session window and hop window aggregations. |
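For the list accumulator used in this tutorial, the two optional methods might look like the following sketch. It is plain Java so that it runs without Flink on the classpath: the class name `OptionalMethodsSketch` is hypothetical and does not extend Flink's `AggregateFunction`. In a real UDAF, these would be public instance methods on the function class, and Flink expects `merge()` to take the target accumulator plus an `Iterable` of accumulators. The `retract()` sketch assumes that each retraction carries exactly the same `status#timestamp` string that was accumulated earlier; the timestamps are toy values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OptionalMethodsSketch {

    // Same shape as the tutorial's accumulator: "status#unixTimestamp" entries
    public static class AcList {
        public List<String> list = new ArrayList<>();
    }

    // retract(): undoes one earlier accumulate() call for the same value.
    // List.remove(Object) deletes only the first matching entry, so duplicate
    // entries (for example, two "LD" events in the same second) are retracted one at a time.
    public static void retract(AcList acc, String tuple) {
        if (tuple != null) {
            acc.list.remove(tuple);
        }
    }

    // merge(): folds the entries of other accumulators (for example, partial
    // results of session or hop windows) into the target accumulator.
    public static void merge(AcList acc, Iterable<AcList> others) {
        for (AcList other : others) {
            acc.list.addAll(other.list);
        }
    }

    public static void main(String[] args) {
        AcList a = new AcList();
        a.list.addAll(Arrays.asList("LD#100", "LD#100", "TD#90"));
        retract(a, "LD#100");
        System.out.println(a.list); // [LD#100, TD#90]

        AcList b = new AcList();
        b.list.add("TD#120");
        merge(a, Arrays.asList(b));
        System.out.println(a.list); // [LD#100, TD#90, TD#120]
    }
}
```

Removing by value rather than by position keeps the sketch simple. For very large groups, a keyed structure such as Flink's `MapView` may be worth considering instead of a plain list.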
In this tutorial, the SQL query concatenates each status value with its Unix timestamp using # as a separator. The UDAF accumulates these strings into a list, sorts the list by timestamp, and extracts the status values to return a comma-separated string.
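The following plain-Java sketch drives one group through the three core methods in the order Flink calls them, using user 1222's rows from the sample data. The class `SortListAggSketch` is hypothetical and does not extend Flink's `AggregateFunction`, so it runs on its own. The `status#timestamp` strings assume a UTC+8 session time zone for `UNIX_TIMESTAMP`. Only the relative order of the timestamps affects the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortListAggSketch {

    // Accumulator holding "status#unixTimestamp" entries for one group
    public static class AcList {
        public List<String> list;
    }

    // Called once per group before any row is processed
    public static AcList createAccumulator() {
        AcList acc = new AcList();
        acc.list = new ArrayList<>();
        return acc;
    }

    // Called once per input row
    public static void accumulate(AcList acc, String tuple) {
        acc.list.add(tuple);
    }

    // Numeric timestamp after the last '#'
    private static long timestampOf(String entry) {
        return Long.parseLong(entry.substring(entry.lastIndexOf('#') + 1));
    }

    // Sorts a copy by timestamp, keeps the status part, and joins with commas
    public static String getValue(AcList acc) {
        List<String> sorted = new ArrayList<>(acc.list);
        sorted.sort(Comparator.comparingLong(SortListAggSketch::timestampOf));
        List<String> statuses = new ArrayList<>();
        for (String entry : sorted) {
            statuses.add(entry.substring(0, entry.lastIndexOf('#')));
        }
        return String.join(",", statuses);
    }

    public static void main(String[] args) {
        // Rows for user_id 1222 in table order (event_id 1, 3, 5, 7)
        List<String> rows = Arrays.asList(
                "LD#1688094840", "TD#1688094660", "TD#1688094900", "TD#1688095140");
        AcList acc = createAccumulator();
        for (String row : rows) {
            accumulate(acc, row);
        }
        System.out.println(getValue(acc)); // TD,LD,TD,TD
    }
}
```

Sorting a copy inside `getValue()` leaves the accumulator untouched, which matters in streaming jobs where Flink can call `getValue()` repeatedly as new rows arrive. `List.sort` is stable, so entries with the same timestamp keep their arrival order.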
Sample data
The source table electric_info stores power grid terminal events with four fields: event_id, user_id, event_time, and status. The goal is to aggregate all status values per user, sorted in ascending order by event_time.
Source table: `electric_info`
| event_id | user_id | event_time | status |
|---|---|---|---|
| 1 | 1222 | 2023-06-30 11:14:00 | LD |
| 2 | 1333 | 2023-06-30 11:12:00 | LD |
| 3 | 1222 | 2023-06-30 11:11:00 | TD |
| 4 | 1333 | 2023-06-30 11:12:00 | LD |
| 5 | 1222 | 2023-06-30 11:15:00 | TD |
| 6 | 1333 | 2023-06-30 11:18:00 | LD |
| 7 | 1222 | 2023-06-30 11:19:00 | TD |
| 8 | 1333 | 2023-06-30 11:10:00 | TD |
| 9 | 1555 | 2023-06-30 11:16:00 | TD |
| 10 | 1555 | 2023-06-30 11:17:00 | LD |
Expected result
| user_id | status |
|---|---|
| 1222 | TD,LD,TD,TD |
| 1333 | TD,LD,LD,LD |
| 1555 | TD,LD |
Prerequisites
Before you begin, make sure that you have:
- A Realtime Compute for Apache Flink workspace.
- An ApsaraDB RDS for MySQL instance in the same VPC as the Flink workspace. If they are in different VPCs, see Network connectivity.
Step 1: Prepare a data source
This example uses ApsaraDB RDS for MySQL as the data source.
- Create a database and an account. Create a database named `electric` and an account with read and write permissions on the `electric` database. Either a privileged account or a standard account works.
- Log on to the ApsaraDB RDS for MySQL instance using DMS. In the `electric` database, run the following SQL to create the source and sink tables and insert the sample data:

```sql
CREATE TABLE `electric_info` (
  event_id bigint NOT NULL PRIMARY KEY COMMENT 'Event ID',
  user_id bigint NOT NULL COMMENT 'User ID',
  event_time timestamp NOT NULL COMMENT 'Event time',
  status varchar(10) NOT NULL COMMENT 'User terminal status'
);

-- Lowercase name to match the table-name used by the Flink sink table
CREATE TABLE `electric_info_sortlistagg` (
  user_id bigint NOT NULL PRIMARY KEY COMMENT 'User ID',
  status_sort varchar(50) NULL COMMENT 'User terminal status sorted in ascending order by event time'
);

-- Insert sample data
INSERT INTO electric_info VALUES
  (1, 1222, '2023-06-30 11:14', 'LD'),
  (2, 1333, '2023-06-30 11:12', 'LD'),
  (3, 1222, '2023-06-30 11:11', 'TD'),
  (4, 1333, '2023-06-30 11:12', 'LD'),
  (5, 1222, '2023-06-30 11:15', 'TD'),
  (6, 1333, '2023-06-30 11:18', 'LD'),
  (7, 1222, '2023-06-30 11:19', 'TD'),
  (8, 1333, '2023-06-30 11:10', 'TD'),
  (9, 1555, '2023-06-30 11:16', 'TD'),
  (10, 1555, '2023-06-30 11:17', 'LD');
```
Step 2: Register a UDF
- Download ASI_UDX-1.0-SNAPSHOT.jar. The `pom.xml` file in the JAR is configured with the minimum dependencies for Flink 1.17.1. For general information about user-defined functions (UDFs), see User-defined functions.
- Review and optionally modify the UDAF source code below. The `SortListAgg` class receives each row's `status` and Unix timestamp as a single `status#timestamp` string, accumulates these strings into a list, sorts the list by the timestamp portion, and returns the sorted `status` values as a comma-separated string.

  The `retract()` method is left empty in this example. If the input to the aggregation can contain retraction messages, for example updates or deletes from the source table or the output of an outer join, implement `retract()` to remove previously accumulated values from `acc.list`.

```java
package ASI_UDAF;

import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ASI_UDAF {

    // Accumulator: stores intermediate "status#unixTimestamp" strings
    public static class AcList {
        public List<String> list;
    }

    public static class SortListAgg extends AggregateFunction<String, AcList> {

        // Returns the numeric Unix timestamp after the last "#"
        private static long timestampOf(String entry) {
            return Long.parseLong(entry.substring(entry.lastIndexOf('#') + 1));
        }

        // Returns the status value before the last "#"
        private static String statusOf(String entry) {
            return entry.substring(0, entry.lastIndexOf('#'));
        }

        // Returns the final aggregated result
        @Override
        public String getValue(AcList acc) {
            // Sort a copy by timestamp so that getValue() does not modify the accumulator
            List<String> sorted = new ArrayList<>(acc.list);
            sorted.sort(Comparator.comparingLong(SortListAgg::timestampOf));
            // Keep only the status part and join with commas
            List<String> ret = new ArrayList<>();
            for (String entry : sorted) {
                ret.add(statusOf(entry));
            }
            return String.join(",", ret);
        }

        // Creates an empty accumulator for a new group
        @Override
        public AcList createAccumulator() {
            AcList ac = new AcList();
            ac.list = new ArrayList<>();
            return ac;
        }

        // Adds one input row ("status#unixTimestamp") to the accumulator
        public void accumulate(AcList acc, String tuple) {
            if (tuple != null) {
                acc.list.add(tuple);
            }
        }

        // Retraction method: implement this if the aggregation input can contain retraction messages
        public void retract(AcList acc, String tuple) {
        }
    }
}
```
- Open the UDF registration page. For Java UDFs, you can upload the JAR directly through the console instead of registering it manually. For details, see User-defined aggregate functions (UDAFs).
  - Log on to the Realtime Compute for Apache Flink console.
  - In the Actions column of the target workspace, click Console.
  - Click Data Development > ETL.
  - On the Functions tab on the left, click Register UDF.
- In the Select File section, upload the downloaded JAR file, then click OK.

  Note: The JAR is uploaded to the `sql-artifacts` directory of your OSS bucket. The Flink development console parses the JAR to detect classes that implement the UDF, UDAF, or user-defined table-valued function (UDTF) interfaces, then automatically populates the Function Name field.
- In the Manage Functions dialog box, click Create Function. The registered function appears in the Functions list on the left side of the SQL editor.
Step 3: Create a Flink job
- On the Data Development > ETL page, click New.
- Click Blank Stream Draft, then click Next.
- In the New Job Draft dialog box, configure the following parameters:

| Parameter | Description |
|---|---|
| File Name | A unique name for the job within the current project. |
| Storage Location | The folder where the job is stored. Click the icon next to an existing folder to create a subfolder. |
| Engine Version | The Flink engine version. This must match the version specified in the `pom.xml` file. For version details, see Engine versions. |
- Write the DDL and DML code. Replace the placeholder values in the connector configuration with your actual RDS endpoint and username. The query passes `CONCAT(status, '#', CAST(UNIX_TIMESTAMP(event_time) AS STRING))` to `ASI_UDAF$SortListAgg` as a composite sort key, so the UDAF can sort by `event_time` internally.

```sql
-- Source table: reads from the electric_info table in ApsaraDB RDS for MySQL
CREATE TEMPORARY TABLE electric_info (
  event_id bigint NOT NULL,
  `user_id` bigint NOT NULL,
  event_time timestamp(6) NOT NULL,
  status string NOT NULL,
  PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<your-rds-private-endpoint>',
  'port' = '3306',
  'username' = '<your-username>',
  'password' = '${secret_values.mysql_pw}',
  'database-name' = 'electric',
  'table-name' = 'electric_info'
);

-- Sink table: writes aggregated results to electric_info_sortlistagg
CREATE TEMPORARY TABLE electric_info_sortlistagg (
  `user_id` bigint NOT NULL,
  status_sort varchar(50) NOT NULL,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<your-rds-private-endpoint>',
  'port' = '3306',
  'username' = '<your-username>',
  'password' = '${secret_values.mysql_pw}',
  'database-name' = 'electric',
  'table-name' = 'electric_info_sortlistagg'
);

-- Aggregate: group by user_id, sort status values by event_time ascending
INSERT INTO electric_info_sortlistagg
SELECT
  `user_id`,
  `ASI_UDAF$SortListAgg`(CONCAT(status, '#', CAST(UNIX_TIMESTAMP(event_time) AS STRING)))
FROM electric_info
GROUP BY user_id;
```

  Replace the following placeholders:

| Placeholder | Description | Example |
|---|---|---|
| `<your-rds-private-endpoint>` | The private endpoint of the ApsaraDB RDS for MySQL instance | `rm-bp1s1xgll21******.mysql.rds.aliyuncs.com` |
| `<your-username>` | The username for the MySQL database | `electric_user` |

  The `password` field uses a project variable (`mysql_pw`) to avoid hardcoding credentials. For details, see Project variables. For all available MySQL connector parameters, see MySQL connector.
- (Optional) In the upper-right corner, click Deep Check and Debug to validate the job before deploying. For details, see Job development map.
- Click Deploy, then click OK.
- On the Operation Center > Jobs page, find the job, click Start in the Actions column, and select Stateless Start.
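The composite key that the job passes to `ASI_UDAF$SortListAgg` can be reproduced outside Flink to check how event times become sortable numbers. This is a plain-Java sketch with hypothetical names (`CompositeKeySketch`, `compositeKey`, `timestampOf`). It assumes that `UNIX_TIMESTAMP(event_time)` interprets the timestamp in a UTC+8 session time zone; that is an assumption about your job configuration, not something this tutorial specifies.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class CompositeKeySketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Mirrors CONCAT(status, '#', CAST(UNIX_TIMESTAMP(event_time) AS STRING)),
    // assuming the session time zone is the given fixed offset
    public static String compositeKey(String status, String eventTime, ZoneOffset zone) {
        long epochSeconds = LocalDateTime.parse(eventTime, FORMAT).toEpochSecond(zone);
        return status + "#" + epochSeconds;
    }

    // Splits at the last '#' so that a status containing '#' still parses
    public static long timestampOf(String key) {
        return Long.parseLong(key.substring(key.lastIndexOf('#') + 1));
    }

    public static void main(String[] args) {
        ZoneOffset zone = ZoneOffset.ofHours(8); // assumption: UTC+8 session time zone
        String k1 = compositeKey("LD", "2023-06-30 11:14:00", zone);
        String k3 = compositeKey("TD", "2023-06-30 11:11:00", zone);
        System.out.println(k1); // LD#1688094840
        System.out.println(timestampOf(k3) < timestampOf(k1)); // true
    }
}
```

A fixed offset shifts every timestamp by the same amount, so the sort order within a group does not depend on which offset you assume. Because `UNIX_TIMESTAMP` returns whole seconds, events with the same second, such as events 2 and 4 for user 1333, produce equal keys.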
Step 4: Query the result
In ApsaraDB RDS for MySQL, run the following query to verify the output:
```sql
SELECT * FROM `electric_info_sortlistagg`;
```

The `status_sort` column should show each user's terminal status values sorted in ascending order by event time, matching the Expected result table in the Sample data section.
What's next
- Built-in functions: explore the built-in aggregate and scalar functions available in Flink SQL.
- Deploy a job and Start a job: detailed job deployment and startup options.
- Configure job deployment information: tune runtime parameters, including options for dynamic scaling and parameter updates that reduce downtime.
- User-defined functions: use Python UDFs in Flink SQL jobs.