A user-defined aggregate function (UDAF) aggregates the values of multiple input rows into a single output value, producing one result per group.
This topic explains how to create, register, and use a Python UDAF in Realtime Compute for Apache Flink.
Limitations
- Apache Flink 1.12 or later is required.
- Python is pre-installed on the Realtime Compute for Apache Flink workspace. Write your code for the pre-installed Python version.
  Python 3.7.9 is pre-installed for Ververica Runtime (VVR) versions earlier than 8.0.11. Python 3.9.21 is pre-installed for VVR 8.0.11 or later. After upgrading to VVR 8.0.11 or later, re-test, re-deploy, and re-run any PyFlink jobs built on an earlier VVR version.
- JDK 8 and JDK 11 are supported in the runtime environment. If your Python deployment depends on a third-party JAR file, make sure the JAR file is compatible with JDK 8 or JDK 11.
- Only open source Scala 2.11 is supported. If your Python deployment depends on a third-party JAR file, make sure the JAR file is compatible with Scala 2.11.
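Because the pre-installed Python version depends on the VVR version, it can help to check your local interpreter before you write or test code. The following is a minimal sketch: the helper names (`parse_vvr`, `expected_python`, `matches_local_python`) are hypothetical, and it assumes VVR versions are given as plain dotted numbers such as `8.0.11`.

```python
import platform

# Cutoff from the Limitations list: VVR 8.0.11 and later ship Python 3.9.21.
_VVR_PYTHON_CUTOFF = (8, 0, 11)


def parse_vvr(version: str) -> tuple:
    """Parse a plain dotted VVR version such as "8.0.11" into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def expected_python(vvr_version: str) -> str:
    """Return the Python version pre-installed for the given VVR version."""
    return "3.9.21" if parse_vvr(vvr_version) >= _VVR_PYTHON_CUTOFF else "3.7.9"


def matches_local_python(vvr_version: str) -> bool:
    """Check whether the local interpreter has the same major.minor version."""
    target = tuple(expected_python(vvr_version).split(".")[:2])
    # python_version_tuple() returns strings, e.g. ("3", "9", "21")
    return platform.python_version_tuple()[:2] == target


print(expected_python("8.0.10"))  # 3.7.9
print(expected_python("8.0.11"))  # 3.9.21
```

Comparing only the major and minor versions is a deliberate simplification: a 3.7 versus 3.9 mismatch is the difference most likely to break code, while patch-level differences are less likely to matter.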
How it works
A UDAF uses an accumulator to track intermediate aggregation state across input rows. The accumulator is created once per group and updated row by row until all inputs are processed.
The execution order for each group:
- `create_accumulator()`: creates an empty accumulator that holds the initial state.
- `accumulate(accumulator, ...)`: called once per input row to update the accumulator.
- `get_value(accumulator)`: called after all rows in the group are processed to return the final result.
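The call order above can be mimicked in plain Python to make the accumulator lifecycle concrete. This is a hypothetical sketch, not Flink's runtime: `SimpleAvg` and `aggregate_by_key` are illustrative names, a Python list stands in for the `Row` accumulator, and `get_value()` is called once per group at the end, as described above. In a streaming job, Flink may call `get_value()` repeatedly as new rows arrive.

```python
class SimpleAvg:
    """Hypothetical UDAF-shaped class that returns the plain average of its inputs."""

    def create_accumulator(self):
        return [0.0, 0]  # [sum, count], a mutable stand-in for pyflink's Row

    def accumulate(self, acc, value):
        acc[0] += value
        acc[1] += 1

    def get_value(self, acc):
        return acc[0] / acc[1] if acc[1] else None


def aggregate_by_key(agg, rows):
    """Apply the per-group call order to (key, value) pairs."""
    accumulators = {}
    for key, value in rows:
        if key not in accumulators:
            accumulators[key] = agg.create_accumulator()  # once per group
        agg.accumulate(accumulators[key], value)  # once per input row
    # once per group, after all of its rows are processed
    return {key: agg.get_value(acc) for key, acc in accumulators.items()}


print(aggregate_by_key(SimpleAvg(), [("a", 2.0), ("b", 10.0), ("a", 4.0)]))
# {'a': 3.0, 'b': 10.0}
```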
Methods reference
| Method | Required | When to implement |
|---|---|---|
| `create_accumulator()` | Yes | Always |
| `accumulate(...)` | Yes | Always |
| `get_value(...)` | Yes | Always |
| `retract(...)` | Conditional | When the aggregation may receive retraction messages |
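`retract()` must exactly undo what `accumulate()` did for the same input, so that a retraction message removes that row's contribution from the result. The following hypothetical sketch applies a simplified changelog in plain Python, where `"+"` stands for an added row and `"-"` for a retracted one; Flink's actual row kinds (+I, -U, +U, and -D) are richer than this. `AvgWithRetract` and `apply_changelog` are illustrative names.

```python
class AvgWithRetract:
    """Hypothetical UDAF-shaped class: a plain average that supports retraction."""

    def create_accumulator(self):
        return [0, 0]  # [sum, count]

    def accumulate(self, acc, value):
        acc[0] += value
        acc[1] += 1

    def retract(self, acc, value):
        # Exactly reverses accumulate() for the same value
        acc[0] -= value
        acc[1] -= 1

    def get_value(self, acc):
        return acc[0] / acc[1] if acc[1] else None


def apply_changelog(agg, messages):
    """Feed (kind, value) messages to one accumulator; "+" adds a row, "-" retracts it."""
    acc = agg.create_accumulator()
    for kind, value in messages:
        if kind == "+":
            agg.accumulate(acc, value)
        elif kind == "-":
            agg.retract(acc, value)
        else:
            raise ValueError(f"unknown message kind: {kind!r}")
    return agg.get_value(acc)


# Three rows arrive, then the row with value 60 is retracted.
print(apply_changelog(AvgWithRetract(), [("+", 10), ("+", 20), ("+", 60), ("-", 60)]))  # 15.0
```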
Create a UDAF
Realtime Compute for Apache Flink provides sample code for user-defined extensions (UDXs), including UDAFs, user-defined functions (UDFs), and user-defined table functions (UDTFs). The following steps use a Windows environment as an example.
- Download and decompress the python_demo-master package to your machine.
- In PyCharm, choose File > Open and open the decompressed `python_demo-master` directory.
- Open `\python_demo-master\udx\udfs.py` and edit it to fit your business logic. The following example defines `weighted_avg`, which computes a weighted average over current and historical data.

  ```python
  from pyflink.common import Row
  from pyflink.table import AggregateFunction, DataTypes
  from pyflink.table.udf import udaf


  class WeightedAvg(AggregateFunction):

      def create_accumulator(self):
          # Row(sum of value * weight, sum of weight)
          return Row(0, 0)

      def get_value(self, accumulator: Row) -> float:
          # Avoid division by zero before any weight has been accumulated
          if accumulator[1] == 0:
              return 0.0
          return accumulator[0] / accumulator[1]

      def accumulate(self, accumulator: Row, value, weight):
          accumulator[0] += value * weight
          accumulator[1] += weight

      def retract(self, accumulator: Row, value, weight):
          # Undo the contribution of a previously accumulated row
          accumulator[0] -= value * weight
          accumulator[1] -= weight


  weighted_avg = udaf(f=WeightedAvg(),
                      result_type=DataTypes.DOUBLE(),
                      accumulator_type=DataTypes.ROW([
                          DataTypes.FIELD("f0", DataTypes.BIGINT()),
                          DataTypes.FIELD("f1", DataTypes.BIGINT())]))
  ```

- From the `\python_demo-master\` directory, package the `udx` folder:

  ```
  zip -r python_demo.zip udx
  ```

  The UDAF is created when `python_demo.zip` appears in the `\python_demo-master\` directory.
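The `zip` command is not included with Windows by default, so as an alternative you could create the same archive with Python's standard library. This is a minimal sketch: `package_udx` is a hypothetical helper, and `shutil.make_archive` with `root_dir` and `base_dir` is intended to reproduce the layout of `zip -r python_demo.zip udx`, with every entry starting with `udx/`.

```python
import shutil
from pathlib import Path


def package_udx(project_dir: str, archive_name: str = "python_demo") -> Path:
    """Create <archive_name>.zip in project_dir containing the udx folder.

    Intended to mirror running `zip -r python_demo.zip udx` from project_dir.
    """
    root = Path(project_dir)
    if not (root / "udx").is_dir():
        raise FileNotFoundError(f"no udx folder under {root}")
    archive = shutil.make_archive(
        base_name=str(root / archive_name),  # output path without the ".zip" suffix
        format="zip",
        root_dir=str(root),  # entry names are relative to this directory...
        base_dir="udx",      # ...so they all start with "udx/"
    )
    return Path(archive)
```

For example, `package_udx(r"C:\work\python_demo-master")` would write `python_demo.zip` next to the `udx` folder; the path here is a placeholder.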
Register a UDAF
For registration steps, see Manage user-defined functions (UDFs).
Use a UDAF
- Create a Flink SQL draft. For details, see Job development overview. The following example computes the weighted average of field `a` in `ASI_UDAF_Source`, using field `b` as the weight.

  ```sql
  CREATE TEMPORARY TABLE ASI_UDAF_Source (
    a BIGINT,
    b BIGINT
  ) WITH (
    'connector' = 'datagen',
    -- Bound the random values so that a * b stays within the BIGINT range
    'fields.a.min' = '1',
    'fields.a.max' = '100',
    'fields.b.min' = '1',
    'fields.b.max' = '10'
  );

  CREATE TEMPORARY TABLE ASI_UDAF_Sink (
    avg_value DOUBLE
  ) WITH (
    'connector' = 'blackhole'
  );

  INSERT INTO ASI_UDAF_Sink
  SELECT weighted_avg(a, b)
  FROM ASI_UDAF_Source;
  ```

- Deploy the draft. Then, in the left-side navigation pane of the Realtime Compute for Apache Flink development console, choose O&M > Deployments, find the target deployment, and click Start in the Actions column. After the deployment starts, `ASI_UDAF_Sink` receives the weighted average of field `a`, with field `b` as the weight, updated as new rows arrive.
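Before deploying, you might sanity-check what the query should produce by running the same accumulator logic locally. This sketch is a dependency-free copy of the `WeightedAvg` logic that uses a list instead of `Row`, so it runs without PyFlink; `LocalWeightedAvg` and `running_weighted_avg` are hypothetical names. It emits one result per input row, which approximates how a non-windowed aggregate updates its result; the actual job may emit updates at a different granularity (for example, with mini-batch enabled).

```python
class LocalWeightedAvg:
    """Dependency-free copy of the WeightedAvg accumulator logic (list instead of Row)."""

    def create_accumulator(self):
        return [0, 0]  # [sum of value * weight, sum of weight]

    def accumulate(self, acc, value, weight):
        acc[0] += value * weight
        acc[1] += weight

    def get_value(self, acc):
        return 0.0 if acc[1] == 0 else acc[0] / acc[1]


def running_weighted_avg(rows):
    """Return the weighted average after each (a, b) row, like an updating global aggregate."""
    agg = LocalWeightedAvg()
    acc = agg.create_accumulator()
    results = []
    for a, b in rows:
        agg.accumulate(acc, a, b)
        results.append(agg.get_value(acc))
    return results


print(running_weighted_avg([(10, 1), (20, 3), (40, 0)]))  # [10.0, 17.5, 17.5]
```

A row with weight 0 leaves the result unchanged, as the third row shows.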