Realtime Compute for Apache Flink: Window aggregation

Last Updated: Mar 26, 2026

Realtime Compute for Apache Flink supports two types of window aggregation: group window aggregation and window table-valued function (TVF) aggregation. This topic covers their syntax, the scenarios where window TVF aggregation falls back to non-TVF mode, and update stream support across window types.

Choose between the two syntaxes

|  | Group window aggregation | Window TVF aggregation |
|---|---|---|
| Operator | GroupWindowAggregation | WindowAggregate |
| Window functions | TUMBLE, HOP, SESSION | TUMBLE, HOP, CUMULATE, SESSION |
| Status | Deprecated | Recommended |
| Performance optimizations | No | Yes |
| `GROUPING SETS` support | No | Yes |
| Window Top-N after aggregation | No | Yes |
| Update stream support | Yes (VVR and Apache Flink) | Yes in VVR for all window types; Apache Flink supports SESSION only (1.19 and later) |

Use window TVF aggregation. It supports all window types from group window aggregation plus CUMULATE, adds performance optimizations and GROUPING SETS, and lets you apply Window Top-N on aggregation results.
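The following rough sketch illustrates the last two features. It reuses the `Bid` table whose schema is shown in the SESSION section of this topic. The 10-minute window size and the top-3 cutoff are arbitrary choices. The sketch follows the window TVF patterns documented for Apache Flink, so verify it against your VVR version.

```sql
-- GROUPING SETS: per-item totals plus an all-items total for each 10-minute window.
-- window_start and window_end stay outside GROUPING SETS so every set is grouped by window.
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((item), ());

-- Window Top-N on the aggregation result: the 3 items with the highest total_price per window
SELECT *
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window_start, window_end
        ORDER BY total_price DESC) AS rownum
    FROM (
        SELECT window_start, window_end, item, SUM(price) AS total_price
        FROM TABLE(
            TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
        GROUP BY window_start, window_end, item
    )
)
WHERE rownum <= 3;
```

In the Top-N query, the `ROW_NUMBER()` partition uses both `window_start` and `window_end`. That way the ranking restarts for every window rather than running across the whole stream.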

Group window aggregation (deprecated)

Group window aggregation defines windows in the GROUP BY clause. It corresponds to the GroupWindowAggregation operator and supports TUMBLE, HOP, and SESSION window functions.

For syntax, examples, and feature details, see Group window aggregation.
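For contrast with the window TVF syntax, the following minimal sketch shows a deprecated group window query next to what is assumed to be its window TVF equivalent. It reuses the `Bid` table shown later in this topic, and the 10-minute window size is arbitrary. `TUMBLE_START` and `TUMBLE_END` are the auxiliary functions that group window aggregation uses to expose window bounds.

```sql
-- Deprecated group window syntax: the window is declared inside GROUP BY
SELECT
    TUMBLE_START(bidtime, INTERVAL '10' MINUTES) AS window_start,
    TUMBLE_END(bidtime, INTERVAL '10' MINUTES) AS window_end,
    item,
    SUM(price) AS total_price
FROM Bid
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTES), item;

-- Window TVF syntax: TUMBLE() adds window_start and window_end columns, which are then grouped on
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, item;
```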

Window TVF aggregation

Window TVF aggregation defines windows through a GROUP BY clause that includes the window_start and window_end columns produced by window TVFs. It corresponds to the WindowAggregate operator and supports TUMBLE, HOP, CUMULATE, and SESSION window functions.

Regular aggregation on continuous tables emits updated results as rows arrive. Window TVF aggregation instead emits no intermediate results, only a final result when each window closes. Intermediate state is purged automatically once it is no longer needed.

For syntax, examples, and feature details, see Window TVF aggregation.
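TUMBLE and SESSION aggregation appear elsewhere in this topic. The following sketch covers HOP and CUMULATE aggregation over the same `Bid` table, and the interval values are arbitrary. Argument order matters here. In HOP, the slide comes before the window size. In CUMULATE, the step comes before the maximum window size.

```sql
-- HOP: 10-minute windows that start every 5 minutes, so each row falls into two windows
SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

-- CUMULATE: windows share a start and grow in 2-minute steps up to 10 minutes
SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```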

SESSION window TVF syntax: VVR 11.x vs VVR 8.x

The SESSION window TVF syntax differs between VVR versions. Upgrade to VVR 11.1 or later to use the full-featured syntax.

VVR 11.x (Flink 1.20)

SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
| Parameter | Description |
|---|---|
| data | A table with a time attribute column |
| keycols | (Optional) Columns used to partition data before session windowing |
| timecol | The time attribute column mapped to session windows |
| gap | The maximum time interval between two events in the same session |

VVR 8.x (Flink 1.17)

SESSION(TABLE data, DESCRIPTOR(timecol), gap)
| Parameter | Description |
|---|---|
| data | A table with a time attribute column |
| timecol | The time attribute column mapped to session windows |
| gap | The maximum time interval between two events in the same session |
VVR 8.x does not support PARTITION BY. Partition fields are inferred implicitly from the GROUP BY clause.

SESSION syntax comparison: VVR 11.x vs VVR 8.x

|  | VVR 11.x | VVR 8.x |
|---|---|---|
| Syntax | `SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)` | `SESSION(TABLE data, DESCRIPTOR(timecol), gap)` |
| Partition field specification | Explicit, via `PARTITION BY(keycols)` | Implicit, via the `GROUP BY` clause |
| Partition field restrictions | None | Must appear in `GROUP BY`; cannot be `window_start`, `window_end`, or `window_time` |
| Standalone `SESSION()` usage | Supported | Must be used with `GROUP BY` |
| Merging window function with aggregation | Supported | Not supported; aggregation must match partition fields |

The following examples are equivalent. Both use item as the partition field.

-- The Bid table schema (used in all examples below)
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+

-- VVR 11.x: partition field declared explicitly in SESSION()
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
      SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;

-- VVR 8.x: partition field inferred from GROUP BY
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;

|  | VVR 11.x | VVR 8.x |
|---|---|---|
| SESSION window partitioning | `SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)` | `SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)` |
| Aggregation and window merging | Direct merging supported (for example, `SUM(price)` within the window) | Aggregation fields must match window partition fields (for example, `GROUP BY item`) |
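VVR 11.x declares the partition inside `SESSION()`, so the TVF can also be queried on its own without `GROUP BY`. This is the "Standalone `SESSION()` usage" case in the comparison above. The following minimal sketch assumes VVR 11.1 or later and the `Bid` table above.

```sql
-- VVR 11.x only: SESSION() queried directly, without GROUP BY.
-- Each Bid row is returned unchanged, plus window_start, window_end, and window_time
-- describing the per-item session (5-minute gap) that the row belongs to.
SELECT *
FROM TABLE(
    SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
```

On VVR 8.x, the same statement is not expected to work. `PARTITION BY` is unsupported there, and `SESSION()` must be combined with `GROUP BY`.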

When window TVF aggregation falls back to non-TVF mode

When a query includes a window TVF but does not meet the conditions for the TVF and the aggregation to be merged, the system falls back to a non-TVF execution plan.

Warning

If a non-mergeable query uses processing time as the time attribute, the processing time column is materialized and used as the time attribute of the created windows. This causes the source table's watermark to affect aggregation results — windows may close earlier than expected, and late data may be discarded, the same as with event-time windows. Avoid the patterns below to prevent this.

The window TVF and the aggregation statement cannot be merged when any of the following conditions is met:

  1. Filtering or computation on window time fields: `window_start`, `window_end`, or `window_time` is filtered or modified (for example, through arithmetic or type casting) before aggregation.

    -- Filtering on window_start
    > SELECT window_start, window_end, item, SUM(price) AS total_price
      FROM (
          SELECT item, price, window_start, window_end
          FROM TABLE(
              SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
          WHERE window_start >= TIMESTAMP '2020-04-15 08:06:00.000')
      GROUP BY item, window_start, window_end;

    -- Arithmetic on window_start
    > SELECT window_start, window_end, item, SUM(price) AS total_price
      FROM (
          SELECT item, price, window_start + (INTERVAL '1' SECOND) AS window_start, window_end
          FROM TABLE(
              SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
      GROUP BY item, window_start, window_end;

    -- Type casting on window_start
    > SELECT window_start, window_end, item, SUM(price) AS total_price
      FROM (
          SELECT item, price, CAST(window_start AS VARCHAR) AS window_start, window_end
          FROM TABLE(
              SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
      GROUP BY item, window_start, window_end;
  2. A window TVF is used together with a user-defined table function (UDTF).

    > SELECT window_start, window_end, category, SUM(price) AS total_price
      FROM (
          SELECT category, price, window_start, window_end
          FROM TABLE(
              SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)),
          LATERAL TABLE(category_udtf(item)) AS T(category))
      GROUP BY category, window_start, window_end;
  3. The `GROUP BY` clause is missing `window_start` or `window_end`.

    > SELECT window_start, item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start;
  4. A Python user-defined aggregate function (UDAF) is used.

  5. `window_start` or `window_end` appears inside `GROUPING SETS`, `CUBE`, or `ROLLUP` rather than as a plain `GROUP BY` column, so not every grouping set contains both window columns.

    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY GROUPING SETS((item), (window_start), (window_end));
    
    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY CUBE (item, window_start, window_end);
    
    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY ROLLUP (item, window_start, window_end);
  6. An aggregate function is applied to `window_start`, `window_end`, or `window_time`.

    > SELECT window_start, window_end, item, SUM(price) AS total_price, MAX(window_end) AS max_end
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start, window_end;
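Rewriting a query so that the window columns reach `GROUP BY` unmodified is assumed to keep it in TVF mode. The following sketch shows counterparts to condition 1 (type casting) and condition 3, using the same `Bid` table. An `EXPLAIN` statement follows for checking the plan. The presence of a `WindowAggregate` node in the plan is treated here as the signal that the TVF and aggregation were merged. Confirm the exact plan output on your VVR version.

```sql
-- Counterpart to condition 3: both window_start and window_end are in GROUP BY,
-- even though only window_start is selected
SELECT window_start, item, SUM(price) AS total_price
FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;

-- Counterpart to condition 1 (type casting): group on the raw window columns first,
-- then cast window_start only in the SELECT list of the aggregation
SELECT CAST(window_start AS VARCHAR) AS window_start_str, window_end, item, SUM(price) AS total_price
FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;

-- Inspect the execution plan of a query to see which operator it uses
EXPLAIN
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
```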

Update stream support

The table below shows update stream support by window function and syntax.

| Window function | Old syntax (GroupWindowAggregation), VVR | Old syntax (GroupWindowAggregation), Apache Flink | New syntax (WindowAggregate), VVR | New syntax (WindowAggregate), Apache Flink |
|---|---|---|---|---|
| TUMBLE | Yes | Yes | Yes | No |
| HOP | Yes | Yes | Yes | No |
| SESSION | Yes | Yes | Yes | Yes (Apache Flink 1.19 and later) |
| CUMULATE | N/A | N/A | Yes (VVR 8.0.6 and later) | No |

In the old syntax, update stream support is identical whether you use VVR or Apache Flink. In the new syntax, only VVR's WindowAggregate operator supports update streams for all window functions. VVR automatically selects between the GroupWindowAggregation and WindowAggregate operators based on the input stream.
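The following sketch shows CUMULATE aggregation over an update stream, which the table above lists as supported only in VVR 8.0.6 and later. The `BidUpdates` table, its `bid_id` key, the topic name, and the broker address are hypothetical. The connector options shown are those of the Apache Flink `upsert-kafka` connector, so check the Upsert Kafka connector documentation for your VVR version before using them.

```sql
-- Hypothetical upsert source: a later record with the same bid_id updates the earlier one,
-- so the table produces an update stream rather than an append-only stream
CREATE TEMPORARY TABLE BidUpdates (
    bid_id STRING,
    bidtime TIMESTAMP(3),
    price DECIMAL(10, 2),
    item STRING,
    PRIMARY KEY (bid_id) NOT ENFORCED,
    WATERMARK FOR bidtime AS bidtime - INTERVAL '1' SECOND
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'bid_updates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- New-syntax CUMULATE aggregation consuming the update stream
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
    CUMULATE(TABLE BidUpdates, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY item, window_start, window_end;
```

According to the table above, the same query would be rejected by the WindowAggregate operator in Apache Flink, because Apache Flink supports update streams there only for SESSION.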

For differences between the SESSION window function in VVR and Apache Flink, see Queries.