
Realtime Compute for Apache Flink:Python UDAFs

Last Updated: Mar 26, 2026

A user-defined aggregate function (UDAF) aggregates multiple input values into a single output value, mapping many input rows to one result per group.

This topic explains how to create, register, and use a Python UDAF in Realtime Compute for Apache Flink.

Limitations

  • Apache Flink 1.12 or later is required.

  • A Python environment is pre-installed in the Realtime Compute for Apache Flink workspace. Write your code against the pre-installed Python version.

    Python 3.7.9 is pre-installed for Ververica Runtime (VVR) versions earlier than 8.0.11. Python 3.9.21 is pre-installed for VVR 8.0.11 or later. After upgrading to VVR 8.0.11 or later, re-test, re-deploy, and re-run any PyFlink jobs built on an earlier VVR version.
  • JDK 8 and JDK 11 are supported in the runtime environment. If your Python deployment depends on a third-party JAR file, make sure the JAR file is compatible with JDK 8 or JDK 11.

  • Only open source Scala 2.11 is supported. If your Python deployment depends on a third-party JAR file, make sure the JAR file is compatible with Scala 2.11.

How it works

A UDAF uses an accumulator to track intermediate aggregation state across input rows. The accumulator is created once per group and updated row by row until all inputs are processed.

The execution order for each group:

  1. create_accumulator() — creates an empty accumulator to hold the initial state.

  2. accumulate(accumulator, ...) — called once per input row to update the accumulator.

  3. get_value(accumulator) — called after all rows in the group are processed to return the final result.
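The call sequence above can be simulated in plain Python without a Flink runtime. The sketch below uses a hypothetical Avg aggregate (illustrative only; in a real job the Flink runtime invokes these methods for each group):

```python
# Sketch: simulate the per-group call order of a UDAF in plain Python.
# The Avg class and the driver loop are illustrative; Flink performs
# this sequence internally for every group.

class Avg:
    def create_accumulator(self):
        # [sum, count] as a mutable accumulator
        return [0, 0]

    def accumulate(self, acc, value):
        acc[0] += value
        acc[1] += 1

    def get_value(self, acc):
        return acc[0] / acc[1] if acc[1] else 0.0


rows = [4, 8, 6]                 # all input rows of one group
agg = Avg()
acc = agg.create_accumulator()   # 1. once per group
for v in rows:
    agg.accumulate(acc, v)       # 2. once per input row
result = agg.get_value(acc)      # 3. once, after all rows
print(result)                    # 6.0
```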

Methods reference

Method                  Required      When to implement
create_accumulator()    Yes           Always
accumulate(...)         Yes           Always
get_value(...)          Yes           Always
retract(...)            Conditional   When the aggregation may receive retraction messages.
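A retract method must be the exact inverse of accumulate so that a withdrawn input row can be removed from the accumulator state. The following minimal sketch illustrates this with a hypothetical WeightedSum aggregate (not part of the Flink sample code):

```python
# Sketch: retract(...) undoes exactly what accumulate(...) did, so the
# accumulator can process retraction messages from an updating upstream.
# WeightedSum is a hypothetical example for illustration.

class WeightedSum:
    def create_accumulator(self):
        return [0]

    def accumulate(self, acc, value, weight):
        acc[0] += value * weight

    def retract(self, acc, value, weight):
        acc[0] -= value * weight


agg = WeightedSum()
acc = agg.create_accumulator()
agg.accumulate(acc, 10, 2)   # upstream emits (10, weight=2)    -> acc == [20]
agg.accumulate(acc, 5, 4)    # upstream emits (5,  weight=4)    -> acc == [40]
agg.retract(acc, 10, 2)      # upstream withdraws (10, weight=2) -> acc == [20]
print(acc[0])                # 20
```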

Create a UDAF

Flink provides sample code for user-defined extensions (UDXs), including UDAFs, user-defined functions (UDFs), and user-defined table functions (UDTFs). The steps below use a Windows environment.
  1. Download and decompress the python_demo-master package to your machine.

  2. In PyCharm, choose File > Open and open the decompressed python_demo-master directory.

  3. Open \python_demo-master\udx\udafs.py and edit it to fit your business logic. The example below defines weighted_avg, which computes a weighted average over current and historical data.

    from pyflink.common import Row
    from pyflink.table import AggregateFunction, DataTypes
    from pyflink.table.udf import udaf
    
    
    class WeightedAvg(AggregateFunction):
    
        def create_accumulator(self):
            # Row(sum, count)
            return Row(0, 0)
    
        def get_value(self, accumulator: Row) -> float:
            if accumulator[1] == 0:
                # No weight accumulated yet; return 0.0 to match the DOUBLE result type.
                return 0.0
            else:
                return accumulator[0] / accumulator[1]
    
        def accumulate(self, accumulator: Row, value, weight):
            accumulator[0] += value * weight
            accumulator[1] += weight
    
        def retract(self, accumulator: Row, value, weight):
            accumulator[0] -= value * weight
            accumulator[1] -= weight
    
    
    weighted_avg = udaf(f=WeightedAvg(),
                        result_type=DataTypes.DOUBLE(),
                        accumulator_type=DataTypes.ROW([
                            DataTypes.FIELD("f0", DataTypes.BIGINT()),
                            DataTypes.FIELD("f1", DataTypes.BIGINT())]))
  4. From the \python_demo-master\ directory, package the udx folder:

    zip -r python_demo.zip udx

    The UDAF package is ready when python_demo.zip appears in the \python_demo-master\ directory.
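Before packaging, the aggregation logic can be sanity-checked as plain Python, since the AggregateFunction methods contain no Flink-specific calls. The sketch below mirrors the WeightedAvg methods but substitutes a list for pyflink's Row, so it runs without a Flink installation (a local-testing assumption only):

```python
# Sketch: Flink-free sanity check of the WeightedAvg logic. A plain list
# stands in for pyflink.common.Row so no Flink installation is needed.

class WeightedAvgLogic:
    def create_accumulator(self):
        return [0, 0]              # [weighted sum, total weight]

    def accumulate(self, acc, value, weight):
        acc[0] += value * weight
        acc[1] += weight

    def retract(self, acc, value, weight):
        acc[0] -= value * weight
        acc[1] -= weight

    def get_value(self, acc):
        return acc[0] / acc[1] if acc[1] else 0.0


agg = WeightedAvgLogic()
acc = agg.create_accumulator()
agg.accumulate(acc, 100, 1)        # value 100 with weight 1
agg.accumulate(acc, 200, 3)        # value 200 with weight 3
print(agg.get_value(acc))          # (100*1 + 200*3) / (1+3) = 175.0
```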

Register a UDAF

For registration steps, see Manage user-defined functions (UDFs).

Use a UDAF

  1. Create a Flink SQL draft. For details, see Job development overview. The following example computes the weighted average of field a in ASI_UDAF_Source, using field b as the weight.

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT,
      b   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      avg_value  DOUBLE
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT weighted_avg(a, b)
    FROM ASI_UDAF_Source;
  2. In the left-side navigation pane of the Realtime Compute for Apache Flink development console, choose O&M > Deployments. Find the target deployment and click Start in the Actions column. After the deployment starts, the weighted average of field a, weighted by field b, is written to ASI_UDAF_Sink.