This topic describes how to create, register, and use a user-defined aggregate function (UDAF) in Realtime Compute for Apache Flink.

Definition

A UDAF aggregates multiple input values into a single output value. In other words, a UDAF establishes a many-to-one mapping between its input and its output.
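The following plain-Java sketch illustrates this many-to-one contract. UdafLifecycleDemo, SumAggregate, and SumAccumulator are hypothetical names. Their method names mirror the ones that Flink's AggregateFunction uses (createAccumulator, accumulate, and getValue), but the sketch does not depend on Flink and does not show how the Flink runtime invokes these methods.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java illustration of the many-to-one contract of an aggregate function.
// It mirrors the method names used by Flink's AggregateFunction but does not depend on Flink.
public class UdafLifecycleDemo {

    // Intermediate state that is shared by all input rows of one group.
    static final class SumAccumulator {
        long sum;
    }

    // Hypothetical aggregate that sums long values.
    static final class SumAggregate {
        SumAccumulator createAccumulator() {
            return new SumAccumulator(); // sum starts at 0
        }

        void accumulate(SumAccumulator acc, long value) {
            acc.sum += value;
        }

        Long getValue(SumAccumulator acc) {
            return acc.sum;
        }
    }

    // Feeds every input into one accumulator and returns a single output value.
    static long aggregate(List<Long> inputs) {
        SumAggregate fn = new SumAggregate();
        SumAccumulator acc = fn.createAccumulator();
        for (long value : inputs) {
            fn.accumulate(acc, value);
        }
        return fn.getValue(acc);
    }

    public static void main(String[] args) {
        // Three inputs, one output.
        System.out.println(aggregate(Arrays.asList(1L, 2L, 3L))); // prints 6
    }
}
```

The accumulator is kept separate from the function object so that the same function can maintain independent state for many groups, which is also how the sample MySum class is structured.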

Create a UDAF

Note Realtime Compute for Apache Flink provides examples of user-defined extensions (UDXs) to facilitate your business development. The examples show how to implement user-defined scalar functions (UDFs), user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs). Each example comes with a development environment that is configured for the corresponding Flink version.
  1. Download and decompress ASI_UDX_Demo to your local machine.
    After you decompress the package, the ASI_UDX-main folder is generated. The folder contains the following files:
    • pom.xml: the project-level Maven configuration file, which declares the Maven coordinates, dependencies, build settings, and other information about the project.
    • \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java: the Java code for the sample UDAF.
  2. Open IntelliJ IDEA and choose File > Open. Select the extracted ASI_UDX-main folder and click OK.
  3. Double-click the pom.xml file in the \ASI_UDX-main\ directory and configure the parameters in the file based on your business requirements.
    In this example, the minimum dependency information that is required to develop UDFs in Flink 1.12 is configured in the pom.xml file. Perform one of the following operations based on your business requirements:
    • If your code has no additional dependencies, skip to the next step. You do not need to modify the pom.xml file.
    • If your code has additional dependencies, add them to the pom.xml file.
    The following example shows the minimum dependency of Flink 1.12.
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.12.7</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    Note We recommend that you enter the latest minor version for the major version of Apache Flink that corresponds to the Ververica Runtime (VVR) version of your job. For more information about the version mappings between VVR and Apache Flink, see Version mappings between Ververica Runtime (VVR) and Apache Flink.
  4. Double-click the ASI_UDAF.java file in the \ASI_UDX-main\src\main\java\ASI_UDAF directory and modify the code based on your business requirements.
    In this example, the ASI_UDAF.java file implements a cumulative sum.
    package ASI_UDAF;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.Iterator;
    
    public class ASI_UDAF {
        public static class AccSum {
            public long sum;
        }
    
        public static class MySum extends AggregateFunction<Long, AccSum> {
    
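            /** Returns the current result that is computed from the accumulator. */
            @Override
            public Long getValue(AccSum acSum) {
                return acSum.sum;
            }
    
            /** Creates an empty accumulator with the running sum initialized to 0. */
            @Override
            public AccSum createAccumulator() {
                AccSum acCount = new AccSum();
                acCount.sum = 0;
                return acCount;
            }
    
            /** Called for each input row: adds the input value to the running sum. */
            public void accumulate(AccSum acc, long num) {
                acc.sum += num;
            }
    
            /**
             * Retracts a previously accumulated value. Required when the upstream
             * operator sends retraction messages.
             */
            public void retract(AccSum acc, long num) {
                acc.sum -= num;
            }
    
            /**
             * Merges the given accumulators into acc. Required for the
             * local-global (two-phase) aggregation optimization.
             */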
            public void merge(AccSum acc, Iterable<AccSum> it) {
                Iterator<AccSum> iter = it.iterator();
                while (iter.hasNext()) {
                    AccSum accSum = iter.next();
                    if (null != accSum) {
                        acc.sum += accSum.sum;
                    }
                }
            }
        }
    }
    In this example, the UDAF computes a cumulative sum. For example, if the values 1, 2, and 3 arrive in sequence for the same group, the results that the UDAF emits depend on whether miniBatch is enabled.
    • If miniBatch is disabled, which is the default setting, the UDAF emits one result per input value: 1, 3, and 6.
    • If miniBatch is enabled, the final result is still 6, but the number of intermediate results is not deterministic. It depends on the miniBatch settings and on how the input data is distributed across batches.
    Note For more information about miniBatch, see Optimize Flink SQL.
  5. Go to the directory where the pom.xml file is stored and run the following command to build the project:
    mvn package -Dcheckstyle.skip

    If the ASI_UDX-1.0-SNAPSHOT.jar package is generated in the \ASI_UDX-main\target\ directory, the UDAF is built successfully.
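To make the miniBatch behavior in step 4 concrete, the following plain-Java sketch replays the inputs 1, 2, and 3 against the same accumulate and merge logic as MySum. It is a simulation under simplifying assumptions, not Flink's runtime: MiniBatchDemo, perRecord, perBatch, and localGlobal are hypothetical, and the batch boundaries and partitions are chosen by hand, whereas Flink determines them based on the miniBatch settings and the data distribution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (not Flink's runtime) of how emission timing changes the
// results of a sum aggregate. AccSum, accumulate, and merge copy the logic of the
// sample MySum class; all other names are hypothetical.
public class MiniBatchDemo {

    static final class AccSum {
        long sum;
    }

    static void accumulate(AccSum acc, long num) {
        acc.sum += num;
    }

    static void merge(AccSum acc, Iterable<AccSum> others) {
        for (AccSum other : others) {
            if (other != null) {
                acc.sum += other.sum;
            }
        }
    }

    // miniBatch disabled: every input record updates the state and emits a result.
    static List<Long> perRecord(List<Long> inputs) {
        AccSum acc = new AccSum();
        List<Long> out = new ArrayList<>();
        for (long v : inputs) {
            accumulate(acc, v);
            out.add(acc.sum);
        }
        return out;
    }

    // miniBatch enabled: records are buffered and one result is emitted per batch.
    static List<Long> perBatch(List<List<Long>> batches) {
        AccSum acc = new AccSum();
        List<Long> out = new ArrayList<>();
        for (List<Long> batch : batches) {
            for (long v : batch) {
                accumulate(acc, v);
            }
            out.add(acc.sum);
        }
        return out;
    }

    // Local-global aggregation: each partition builds a local accumulator,
    // and the global phase combines the partial results with merge().
    static long localGlobal(List<List<Long>> partitions) {
        List<AccSum> locals = new ArrayList<>();
        for (List<Long> part : partitions) {
            AccSum local = new AccSum();
            for (long v : part) {
                accumulate(local, v);
            }
            locals.add(local);
        }
        AccSum global = new AccSum();
        merge(global, locals);
        return global.sum;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L);
        System.out.println(perRecord(input));             // [1, 3, 6]
        System.out.println(perBatch(Arrays.asList(input))); // [6]
        System.out.println(perBatch(Arrays.asList(Arrays.asList(1L), Arrays.asList(2L, 3L)))); // [1, 6]
        System.out.println(localGlobal(Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(3L)))); // 6
    }
}
```

The final value is 6 in every mode; only the number of intermediate results changes, which is why the output of a miniBatch-enabled job cannot be predicted from the input alone.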

Use a UDAF

You can use a UDAF in an SQL job in one of the following ways:
  • Method 1: Register a UDAF and use the registered UDAF in your job.
    A registered UDAF can be reused in subsequent jobs. For more information about how to register a UDAF, see Manage UDFs. If the registered UDAF is named ASI_UDAF$MySum, use the following sample code in your job:
    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a BIGINT NOT NULL
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      `sum`  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `ASI_UDAF$MySum`(a)
    FROM ASI_UDAF_Source;
  • Method 2: On the Advanced tab of the Draft Editor page, upload the JAR package of the UDAF in Additional Dependencies. Then, add a CREATE TEMPORARY FUNCTION statement to the SQL code of the job and call the UDAF through the temporary function.
    A JAR package that is uploaded in Additional Dependencies is available only to the current job and cannot be used by other jobs. If the temporary function is named mysum, use the following sample code in your job:
    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      `sum`  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; --Create a temporary function named mysum. 
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `mysum`(a)
    FROM ASI_UDAF_Source;
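The class name 'ASI_UDAF.ASI_UDAF$MySum' in Method 2 is the Java binary name of the nested MySum class: the package name, then the outer class name, then the nested class name, with a `$` between the outer and nested class. The following plain-Java sketch shows the difference between the binary name, which Class.forName accepts, and the dotted canonical name, which it does not accept for a nested class. NestedClassNameDemo and canLoad are hypothetical, and the sketch assumes, as the sample statement suggests, that the class string in CREATE TEMPORARY FUNCTION is resolved as a binary name.

```java
// Hypothetical stand-alone sketch (no package, no Flink dependency) that shows how Java
// names a static nested class. In the sample, the same rule turns package ASI_UDAF,
// outer class ASI_UDAF, and nested class MySum into ASI_UDAF.ASI_UDAF$MySum.
public class NestedClassNameDemo {

    // Stand-in for the nested MySum class in ASI_UDAF.java.
    public static class MySum {
    }

    // Returns true if the current class loader can find a class by this name.
    static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Binary name: outer and nested class are joined by '$'.
        System.out.println(MySum.class.getName());          // NestedClassNameDemo$MySum
        // Canonical name: joined by '.', as in Java source code.
        System.out.println(MySum.class.getCanonicalName()); // NestedClassNameDemo.MySum
        System.out.println(canLoad(MySum.class.getName()));          // true
        System.out.println(canLoad(MySum.class.getCanonicalName())); // false
    }
}
```

If you move MySum to a top-level class in its own file, its binary name no longer contains `$`, and the class string in CREATE TEMPORARY FUNCTION changes accordingly.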
Note After you finish developing the SQL job, go to the Deployments page, find the SQL job, and then click Start in the Actions column. After the job starts, the running sum of the a field in the ASI_UDAF_Source table is written to the ASI_UDAF_Sink table.
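Because the sample query aggregates the entire stream without GROUP BY or a window, the sum changes every time a row arrives, so ASI_UDAF_Sink receives a stream of updates rather than one final value. The following plain-Java sketch simulates that update stream under simplifying assumptions: miniBatch is disabled, the sink has no primary key, and every input changes the sum. The +I, -U, and +U labels follow the short strings of Flink's RowKind (insert, update-before, and update-after). ChangelogDemo is hypothetical, and the exact rows that Flink emits, as well as the print format, can differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java simulation of the changelog that a non-windowed,
// non-grouped sum aggregation sends to a sink without a primary key.
public class ChangelogDemo {

    static List<String> changelog(List<Long> inputs) {
        List<String> out = new ArrayList<>();
        long sum = 0;
        boolean emitted = false;
        for (long v : inputs) {
            long previous = sum;
            sum += v;
            if (!emitted) {
                out.add("+I[" + sum + "]");      // first result: insert
                emitted = true;
            } else {
                out.add("-U[" + previous + "]"); // retract the previous result
                out.add("+U[" + sum + "]");      // emit the updated result
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(changelog(Arrays.asList(1L, 2L, 3L)));
        // [+I[1], -U[1], +U[3], -U[3], +U[6]]
    }
}
```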