Realtime Compute for Apache Flink supports two types of window aggregation: group window aggregation and window table-valued function (TVF) aggregation. This topic covers their syntax, the scenarios where window TVF aggregation falls back to non-TVF mode, and update stream support across window types.
Choose between the two syntaxes
| | Group window aggregation | Window TVF aggregation |
|---|---|---|
| Operator | GroupWindowAggregation | WindowAggregate |
| Window functions | TUMBLE, HOP, SESSION | TUMBLE, HOP, CUMULATE, SESSION |
| Status | Deprecated | Recommended |
| Performance optimizations | No | Yes |
| `GROUPING SETS` support | No | Yes |
| Window Top-N after aggregation | No | Yes |
| Update stream support | Yes (VVR and Apache Flink) | Yes (VVR: all window types; Apache Flink: SESSION only, 1.19 and later) |
Use window TVF aggregation. It supports all window types from group window aggregation plus CUMULATE, adds performance optimizations and GROUPING SETS, and lets you apply Window Top-N on aggregation results.
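The following query sketches Window Top-N applied to the results of a window TVF aggregation, which group window aggregation cannot do. It assumes the `Bid` table whose schema is shown later in this topic. The 10-minute TUMBLE window and the top-3 cutoff are arbitrary choices for illustration.

```sql
-- Top 3 items by total price in each 10-minute tumbling window.
-- Window Top-N requires PARTITION BY window_start, window_end in the OVER clause.
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end
      ORDER BY total_price DESC) AS rownum
  FROM (
    -- Inner window TVF aggregation: one row per item per window
    SELECT window_start, window_end, item, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, item))
WHERE rownum <= 3;
```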
Group window aggregation (deprecated)
Group window aggregation defines windows in the GROUP BY clause. It corresponds to the GroupWindowAggregation operator and supports TUMBLE, HOP, and SESSION window functions.
For syntax, examples, and feature details, see Group window aggregation.
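For comparison, a minimal sketch of the deprecated syntax, assuming the `Bid` table whose schema is shown later in this topic: the window is declared by calling `TUMBLE` inside `GROUP BY`, and the window bounds are read back with the `TUMBLE_START` and `TUMBLE_END` auxiliary functions. The 10-minute window size is illustrative.

```sql
-- Deprecated group window aggregation: the window is defined in GROUP BY.
-- The interval passed to TUMBLE_START/TUMBLE_END must match the one in GROUP BY.
SELECT
  TUMBLE_START(bidtime, INTERVAL '10' MINUTE) AS window_start,
  TUMBLE_END(bidtime, INTERVAL '10' MINUTE) AS window_end,
  item,
  SUM(price) AS total_price
FROM Bid
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE), item;
```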
Window TVF aggregation
Window TVF aggregation defines windows through a GROUP BY clause that includes the window_start and window_end columns produced by window TVFs. It corresponds to the WindowAggregate operator and supports TUMBLE, HOP, CUMULATE, and SESSION window functions.
Unlike regular aggregation on continuous tables, window TVF aggregation does not emit intermediate results; it emits only a final result at the end of each window. Intermediate state is cleaned up automatically when it is no longer needed.
For syntax, examples, and feature details, see Window TVF aggregation.
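A minimal sketch of window TVF aggregation with three of the supported window functions, assuming the `Bid` table whose schema is shown later in this topic. In every case the window TVF is called in `FROM TABLE(...)` and `window_start` and `window_end` appear in `GROUP BY`. The interval values are illustrative only.

```sql
-- TUMBLE: fixed, non-overlapping 10-minute windows, totals per item
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, item;

-- HOP: 10-minute windows that slide every 5 minutes (slide first, then size)
SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
  HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

-- CUMULATE: windows that grow in 2-minute steps up to 10 minutes (step first, then max size)
SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
  CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```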
SESSION window TVF syntax: VVR 11.x vs VVR 8.x
The SESSION window TVF syntax differs between VVR versions. Upgrade to VVR 11.1 or later to use the full-featured syntax.
VVR 11.x (Flink 1.20)
```sql
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
```

| Parameter | Description |
|---|---|
| data | A table with a time attribute column |
| keycols | (Optional) Columns used to partition data before session windowing |
| timecol | The time attribute column mapped to session windows |
| gap | The maximum time interval between two events in the same session |
VVR 8.x (Flink 1.17)
```sql
SESSION(TABLE data, DESCRIPTOR(timecol), gap)
```

| Parameter | Description |
|---|---|
| data | A table with a time attribute column |
| timecol | The time attribute column mapped to session windows |
| gap | The maximum time interval between two events in the same session |
VVR 8.x does not support `PARTITION BY`. Partition fields are inferred implicitly from the `GROUP BY` clause.
SESSION syntax comparison: VVR 11.x vs VVR 8.x
| | VVR 11.x | VVR 8.x |
|---|---|---|
| Syntax | `SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)` | `SESSION(TABLE data, DESCRIPTOR(timecol), gap)` |
| Partition field specification | Explicit, via `PARTITION BY(keycols)` | Implicit, via the `GROUP BY` clause |
| Partition field restrictions | None | Must appear in `GROUP BY`; cannot be `window_start`, `window_end`, or `window_time` |
| Standalone `SESSION()` usage | Supported | Not supported; must be used with `GROUP BY` |
| Merging window function with aggregation | Supported | Not supported; aggregation must match partition fields |
The following examples are equivalent. Both use item as the partition field.
```sql
-- The Bid table schema (used in all examples below)
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+
```

```sql
-- VVR 11.x: partition field declared explicitly in SESSION()
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;

-- VVR 8.x: partition field inferred from GROUP BY
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;
```

| | VVR 11.x | VVR 8.x |
|---|---|---|
| SESSION window partitioning | `SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)` | `SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)` |
| Aggregation and window merging | Direct merging supported (for example, `SUM(price)` within the window) | Aggregation fields must match window partition fields (for example, `GROUP BY item`) |
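Because VVR 11.x supports standalone `SESSION()` usage, the window TVF can also be queried on its own, without aggregation, for example to inspect which session each bid was assigned to. A minimal sketch, assuming the `Bid` table above; this form is not available on VVR 8.x, where `SESSION()` must be used with `GROUP BY`.

```sql
-- VVR 11.x only: no GROUP BY. Each input row is returned together with the
-- window_start, window_end, and window_time columns added by the SESSION TVF.
SELECT *
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
```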
When window TVF aggregation falls back to non-TVF mode
When a query includes a window TVF but does not meet the conditions for the TVF and the aggregation to be merged, the system falls back to a non-TVF execution plan.
If a non-mergeable query uses processing time as the time attribute, the processing time column is materialized and used as the time attribute of the created windows. As a result, the source table's watermark affects the aggregation results: windows may close earlier than expected, and late data may be discarded, just as with event-time windows. Avoid the patterns below to prevent this.
The window TVF and the aggregation statement cannot be merged when any of the following conditions is met:
Filtering or computation on window time fields.
`window_start`, `window_end`, or `window_time` is filtered or modified before aggregation.

```sql
-- Filtering on window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
    WHERE window_start >= TIMESTAMP '2020-04-15 08:06:00.000')
  GROUP BY item, window_start, window_end;

-- Arithmetic on window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, window_start + (INTERVAL '1' SECOND) AS window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
  GROUP BY item, window_start, window_end;

-- Type casting on window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, CAST(window_start AS VARCHAR) AS window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
  GROUP BY item, window_start, window_end;
```

A window TVF is used with a user-defined table-valued function (UDTF).

```sql
> SELECT window_start, window_end, category, SUM(price) AS total_price
  FROM (
    SELECT category, price, window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)),
      LATERAL TABLE(category_udtf(item)) AS T(category))
  GROUP BY category, window_start, window_end;
```

The `GROUP BY` clause is missing `window_start` or `window_end`.

```sql
> SELECT window_start, item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start;
```

A Python user-defined aggregate function (UDAF) is used.

`GROUPING SETS`, `CUBE`, or `ROLLUP` groups separately by `window_start` or `window_end`.

```sql
> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY GROUPING SETS((item), (window_start), (window_end));

> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY CUBE (item, window_start, window_end);

> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY ROLLUP (item, window_start, window_end);
```

An aggregate function is applied to `window_start`, `window_end`, or `window_time`.

```sql
> SELECT window_start, window_end, item, SUM(price) AS total_price, MAX(window_end) AS max_end
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;
```
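One way to keep the TVF and the aggregation mergeable is to move work on window time fields to an outer query that runs after the aggregation, and to list `window_start` and `window_end` outside `GROUPING SETS` so that every grouping set contains both. The sketch below assumes the `Bid` table and uses the VVR 8.x SESSION syntax shown above; the `GROUPING SETS` query uses TUMBLE, and the output column names `window_start_str` and `shifted_start` are illustrative. Whether a given query is merged can depend on the VVR version, so it is worth confirming with `EXPLAIN` that the plan contains a `WindowAggregate` node.

```sql
-- Cast and arithmetic on window_start are applied after the window aggregation,
-- so the inner query is a plain window TVF aggregation.
SELECT
  CAST(window_start AS VARCHAR) AS window_start_str,
  window_start + INTERVAL '1' SECOND AS shifted_start,
  window_end,
  item,
  total_price
FROM (
  SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end);

-- window_start and window_end are listed outside GROUPING SETS, so they are part of
-- every grouping set: per-item totals plus an all-items total (item IS NULL) per window.
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((item), ());
```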
Update stream support
The table below shows update stream support by window function and syntax.
| Window function | Old syntax (GroupWindowAggregation) — VVR | Old syntax (GroupWindowAggregation) — Apache Flink | New syntax (WindowAggregate) — VVR | New syntax (WindowAggregate) — Apache Flink |
|---|---|---|---|---|
| TUMBLE | Yes | Yes | Yes | No |
| HOP | Yes | Yes | Yes | No |
| SESSION | Yes | Yes | Yes | Yes (Apache Flink 1.19 and later) |
| CUMULATE | N/A | N/A | Yes (VVR 8.0.6 and later) | No |
In the old syntax, update stream support is identical whether you use VVR or Apache Flink. In the new syntax, only VVR's WindowAggregate operator supports update streams for all window functions. VVR automatically selects between the GroupWindowAggregation and WindowAggregate operators based on the input stream.
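To see which operator VVR selected for a given input, the plan can be inspected with `EXPLAIN`. The sketch below assumes a hypothetical table `BidChangelog` that produces an update stream (for example, a table backed by a CDC source) and has the same columns and watermark as `Bid`. The `CHANGELOG_MODE` detail adds each node's changelog mode to the printed plan, which shows whether the aggregation input carries updates; the exact plan text differs across versions.

```sql
-- BidChangelog is hypothetical: an update (changelog) stream with the same schema
-- and watermark as Bid. The printed plan names the window aggregation operator
-- and, with CHANGELOG_MODE, the kinds of changes each node consumes and produces.
EXPLAIN CHANGELOG_MODE
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  TUMBLE(TABLE BidChangelog, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY item, window_start, window_end;
```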
For differences between the SESSION window function in VVR and Apache Flink, see Queries.