
Realtime Compute for Apache Flink:Sort and aggregate data with UDAFs

Last Updated: Mar 26, 2026

Standard SQL aggregate functions cover most grouping scenarios, but they cannot sort aggregated values by a column that is not a group key. This tutorial shows you how to build a user-defined aggregate function (UDAF) in Realtime Compute for Apache Flink that collects status values per user and returns them sorted in ascending order by event time. Using residential power grid terminal data as the example, the tutorial covers every step from setting up a data source to querying the final result.

Steps at a glance:

  1. Prepare a data source
  2. Register a UDF
  3. Create a Flink job
  4. Query the result

How it works

A UDAF maps scalar values from multiple rows to a single scalar value. The aggregation logic is built around an accumulator — an intermediate data structure that stores aggregated values until the final result is computed.

For each group of rows to aggregate, Flink calls three core methods in sequence:

Method                When Flink calls it                        What it does
createAccumulator()   Once per group, before processing any row  Creates and returns an empty accumulator
accumulate(acc, ...)  Once per input row                         Updates the accumulator with the row's values
getValue(acc)         After all rows in a group are processed    Computes and returns the final result
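
The call sequence can be mimicked without any Flink dependency. The plain-Java sketch below (class and method names are illustrative, not the Flink API itself) shows the order in which the three methods fire for one group:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the accumulator lifecycle; it mirrors the
// AggregateFunction contract but has no Flink dependency.
public class LifecycleSketch {
    // The accumulator: intermediate state for one group
    public static class Acc {
        public List<String> values = new ArrayList<>();
    }

    // createAccumulator(): called once per group
    public static Acc createAccumulator() {
        return new Acc();
    }

    // accumulate(): called once per input row
    public static void accumulate(Acc acc, String value) {
        acc.values.add(value);
    }

    // getValue(): called after all rows of the group are processed
    public static String getValue(Acc acc) {
        return String.join(",", acc.values);
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();     // start of group
        accumulate(acc, "TD");             // row 1
        accumulate(acc, "LD");             // row 2
        System.out.println(getValue(acc)); // end of group -> TD,LD
    }
}
```

In a real UDAF, the same three methods are declared on a class that extends Flink's AggregateFunction, as shown later in this tutorial.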

Depending on your use case, you may also need to implement these optional methods:

Method             When to implement
retract(acc, ...)  Required when the query can generate retraction messages — for example, in group aggregations or outer joins. Implement this even when your initial use case does not need it, so the UDAF works in any context.
merge(acc, ...)    Required for session window and hop window aggregations.
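
For a list-backed accumulator like the one built in this tutorial, retract() can usually just remove one previously accumulated entry. A minimal plain-Java sketch (class and field names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: retraction for a list-backed accumulator removes one
// occurrence of a value that was previously accumulated.
public class RetractSketch {
    public static class Acc {
        public List<String> list = new ArrayList<>();
    }

    public static void accumulate(Acc acc, String value) {
        acc.list.add(value);
    }

    // List.remove(Object) deletes only the first matching element,
    // so duplicate entries are retracted one at a time.
    public static void retract(Acc acc, String value) {
        acc.list.remove(value);
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        accumulate(acc, "LD#1688123640");
        accumulate(acc, "LD#1688123640");
        retract(acc, "LD#1688123640");
        System.out.println(acc.list.size()); // one copy remains
    }
}
```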

In this tutorial, the UDAF concatenates each status value with its Unix timestamp using # as a separator, accumulates these strings into a list, then sorts the list by timestamp and extracts the status values to return a comma-separated string.
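
This encode-sort-decode round trip can be exercised in isolation. The sketch below replays user 1222's rows with hypothetical Unix timestamps (only their relative order matters):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Demonstrates the "status#unixTimestamp" sort-key trick in isolation.
public class SortKeyDemo {
    public static String sortedStatuses(List<String> encoded) {
        List<String> copy = new ArrayList<>(encoded);
        // Sort by the numeric timestamp after the '#'
        copy.sort((a, b) -> Long.compare(
                Long.parseLong(a.split("#")[1]),
                Long.parseLong(b.split("#")[1])));
        // Keep only the status before the '#'
        List<String> statuses = new ArrayList<>();
        for (String s : copy) {
            statuses.add(s.split("#")[0]);
        }
        return String.join(",", statuses);
    }

    public static void main(String[] args) {
        // User 1222's rows, arriving out of order (timestamps illustrative)
        List<String> rows = Arrays.asList(
                "LD#1688123640",  // 11:14
                "TD#1688123460",  // 11:11
                "TD#1688123700",  // 11:15
                "TD#1688123940"); // 11:19
        System.out.println(sortedStatuses(rows)); // TD,LD,TD,TD
    }
}
```

The same logic runs inside getValue() in the UDAF implemented in Step 2.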

Sample data

The source table electric_info stores power grid terminal events with four fields: event_id, user_id, event_time, and status. The goal is to aggregate all status values per user, sorted in ascending order by event_time.

Source table: `electric_info`

event_id  user_id  event_time           status
1         1222     2023-06-30 11:14:00  LD
2         1333     2023-06-30 11:12:00  LD
3         1222     2023-06-30 11:11:00  TD
4         1333     2023-06-30 11:12:00  LD
5         1222     2023-06-30 11:15:00  TD
6         1333     2023-06-30 11:18:00  LD
7         1222     2023-06-30 11:19:00  TD
8         1333     2023-06-30 11:10:00  TD
9         1555     2023-06-30 11:16:00  TD
10        1555     2023-06-30 11:17:00  LD

Expected result

user_id  status
1222     TD,LD,TD,TD
1333     TD,LD,LD,LD
1555     TD,LD

Prerequisites

Before you begin, make sure that you have:

  • A Realtime Compute for Apache Flink workspace

  • An ApsaraDB RDS for MySQL instance in the same VPC as the Flink workspace. If they are in different VPCs, see Network connectivity.

Step 1: Prepare a data source

This example uses ApsaraDB RDS for MySQL as the data source.

  1. Create an ApsaraDB RDS for MySQL instance.

  2. Create a database and an account. Create a database named electric and an account with read and write permissions on the electric database. Either a privileged account or a standard account works.

  3. Log on to the ApsaraDB RDS for MySQL instance using DMS. In the electric database, run the following SQL to create the two tables and insert the sample data:

    CREATE TABLE `electric_info` (
      event_id bigint NOT NULL PRIMARY KEY COMMENT 'Event ID',
      user_id bigint NOT NULL COMMENT 'User ID',
      event_time timestamp NOT NULL COMMENT 'Event time',
      status varchar(10) NOT NULL COMMENT 'User terminal status'
    );
    
    CREATE TABLE `electric_info_SortListAgg` (
      user_id bigint NOT NULL PRIMARY KEY COMMENT 'User ID',
      status_sort varchar(50) NULL COMMENT 'User terminal status sorted in ascending order by event time'
    );
    
    -- Insert sample data
    INSERT INTO electric_info VALUES
    (1,  1222, '2023-06-30 11:14:00', 'LD'),
    (2,  1333, '2023-06-30 11:12:00', 'LD'),
    (3,  1222, '2023-06-30 11:11:00', 'TD'),
    (4,  1333, '2023-06-30 11:12:00', 'LD'),
    (5,  1222, '2023-06-30 11:15:00', 'TD'),
    (6,  1333, '2023-06-30 11:18:00', 'LD'),
    (7,  1222, '2023-06-30 11:19:00', 'TD'),
    (8,  1333, '2023-06-30 11:10:00', 'TD'),
    (9,  1555, '2023-06-30 11:16:00', 'TD'),
    (10, 1555, '2023-06-30 11:17:00', 'LD');

Step 2: Register a UDF

  1. Download ASI_UDX-1.0-SNAPSHOT.jar. The pom.xml file in the JAR is configured with the minimum dependencies for Flink 1.17.1. For general information about user-defined functions (UDFs), see User-defined functions.

  2. Review and optionally modify the UDAF source code below. The SortListAgg class concatenates each row's status and Unix timestamp with a # separator, accumulates them into a list, sorts by the timestamp portion, and returns the sorted status values as a comma-separated string.

    The retract() method is left empty in this example. If your Flink job uses group aggregations or outer joins that can emit retraction messages, implement retract() to remove previously accumulated values from acc.list.
    package ASI_UDAF;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    public class ASI_UDAF {
    
        // Accumulator: stores intermediate "status#unixTimestamp" strings
        public static class AcList {
            public List<String> list;
        }
    
        public static class SortListAgg extends AggregateFunction<String, AcList> {
    
            // Returns the final aggregated result
            public String getValue(AcList acc) {
                // Sort entries by the numeric Unix timestamp (the part after "#").
                // Compare as long values to avoid int overflow and the year-2038 limit.
                acc.list.sort(new Comparator<String>() {
                    @Override
                    public int compare(String o1, String o2) {
                        return Long.compare(Long.parseLong(o1.split("#")[1]),
                                Long.parseLong(o2.split("#")[1]));
                    }
                });
                // Extract the status part (first part before "#") and join with commas
                List<String> ret = new ArrayList<String>();
                Iterator<String> statusIterator = acc.list.iterator();
                while (statusIterator.hasNext()) {
                    ret.add(statusIterator.next().split("#")[0]);
                }
                return StringUtils.join(ret, ',');
            }
    
            // Creates an empty accumulator for a new group
            public AcList createAccumulator() {
                AcList ac = new AcList();
                ac.list = new ArrayList<String>();
                return ac;
            }
    
            // Adds one input row's "status#unixTimestamp" string to the accumulator
            public void accumulate(AcList acc, String value) {
                acc.list.add(value);
            }

            // Retraction method — implement this if your query can produce retraction messages
            public void retract(AcList acc, String value) {
            }
        }
    }
  3. Open the UDF registration page. For Java UDFs, you can upload the JAR directly through the console instead of registering it manually. For details, see User-defined aggregate functions (UDAFs).

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the Actions column of the target workspace, click Console.

    3. Click Data Development > ETL.

    4. On the Functions tab on the left, click Register UDF.

  4. In the Select File section, upload the downloaded JAR file, then click OK.

    Note
    • The JAR file is uploaded to the sql-artifacts directory of your OSS bucket. The Flink development console parses the JAR to detect classes that implement the UDF, UDAF, or user-defined table-valued function (UDTF) interfaces, then automatically populates the Function Name field.

  5. In the Manage Functions dialog box, click Create Function. The registered function appears in the Functions list on the left side of the SQL editor.

Step 3: Create a Flink job

  1. On the Data Development > ETL page, click New.


  2. Click Blank Stream Draft, then click Next.

  3. In the New Job Draft dialog box, configure the following parameters:

    Parameter         Description
    File Name         A unique name for the job within the current project.
    Storage Location  The folder where the job is stored. Click the icon next to an existing folder to create a subfolder.
    Engine Version    The Flink engine version. This must match the version specified in the pom.xml file. For version details, see Engine versions.
  4. Write the DDL and DML code. Replace the placeholder values in the connector configuration with your actual RDS endpoint and credentials. The INSERT statement uses CONCAT(status, '#', CAST(UNIX_TIMESTAMP(event_time) AS STRING)) to pass a composite sort key to ASI_UDAF$SortListAgg so that the UDAF can sort by event_time internally.

    Placeholder                  Description                                                   Example
    <your-rds-private-endpoint>  The private endpoint of the ApsaraDB RDS for MySQL instance  rm-bp1s1xgll21******.mysql.rds.aliyuncs.com
    <your-username>              The username for the MySQL database                          electric_user
    -- Source table: reads from the electric_info table in ApsaraDB RDS for MySQL
    CREATE TEMPORARY TABLE electric_info (
      event_id   bigint NOT NULL,
      `user_id`  bigint NOT NULL,
      event_time timestamp(6) NOT NULL,
      status     string NOT NULL,
      PRIMARY KEY (event_id) NOT ENFORCED
    ) WITH (
      'connector'     = 'mysql',
      'hostname'      = '<your-rds-private-endpoint>',
      'port'          = '3306',
      'username'      = '<your-username>',
      'password'      = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name'    = 'electric_info'
    );
    
    -- Sink table: writes aggregated results to electric_info_sortlistagg
    CREATE TEMPORARY TABLE electric_info_sortlistagg (
      `user_id`   bigint NOT NULL,
      status_sort varchar(50) NOT NULL,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector'     = 'mysql',
      'hostname'      = '<your-rds-private-endpoint>',
      'port'          = '3306',
      'username'      = '<your-username>',
      'password'      = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name'    = 'electric_info_sortlistagg'
    );
    
    -- Aggregate: group by user_id, sort status values by event_time ascending
    INSERT INTO electric_info_sortlistagg
    SELECT
      `user_id`,
      `ASI_UDAF$SortListAgg`(CONCAT(status, '#', CAST(UNIX_TIMESTAMP(event_time) AS STRING)))
    FROM electric_info
    GROUP BY user_id;

    The password field uses a project variable (mysql_pw) to avoid hardcoding credentials. For details, see Project variables. For all available MySQL connector parameters, see MySQL connector.

  5. (Optional) In the upper-right corner, click Deep Check and Debug to validate the job before deploying. For details, see Job development map.

  6. Click Deploy, then click OK.

  7. On the Operation Center > Jobs page, find the job, click Start in the Actions column, and select Stateless Start.

Step 4: Query the result

In ApsaraDB RDS for MySQL, run the following query to verify the output:

SELECT * FROM `electric_info_sortlistagg`;

The status_sort column shows each user's terminal status values sorted in ascending order by event time.


What's next