Realtime Compute for Apache Flink:UDAFs

Last Updated:Jan 23, 2024

This topic describes how to create, register, and use a user-defined aggregate function (UDAF) in Realtime Compute for Apache Flink.

Definition

A UDAF aggregates multiple input values into a single output value, so the mapping between the input and output of a UDAF is many-to-one. For more information, see User-defined Functions.

Note

User-defined Functions and ASI_UDX_Demo are hosted on third-party websites. Access to these websites may fail or be slow.
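
The many-to-one mapping described above can be illustrated with a minimal plain-Java sketch. It does not use the Flink API; the class ManyToOneSketch, the accumulator SumAcc, and the method aggregate are hypothetical names chosen for illustration only. The sketch folds several input values into one piece of intermediate state and returns a single result.

```java
import java.util.Arrays;
import java.util.List;

public class ManyToOneSketch {
    // Hypothetical accumulator: the intermediate state kept between input values.
    static class SumAcc {
        long sum = 0;
    }

    // Folds every input value into one accumulator and returns a single output value.
    static long aggregate(List<Long> inputs) {
        SumAcc acc = new SumAcc();
        for (long value : inputs) {
            acc.sum += value;
        }
        return acc.sum;
    }

    public static void main(String[] args) {
        // Three input values produce one output value.
        System.out.println(aggregate(Arrays.asList(1L, 2L, 3L))); // prints 6
    }
}
```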

Create a UDAF

Note Realtime Compute for Apache Flink provides examples of user-defined functions (UDFs) to facilitate your business development. The examples show how to implement user-defined scalar functions (UDSFs), user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs). Each example includes a development environment that is configured for the related version.
  1. Download and decompress ASI_UDX_Demo to your on-premises machine.

    After you decompress the package, the ASI_UDX-main folder is generated. The folder contains the following items:

    • pom.xml: the project-level Maven configuration file. It declares the Maven coordinates and dependencies of the project, as well as other project-related information such as the rules that developers must follow, the defect management system, organizations, and licenses.

    • \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java: the Java code for the sample UDAF.

  2. Open IntelliJ IDEA and choose File > Open. Select the decompressed ASI_UDX-main folder and click OK.

  3. Double-click the pom.xml file in the \ASI_UDX-main\ directory and configure information in the file based on your business requirements.

    In this example, the minimum dependency information that is required to develop UDFs in Flink 1.12 is configured in the pom.xml file. Perform one of the following operations based on your business requirements:

    • If your code does not require additional dependencies, proceed to the next step without modifying the pom.xml file.

    • If your code requires additional dependencies, add them to the pom.xml file.

    The following example shows the minimum dependency of Flink 1.12:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.12.7</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    Note

    We recommend that you enter the latest minor version for the major version of Apache Flink that corresponds to the Ververica Runtime (VVR) version of your deployment. For more information about the version mappings between VVR and Apache Flink, see Overview.

  4. Double-click the ASI_UDAF.java file in the \ASI_UDX-main\src\main\java\ASI_UDAF directory, and write code in the file based on your business requirements.

    This example shows the code for cumulative summation in the ASI_UDAF.java file.

    package ASI_UDAF;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.Iterator;
    
    public class ASI_UDAF{
        public static class AccSum{
            public long sum;
        }
    
        public static class MySum extends AggregateFunction<Long, AccSum>{
    
            @Override
            public Long getValue(AccSum acSum){
                return acSum.sum;
            }
    
            @Override
            public AccSum createAccumulator(){
                AccSum acCount= new AccSum();
                acCount.sum=0;
                return acCount;
            }
    
            public void accumulate(AccSum acc,long num){
                acc.sum += num;
            }
    
            /**
             * Supports retracting a message that was generated by an upstream operator.
             */
            public void retract(AccSum acc,long num){
                acc.sum -= num;
            }
    
            /**
             * Supports the local-global two-stage aggregation optimization.
             */
            public void merge(AccSum acc,Iterable<AccSum> it){
                Iterator<AccSum> iter=it.iterator();
                while(iter.hasNext()){
                    AccSum accSum=iter.next();
                    if(null!=accSum){
                        acc.sum+=accSum.sum;
                    }
                }
            }
        }
    }

    In this example, the UDAF computes a cumulative sum. For example, if a group that is defined by a GROUP BY clause receives the three values 1, 2, and 3, the returned results vary based on whether miniBatch is enabled.

    • If miniBatch is not enabled, the return values are 1, 3, and 6. By default, miniBatch is not enabled.

    • If miniBatch is enabled, the final return value is 6, but the number of intermediate return values cannot be determined because it varies based on the miniBatch settings and the distribution of input data.

    Note

    For more information about miniBatch, see Optimize Flink SQL.
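
The two behaviors described above can be reproduced with a plain-Java simulation. This is only a sketch, not how the Flink runtime is implemented: it does not depend on Flink, the class MySumSimulation and the methods emitPerRecord and emitAfterMerge are hypothetical, and the split of the input into the chunks [1, 2] and [3] is an assumed example of how data might be distributed. Only accumulate and merge mirror the logic of the sample UDAF.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MySumSimulation {
    // Mirrors the AccSum accumulator of the sample UDAF.
    static class AccSum {
        long sum = 0;
    }

    // Same logic as MySum.accumulate in the sample.
    static void accumulate(AccSum acc, long num) {
        acc.sum += num;
    }

    // Same logic as MySum.merge in the sample.
    static void merge(AccSum acc, Iterable<AccSum> others) {
        for (AccSum other : others) {
            if (other != null) {
                acc.sum += other.sum;
            }
        }
    }

    // Hypothetical: emits a result after every input value, as when miniBatch is not enabled.
    static List<Long> emitPerRecord(List<Long> inputs) {
        AccSum acc = new AccSum();
        List<Long> results = new ArrayList<>();
        for (long value : inputs) {
            accumulate(acc, value);
            results.add(acc.sum);
        }
        return results;
    }

    // Hypothetical: pre-aggregates each chunk locally, then merges the partial
    // accumulators into a global one and returns only the final value.
    static long emitAfterMerge(List<List<Long>> chunks) {
        List<AccSum> partials = new ArrayList<>();
        for (List<Long> chunk : chunks) {
            AccSum local = new AccSum();
            for (long value : chunk) {
                accumulate(local, value);
            }
            partials.add(local);
        }
        AccSum global = new AccSum();
        merge(global, partials);
        return global.sum;
    }

    public static void main(String[] args) {
        System.out.println(emitPerRecord(Arrays.asList(1L, 2L, 3L))); // prints [1, 3, 6]
        System.out.println(emitAfterMerge(
                Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(3L)))); // prints 6
    }
}
```

Whatever the chunking, the merged result is the same final sum. What changes is how many intermediate values are emitted along the way.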

  5. Go to the directory where the pom.xml file is stored. Then, run the following command to package the project:

    mvn package -Dcheckstyle.skip

    If the ASI_UDX-1.0-SNAPSHOT.jar package appears in the \ASI_UDX-main\target\ directory, the UDAF is built.

Use a UDAF

You can use one of the following methods to use a UDAF in SQL deployments:

  • Method 1: Register a UDAF and use the registered UDAF in your deployment.

    This method allows you to reuse the code in subsequent deployments. For more information about how to register a UDAF, see Manage UDFs. If the registered UDAF is named ASI_UDAF$MySum, use the following sample code in your deployment:

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a BIGINT NOT NULL
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `ASI_UDAF$MySum`(a)
    FROM ASI_UDAF_Source;
  • Method 2: Upload the JAR package of the UDAF in the Additional Dependencies field on the Advanced tab of the Draft Editor page. Then, add a statement that is used to create a temporary function to the SQL statements of your deployment and use the function.

    A JAR package that you upload in the Additional Dependencies field of a deployment is available only in that deployment. You cannot use the JAR package in other deployments. If the temporary function that you create is named mysum, the following code shows how to use the function in a deployment:

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; --Create a temporary function named mysum. 
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `mysum`(a)
    FROM ASI_UDAF_Source;
Note

After you finish developing the SQL deployment, go to the Deployments page, find the SQL deployment, and then click Start in the Actions column. After the deployment is started, the sum of the values in the a field of the ASI_UDAF_Source table is inserted into the ASI_UDAF_Sink table.
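
The class path ASI_UDAF.ASI_UDAF$MySum in the CREATE TEMPORARY FUNCTION statement of Method 2 matches the form of a Java binary name for a static nested class, in which the outer and nested class names are joined by a dollar sign ($). The following minimal sketch shows this naming with a hypothetical class BinaryNameSketch, which stands in for the sample's outer class and is assumed to be in the default package.

```java
public class BinaryNameSketch {
    // Hypothetical nested class that stands in for the MySum class of the sample.
    public static class MySum {
    }

    public static void main(String[] args) {
        // getName() returns the binary name, which joins the outer and nested class with '$'.
        System.out.println(MySum.class.getName());          // prints BinaryNameSketch$MySum
        // getCanonicalName() returns the source-level name, which uses '.' instead.
        System.out.println(MySum.class.getCanonicalName()); // prints BinaryNameSketch.MySum
    }
}
```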