
Realtime Compute for Apache Flink: Window aggregation

Last Updated: Jul 28, 2025

Realtime Compute for Apache Flink supports two types of window aggregation: group window aggregation and window table-valued function (TVF) aggregation. This topic describes the syntax of different types of window aggregation, the use cases in which window TVFs cannot be used in aggregation queries, and the support for update streams in different types of window aggregation.

Background information

  • Group window aggregation (old syntax): corresponds to the GroupWindowAggregation operator and supports the TUMBLE, HOP, and SESSION window functions.

  • Window TVF aggregation (new syntax): corresponds to the WindowAggregate operator and supports the TUMBLE, HOP, CUMULATE, and SESSION window functions. It also supports window TVFs, the optimizations described in Performance optimization, the standard GROUPING SETS syntax, and Window Top-N on window aggregation results.
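To make the window shapes concrete, the following Python sketch models which windows a single row is assigned to by TUMBLE, HOP, and CUMULATE. It is a simplified illustration, not Flink code: it assumes integer timestamps in one time unit, windows aligned to zero (no offset), and, for CUMULATE, a max size that is a multiple of the step. The helper names tumble, hop, and cumulate are hypothetical. SESSION is left out because session boundaries depend on neighboring rows rather than on a single row's timestamp.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # [start, end), in one arbitrary time unit


def tumble(ts: int, size: int) -> List[Window]:
    # Each row belongs to exactly one fixed-size, non-overlapping window.
    start = ts - ts % size
    return [(start, start + size)]


def hop(ts: int, slide: int, size: int) -> List[Window]:
    # Windows of length `size` start every `slide` units; a row belongs to
    # every window whose range [start, start + size) contains ts.
    windows = []
    start = ts - ts % slide  # latest window start that is <= ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def cumulate(ts: int, step: int, max_size: int) -> List[Window]:
    # All windows share a start aligned to max_size and grow by `step`
    # up to max_size; a row belongs to every window that ends after ts.
    start = ts - ts % max_size
    ends = range(start + step, start + max_size + 1, step)
    return [(start, end) for end in ends if end > ts]
```

For example, with a 10-unit CUMULATE window that grows in 2-unit steps, a row at time 7 falls into windows [0, 8) and [0, 10): it is counted in every cumulative window that ends after it.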


Group window aggregation (old syntax)

Group window aggregation is defined in the GROUP BY clause of a SQL query. Similar to queries that use regular GROUP BY clauses, queries that contain a window function in a GROUP BY clause return a single calculation result for each group.

For information about the syntax, examples, and features of group window aggregation, see Group window aggregation.

Window TVF aggregation (new syntax)

Window TVF aggregation is defined in a GROUP BY clause that contains the window_start and window_end columns generated by window TVFs. Similar to queries that use regular GROUP BY clauses, window TVF aggregation returns a single calculation result for each group.

Unlike regular aggregation on continuous tables, window TVF aggregation does not emit intermediate results. It emits a single final result for each window when the window closes, and then cleans up the window's intermediate state.
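The following Python sketch is a simplified model, not the engine's implementation, of what "final result only" means for a TUMBLE window that sums prices. It assumes integer event times, a watermark equal to the largest event time seen minus a fixed delay, windows that fire as soon as the watermark reaches their end, and a watermark that advances to infinity at the end of the input. The function tumbling_sum is hypothetical. Each window keeps one running sum, emits exactly once, and has its state removed right after it fires; a record that arrives for an already-fired window is dropped.

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple


def tumbling_sum(events: List[Tuple[int, float]], size: int, delay: int) -> List[Tuple[int, int, float]]:
    """Emit (window_start, window_end, total) exactly once per TUMBLE window."""
    state: DefaultDict[int, float] = defaultdict(float)  # window_start -> running sum
    emitted: List[Tuple[int, int, float]] = []
    watermark = float("-inf")
    for ts, price in events:
        start = ts - ts % size
        if start + size > watermark:
            state[start] += price  # window still open: accumulate, emit nothing yet
        # Otherwise the window has already fired and this late record is dropped.
        watermark = max(watermark, ts - delay)
        # Fire every window whose end the watermark has reached, then drop its state.
        for closed in sorted(s for s in state if s + size <= watermark):
            emitted.append((closed, closed + size, state.pop(closed)))
    # End of input: assume the watermark jumps to +infinity and closes all windows.
    for closed in sorted(state):
        emitted.append((closed, closed + size, state.pop(closed)))
    return emitted
```

With size=5 and delay=1, the input [(1, 10.0), (3, 5.0), (6, 2.0), (12, 1.0), (4, 100.0)] yields [(0, 5, 15.0), (5, 10, 2.0), (10, 15, 1.0)]. The record at time 4 arrives after window [0, 5) has fired, so it is discarded.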

For information about the syntax, examples, and features of window TVF aggregation, see Window TVF aggregation.

SESSION window TVFs: VVR 11 vs VVR 8

VVR 11.x (Flink 1.20) syntax

SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)

Parameters:

  • data: a table with a time attribute column.

  • keycols: a column descriptor that specifies which columns are used to partition the data prior to session windowing.

  • timecol: a column descriptor that specifies which time attribute column is mapped to session windows.

  • gap: the maximum time interval between two events belonging to the same session window.
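As a rough model of how keycols and gap interact, the following Python sketch groups (key, timestamp) rows into per-key session windows. It is an illustration under stated assumptions, not Flink's algorithm: each row opens a window [ts, ts + gap), and a later row with the same key that falls before the current window's end extends the window to that row's timestamp plus gap. The function name session_windows is hypothetical, and rows exactly gap apart are treated as separate sessions here, which may not match the engine's boundary handling.

```python
from typing import Dict, List, Tuple


def session_windows(rows: List[Tuple[str, int]], gap: int) -> Dict[str, List[Tuple[int, int]]]:
    """Group (key, ts) rows into per-key session windows [start, end)."""
    # Partition rows by key, analogous to PARTITION BY(keycols).
    by_key: Dict[str, List[int]] = {}
    for key, ts in rows:
        by_key.setdefault(key, []).append(ts)

    sessions: Dict[str, List[Tuple[int, int]]] = {}
    for key, times in by_key.items():
        windows: List[List[int]] = []
        for ts in sorted(times):
            if windows and ts < windows[-1][1]:
                # The row arrives before the current session ends: extend the session.
                windows[-1][1] = ts + gap
            else:
                # The gap since the previous row is too large: start a new session.
                windows.append([ts, ts + gap])
        sessions[key] = [(start, end) for start, end in windows]
    return sessions
```

For example, session_windows([("A", 0), ("A", 3), ("A", 20), ("B", 2)], gap=5) returns {"A": [(0, 8), (20, 25)], "B": [(2, 7)]}: rows of key A at 0 and 3 share one session, and the row at 20 starts a new one.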

VVR 8.x (Flink 1.17) syntax

Note

We recommend that you upgrade to VVR 11.1 or later.

SESSION(TABLE data, DESCRIPTOR(timecol), gap)

Parameters:

  • data: a table with a time attribute column.

  • timecol: a column descriptor that specifies which time attribute column is mapped to session windows.

  • gap: the maximum time interval between two events belonging to the same session window.

The following list compares the SESSION window TVFs in VVR 11.x and VVR 8.x:

  • Syntax
    VVR 11.x: SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
    VVR 8.x: SESSION(TABLE data, DESCRIPTOR(timecol), gap)
    Difference: VVR 8.x does not support PARTITION BY. Partition fields are specified implicitly in the aggregation statement.

  • How partition fields are specified
    VVR 11.x: explicitly, through PARTITION BY(keycols).
    VVR 8.x: implicitly, through the GROUP BY clause of the aggregation statement.
    Difference: In VVR 8.x, a partition field must be a GROUP BY field other than window_start, window_end, or window_time.

  • Limits on partition fields
    VVR 11.x: none.
    VVR 8.x: partition fields must be included in the GROUP BY clause and must not be window time fields.
    Difference: VVR 8.x derives partition fields from the aggregation logic, while VVR 11.x imposes no such limits.

  • Parameter completeness
    VVR 11.x: full parameter list: data, PARTITION BY, DESCRIPTOR(timecol), and gap.
    VVR 8.x: reduced parameter list: data, DESCRIPTOR(timecol), and gap.
    Difference: VVR 8.x infers partitioning information from the aggregation logic.

  • Standalone usage
    VVR 11.x: SESSION() can be called on its own, without aggregation.
    VVR 8.x: SESSION() must be used with GROUP BY.
    Difference: VVR 8.x ties the window function to an aggregation statement, while VVR 11.x supports more flexible usage.

  • Merging the window function with aggregation
    VVR 11.x: window functions can be merged with aggregation statements, such as SUM(price).
    VVR 8.x: the aggregation logic must be consistent with the window function. Use cases in which the window TVF and the aggregation cannot be merged are not supported.
    Difference: VVR 8.x restricts how aggregations can be merged with windows.

The following SQL examples are equivalent, both using item as the partition field for the SESSION window function:

-- The table must have a time attribute, such as `bidtime` in this table.
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+

-- VVR 11.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
      SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;
  
-- VVR 8.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;
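The following Python sketch approximates what the two queries above compute on a small set of bids, and shows why the partition field matters. It is a hypothetical model, not VVR code: bidtime is an integer number of minutes, prices are floats for simplicity, and a session is extended while each next bid arrives less than gap minutes after the previous one. The function session_totals and its partition flag are illustrative names; partition=True mimics partitioning by item, and partition=False shows what would happen if all bids were sessionized as a single stream.

```python
from typing import Dict, List, Optional, Tuple

Bid = Tuple[int, float, str]  # (bidtime in minutes, price, item)
Row = Tuple[int, int, Optional[str], float]  # (window_start, window_end, item, total_price)


def session_totals(bids: List[Bid], gap: int, partition: bool = True) -> List[Row]:
    """SUM(price) per session window, optionally partitioned by item."""
    # Split bids by item, or keep all bids in one group (key None).
    groups: Dict[Optional[str], List[Bid]] = {}
    for bid in bids:
        key = bid[2] if partition else None
        groups.setdefault(key, []).append(bid)

    rows: List[Row] = []
    for key, group in groups.items():
        start = end = None
        total = 0.0
        for bidtime, price, _ in sorted(group):
            if end is not None and bidtime < end:
                # The bid falls inside the open session: extend it and accumulate.
                end = bidtime + gap
                total += price
            else:
                # Close the previous session (if any) and open a new one.
                if end is not None:
                    rows.append((start, end, key, total))
                start, end, total = bidtime, bidtime + gap, price
        if end is not None:
            rows.append((start, end, key, total))
    # Order like a result table: by window_start, then item.
    return sorted(rows, key=lambda r: (r[0], str(r[2])))
```

For bids [(0, 4.0, "A"), (3, 2.0, "B"), (6, 1.0, "A")] with gap=5, partitioning by item yields three sessions: (0, 5, "A", 4.0), (3, 8, "B", 2.0), and (6, 11, "A", 1.0). Sessionizing all bids together yields a single session, (0, 11, None, 7.0), because item B's bid at minute 3 bridges the gap between item A's bids.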

SQL code comparison:

  • SESSION window partitioning
    VVR 11.x: SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)
    VVR 8.x: SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)
    Description: VVR 8.x relies on GROUP BY item to implicitly specify item as the partition field.

  • Merging aggregation with the window
    VVR 11.x: supports direct merging, such as computing SUM(price) within each window.
    VVR 8.x: the grouping fields of the aggregation must match the window partition fields (GROUP BY item in this example).
    Description: VVR 8.x imposes implicit constraints on merging aggregations with windows.

Limits of window TVFs in aggregation queries

The SESSION window TVF is used as an example to describe the use cases where window TVFs cannot be used in aggregation queries.

Warning

If an aggregation query uses a window TVF in one of the unsupported ways listed below and the windows are based on processing time, the processing time column is materialized and used as the time attribute of the windows. In this case, the watermark of the source table can affect the aggregation results: the result for a window may be emitted earlier than expected, and late records may be discarded, just as they are with event-time windows. To prevent this issue, make sure that none of the aggregation queries that contain window TVFs in your SQL statements meets any of the following conditions.

  • The window_start, window_end, or window_time field is filtered or used in a calculation before aggregation. Examples:

    -- Filtering based on window_start
    > SELECT window_start, window_end, item, SUM(price) AS total_price
        FROM
        (SELECT item, price, window_start, window_end FROM
        TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
        where window_start >= TIMESTAMP '2020-04-15 08:06:00.000')
        GROUP BY item, window_start, window_end;
      
    -- Calculation of window_start
    > SELECT window_start, window_end, item, SUM(price) AS total_price
        FROM
        (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM
        TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
        GROUP BY item, window_start, window_end;
    
    -- Type conversion (CAST) of window_start
    > SELECT window_start, window_end, item, SUM(price) AS total_price
        FROM
        (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM
        TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
        GROUP BY item, window_start, window_end;
    
  • A window TVF is used together with a user-defined table function (UDTF). Example:

    > SELECT window_start, window_end, category, SUM(price) AS total_price
        FROM
        (SELECT category, price, window_start, window_end FROM
        TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)),
        LATERAL TABLE(category_udtf(item)) as T(category))
        GROUP BY category, window_start, window_end;

  • The GROUP BY clause does not include both window_start and window_end (one or both of them are missing). Example:

    > SELECT window_start, item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start;

  • A Python user-defined aggregate function (UDAF) is used.

  • GROUPING SETS, CUBE, or ROLLUP is used in the GROUP BY clause in a way that places window_start and window_end in different grouping sets. Examples:

    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY GROUPING SETS((item), (window_start), (window_end));
      
    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY CUBE (item, window_start, window_end);
      
    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY ROLLUP (item, window_start, window_end);

  • Aggregate functions are applied to the window_start, window_end, or window_time field. Example:

    > SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start, window_end;
      

Comparison of the support for update streams

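The following list shows whether each window function supports update streams in the old syntax (GroupWindowAggregation operator, identical in VVR and Apache Flink) and in the new syntax (WindowAggregate operator) in VVR and in Apache Flink:

  • TUMBLE: old syntax: Yes. New syntax in VVR: Yes. New syntax in Apache Flink: No.

  • HOP: old syntax: Yes. New syntax in VVR: Yes. New syntax in Apache Flink: No.

  • SESSION: old syntax: Yes. New syntax in VVR: Yes. New syntax in Apache Flink: Yes, as of Apache Flink 1.19.
    Note: For information about the differences between the SESSION window functions in VVR and Apache Flink, see Queries.

  • CUMULATE: old syntax: N/A. New syntax in VVR: Yes, as of VVR 8.0.6. New syntax in Apache Flink: No.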

In the old syntax, support for update streams is the same in VVR and Apache Flink. In the new syntax, only the WindowAggregate operator provided by VVR supports update streams for all window functions. This is because VVR supports both the GroupWindowAggregation and WindowAggregate operators and automatically selects the appropriate operator based on the input stream.