Realtime Compute for Apache Flink supports two types of window aggregation: group window aggregation and window table-valued function (TVF) aggregation. This topic describes the syntax of different types of window aggregation, the use cases in which window TVFs cannot be used in aggregation queries, and the support for update streams in different types of window aggregation.
Background information
Group window aggregation (old syntax): corresponds to the GroupWindowAggregation operator and supports the TUMBLE, HOP, and SESSION window functions.
Window TVF aggregation (new syntax): supports window TVFs, the optimizations described in Performance optimization, the standard GROUPING SETS syntax, and applying Window Top-N to window aggregation results. This type of window aggregation corresponds to the WindowAggregate operator and supports the TUMBLE, HOP, CUMULATE, and SESSION window functions.
Group window aggregation is deprecated. We recommend that you use window TVF aggregation, which is more efficient and versatile.
For information about the support for update streams, see Comparison of the support for update streams.
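CUMULATE is available only in the new syntax. The following is a minimal sketch that assumes the Bid table (bidtime, price, item) shown later in this topic. It computes a running total of price per item. Within each 10-minute window (size), a result is emitted every 2 minutes (step). Both interval values are illustrative.

```sql
-- Cumulative windows: size = 10 minutes, step = 2 minutes.
-- Every window in the same 10-minute period shares the same window_start,
-- and its window_end advances by 2 minutes.
-- Assumes the Bid table from this topic, with bidtime as its event-time attribute.
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY item, window_start, window_end;
```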
Group window aggregation (old syntax)
Group window aggregation is defined in the GROUP BY clause of a SQL query. Similar to queries that use regular GROUP BY clauses, queries that contain a window function in a GROUP BY clause return a single calculation result for each group.
For information about the syntax, examples, and features of group window aggregation, see Group window aggregation.
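The following is a minimal sketch of the old syntax. It assumes the Bid table used in the examples in this topic and sums price per item over 10-minute tumbling windows. The TUMBLE window function appears directly in the GROUP BY clause. The auxiliary functions TUMBLE_START and TUMBLE_END return the window boundaries.

```sql
-- Old syntax: the window function is part of the GROUP BY clause.
-- TUMBLE_START and TUMBLE_END must use the same arguments as TUMBLE in GROUP BY.
SELECT
  item,
  TUMBLE_START(bidtime, INTERVAL '10' MINUTE) AS window_start,
  TUMBLE_END(bidtime, INTERVAL '10' MINUTE) AS window_end,
  SUM(price) AS total_price
FROM Bid
GROUP BY item, TUMBLE(bidtime, INTERVAL '10' MINUTE);
```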
Window TVF aggregation (new syntax)
Window TVF aggregation is defined in a GROUP BY clause that contains the window_start and window_end columns generated by window TVFs. Similar to queries that use regular GROUP BY clauses, window TVF aggregation returns a single calculation result for each group.
Unlike other aggregations on continuous tables, window TVF aggregation does not emit intermediate results. It emits only a final result at the end of each window and purges intermediate state once that state is no longer needed.
For information about the syntax, examples, and features of window TVF aggregation, see Window TVF aggregation.
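For comparison, the following sketch shows the same 10-minute tumbling aggregation in the new syntax. It also assumes the Bid table from this topic. The TUMBLE TVF appends the window_start, window_end, and window_time columns to each input row. The GROUP BY clause then groups by both window_start and window_end.

```sql
-- New syntax: the TUMBLE TVF assigns each row to a 10-minute window,
-- and the aggregation groups by window_start and window_end.
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY item, window_start, window_end;
```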
SESSION window TVFs: VVR 11 vs VVR 8
VVR 11.x (Flink 1.20) syntax
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
Parameters:
data: a table with a time attribute column.
keycols: a column descriptor that specifies which columns are used to partition the data prior to session windowing.
timecol: a column descriptor that specifies which time attribute column is mapped to session windows.
gap: the maximum time interval between two events belonging to the same session window.
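The following sketch illustrates the VVR 11.x syntax. It assumes the Bid table from the examples in this section and calls the SESSION TVF on its own, without an aggregation. Each output row is an input row of Bid extended with window_start, window_end, and window_time. This form applies to VVR 11.x only, because VVR 8.x couples the SESSION TVF with an aggregation.

```sql
-- VVR 11.x: standalone SESSION TVF, partitioned by item.
-- Rows for the same item that are at most 5 minutes apart fall into the same session.
SELECT *
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
```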
VVR 8.x (Flink 1.17) syntax
We recommend that you upgrade to VVR 11.1 or later.
SESSION(TABLE data, DESCRIPTOR(timecol), gap)
Parameters:
data: a table with a time attribute column.
timecol: a column descriptor that specifies which time attribute column is mapped to session windows.
gap: the maximum time interval between two events belonging to the same session window.
The table below compares the session window TVFs between VVR 11.x and VVR 8.x:
Item | VVR 11.x | VVR 8.x | Differences |
Syntax | SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap) | SESSION(TABLE data, DESCRIPTOR(timecol), gap) | VVR 8.x does not support the PARTITION BY clause. |
How to specify partition fields | Explicitly, through the PARTITION BY clause. | Implicitly, through the GROUP BY clause of the aggregation. | In VVR 8.x, partition fields must be part of the GROUP BY clause. |
Limits on partition fields | None. | Partition fields must be included in the GROUP BY clause. | In VVR 8.x, partition fields are implicitly specified in the aggregation logic, while VVR 11.x provides more flexibility. |
Parameter completeness | Complete parameters: data, keycols, timecol, and gap. | Simplified parameters: data, timecol, and gap. | VVR 8.x relies on the aggregation logic to infer partitioning information. |
Standalone usage support | Supports calling the SESSION window TVF on its own, without an aggregation. | Not supported. | VVR 8.x forcibly couples window functions with aggregation statements, while VVR 11.x supports more flexible usage scenarios. |
Window function and aggregation mergeability | Supports merging window functions with aggregation statements. | Does not support use cases where window TVFs and aggregation statements cannot be merged. That is, the aggregation logic must be consistent with the window function. | VVR 8.x has restrictions on merging aggregations with windows. |
The following SQL examples are equivalent, both using item as the partition field for the SESSION window function:
-- The table must have a time attribute, such as `bidtime` in this table.
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+
-- VVR 11.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
-- VVR 8.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
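GROUP BY item, window_start, window_end;
SQL code comparison:
Item | VVR 11.x | VVR 8.x | Description |
SESSION window partitioning | Explicit: PARTITION BY item in the SESSION call. | Implicit: inferred from item in GROUP BY item, window_start, window_end. | VVR 8.x relies on the GROUP BY clause to infer the partition fields. |
Aggregation and window merging | Supports direct merging of the window TVF with the aggregation. | Aggregation fields must match the window partition fields. | VVR 8.x has implicit constraints on merging aggregations with windows. |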
Limits of window TVFs in aggregation queries
The SESSION window TVF is used as an example to describe the use cases where window TVFs cannot be used in aggregation queries.
If an aggregation query matches one of the unsupported patterns below and its windows are based on processing time, the processing-time column is materialized and used as the time attribute of the windows. As a result, the watermark of the source table may affect the aggregation results. For example, a window may emit its result earlier than expected, and late records may be discarded, as with event-time windows. To prevent this issue, make sure that the aggregation queries that use window TVFs in your SQL statements do not meet any of the following conditions.
Filtering or calculation is performed on the window_start, window_end, or window_time field. Example:
-- Filtering based on window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
    WHERE window_start >= TIMESTAMP '2020-04-15 08:06:00.000')
  GROUP BY item, window_start, window_end;
-- Calculation on window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, window_start + (INTERVAL '1' SECOND) AS window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
  GROUP BY item, window_start, window_end;
-- Type conversion of window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, CAST(window_start AS VARCHAR) AS window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
  GROUP BY item, window_start, window_end;
A window TVF is used together with a user-defined table function (UDTF). Example:
> SELECT window_start, window_end, category, SUM(price) AS total_price
  FROM (
    SELECT category, price, window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)),
    LATERAL TABLE(category_udtf(item)) AS T(category))
  GROUP BY category, window_start, window_end;
The GROUP BY clause does not contain both the window_start and window_end fields. Example:
> SELECT window_start, item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start;
A Python user-defined aggregate function (UDAF) is used.
GROUPING SETS, CUBE, or ROLLUP is used in the GROUP BY clause in a way that groups data by the window_start and window_end fields separately, so that not every grouping set contains both fields. Example:
> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY GROUPING SETS((item), (window_start), (window_end));
> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY CUBE (item, window_start, window_end);
> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY ROLLUP (item, window_start, window_end);
An aggregate function is applied to the window_start, window_end, or window_time field. Example:
> SELECT window_start, window_end, item, SUM(price) AS total_price, MAX(window_end) AS max_end
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;
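For contrast, the following minimal sketch shows a GROUPING SETS query that keeps both window_start and window_end in every grouping set, which is the form of GROUPING SETS that window TVF aggregation supports. The sketch uses a 10-minute TUMBLE window on the Bid table. The window type and size are illustrative assumptions. Rows produced by the empty grouping set () aggregate all items in a window and contain NULL in the item column.

```sql
-- window_start and window_end are listed outside GROUPING SETS, so every
-- grouping set contains both fields: (item) groups per item, and ()
-- aggregates all items in the window.
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((item), ());
```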
Comparison of the support for update streams
Window function | Old syntax (GroupWindowAggregation): VVR and Apache Flink | New syntax (WindowAggregate): VVR | New syntax (WindowAggregate): Apache Flink |
TUMBLE | Yes | Yes | No |
HOP | Yes | Yes | No |
SESSION | Yes | Yes. Note: For information about the differences between the SESSION window functions in VVR and Apache Flink, see Queries. | Yes (as of Apache Flink 1.19) |
CUMULATE | N/A | Yes (as of VVR 8.0.6) | No |
In the old syntax, the support for update streams is the same in VVR and Apache Flink. In the new syntax, only the WindowAggregate operator provided by VVR supports update streams for all window functions. This is because VVR implements both the GroupWindowAggregation and WindowAggregate operators and automatically selects the appropriate operator based on the input stream.
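One way to see which operator the planner selected for a given query is to inspect the plan with the standard Flink SQL EXPLAIN statement. This sketch uses the Bid table from this topic as a stand-in. To observe the update-stream case, the source would need to produce an update stream, such as a CDC source. That source is an assumption and is not shown here. Node names in the plan output may differ between versions.

```sql
-- Print the execution plan and look for the window aggregation node,
-- for example GroupWindowAggregate or WindowAggregate (names may vary by version).
EXPLAIN
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
```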