An OVER window is the standard SQL window construct used in traditional databases. Unlike a GROUP BY window, it computes an aggregate over a range of rows relative to the current row without collapsing the result set. Every input row produces its own output row, and a stream element can belong to multiple windows. Each element triggers exactly one computation, and the row that triggers the computation is the last row of that element's window.
Realtime Compute for Apache Flink manages all OVER window data centrally in a single copy. Logically, a separate window is created for each element; after the computation completes, data that is no longer needed is discarded. For more information, see Over Aggregation.
Syntax
SELECT
    agg1(col1) OVER (definition1) AS colName,
    ...
    aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;
- `agg1(col1)`: An aggregate function applied to `col1`.
- `OVER (definition1)`: The OVER clause that defines the window.
- `AS colName`: An alias for the result column, which outer queries can reference.
The OVER definition must be the same for all aggregations from agg1 to aggN.
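As a minimal sketch of this rule, the following query computes two aggregates over one shared definition. It assumes the `tmall_item` table declared in the ROWS OVER window example later in this topic; the aliases `maxprice` and `avgprice` are illustrative only.

```sql
-- Both aggregates repeat exactly the same OVER definition
-- (same PARTITION BY, ORDER BY, and frame).
SELECT
    itemid,
    itemtype,
    price,
    MAX(price) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS maxprice,
    AVG(price) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS avgprice
FROM tmall_item;
```

Open-source Apache Flink SQL also documents a WINDOW clause that names the definition once and reuses it (`MAX(price) OVER w ... WINDOW w AS (...)`). Whether your engine version accepts it is an assumption to verify before relying on it.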
Window types
OVER windows are divided into two types based on how the calculation boundary is defined:
| Type | Calculation boundary |
|---|---|
| ROWS OVER window | Each row defines its own boundary. |
| RANGE OVER window | All rows with the same timestamp value share a boundary. |
Both types support proctime and eventtime.
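The examples in this topic order by an event-time attribute. For comparison, here is a hedged sketch of the processing-time variant. The table name `tmall_item_proc` and the column `proc_time` are hypothetical, and the connector options simply mirror the placeholders used elsewhere in this topic.

```sql
-- Hypothetical source table: proc_time is a processing-time attribute
-- declared as a computed column, so no watermark is required.
CREATE TEMPORARY TABLE tmall_item_proc (
    itemid    VARCHAR,
    itemtype  VARCHAR,
    price     DOUBLE,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = '<yourTopic>',
    'properties.bootstrap.servers' = '<brokers>',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

-- Same ROWS frame as the event-time example, ordered by processing time.
SELECT
    itemid,
    itemtype,
    price,
    MAX(price) OVER (
        PARTITION BY itemtype
        ORDER BY proc_time
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS maxprice
FROM tmall_item_proc;
```

With processing time, rows are ordered by when they arrive at the operator, so replaying the same input can produce different results.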
ROWS OVER window
In a ROWS OVER window, each row is treated as a distinct calculation boundary. The window frame contains the current row plus a fixed number of preceding rows, or all preceding rows when UNBOUNDED is specified.
Syntax
SELECT
    agg1(col1) OVER (
        [PARTITION BY value_expression1, ..., value_expressionN]
        ORDER BY timeCol
        ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW
    ) AS colName,
    ...
FROM Tab1;
| Parameter | Description |
|---|---|
| `value_expression` | The expression to partition by. |
| `timeCol` | The time attribute field used to sort elements. |
| `rowCount` | The number of preceding rows to include in the window frame, relative to the current row. |
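The syntax also allows UNBOUNDED in place of a row count. As a rough sketch, the query below keeps a running maximum per product type over all rows seen so far. It assumes the `tmall_item` table from the example that follows; the alias `maxprice_so_far` is illustrative.

```sql
-- Running maximum: the frame starts at the first row of the partition
-- and ends at the current row.
SELECT
    itemid,
    itemtype,
    onselltime,
    price,
    MAX(price) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS maxprice_so_far
FROM tmall_item;
```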
Example
This example finds the highest price among the current product and the two preceding products of the same type, using ROWS BETWEEN 2 PRECEDING AND CURRENT ROW.
Input table: `tmall_item`
| itemid (VARCHAR) | itemtype (VARCHAR) | eventtime (VARCHAR) | price (DOUBLE) |
|---|---|---|---|
| ITEM001 | Electronic | 2024-11-11 10:01:00 | 20 |
| ITEM002 | Electronic | 2024-11-11 10:02:00 | 50 |
| ITEM003 | Electronic | 2024-11-11 10:03:00 | 30 |
| ITEM004 | Electronic | 2024-11-11 10:03:00 | 60 |
| ITEM005 | Electronic | 2024-11-11 10:05:00 | 40 |
| ITEM006 | Electronic | 2024-11-11 10:06:00 | 20 |
| ITEM007 | Electronic | 2024-11-11 10:07:00 | 70 |
| ITEM008 | Clothes | 2024-11-11 10:08:00 | 20 |
Sample code
CREATE TEMPORARY TABLE tmall_item (
    itemid VARCHAR,
    itemtype VARCHAR,
    eventtime VARCHAR,
    onselltime AS TO_TIMESTAMP(eventtime),
    price DOUBLE,
    WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND -- Define a watermark for the rowtime.
) WITH (
    'connector' = 'kafka',
    'topic' = '<yourTopic>',
    'properties.bootstrap.servers' = '<brokers>',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

SELECT
    itemid,
    itemtype,
    onselltime,
    price,
    MAX(price) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS maxprice
FROM tmall_item;
Results
| itemid | itemtype | onselltime | price | maxprice |
|---|---|---|---|---|
| ITEM001 | Electronic | 2024-11-11 10:01:00 | 20 | 20 |
| ITEM002 | Electronic | 2024-11-11 10:02:00 | 50 | 50 |
| ITEM003 | Electronic | 2024-11-11 10:03:00 | 30 | 50 |
| ITEM004 | Electronic | 2024-11-11 10:03:00 | 60 | 60 |
| ITEM005 | Electronic | 2024-11-11 10:05:00 | 40 | 60 |
| ITEM006 | Electronic | 2024-11-11 10:06:00 | 20 | 60 |
| ITEM007 | Electronic | 2024-11-11 10:07:00 | 70 | 70 |
| ITEM008 | Clothes | 2024-11-11 10:08:00 | 20 | 20 |
During the warm-up phase — when fewer rows are available than the window size requests — the window automatically shrinks. For ITEM001 (the only Electronic row so far), the window contains just that one row; for ITEM002, it contains two rows. The window reaches its full size of three rows starting from ITEM003.
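One way to observe the warm-up behavior directly is to count the rows in each frame instead of taking their maximum. The query below is a sketch under the same assumptions as the sample code above; the alias `rows_in_frame` is illustrative.

```sql
-- Count the rows that fall inside each 3-row frame
-- (the current row plus up to 2 preceding rows).
SELECT
    itemid,
    itemtype,
    onselltime,
    COUNT(itemid) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS rows_in_frame
FROM tmall_item;
```

With the sample input, `rows_in_frame` would be expected to read 1 for ITEM001, 2 for ITEM002, 3 for ITEM003 through ITEM007, and 1 for ITEM008, which starts a new Clothes partition.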
RANGE OVER window
In a RANGE OVER window, all rows with the same value in the ordering column share a calculation boundary. The window frame is defined by a time interval rather than a row count, so the number of rows in each frame can vary.
Syntax
SELECT
    agg1(col1) OVER (
        [PARTITION BY value_expression1, ..., value_expressionN]
        ORDER BY timeCol
        RANGE BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW
    ) AS colName,
    ...
FROM Tab1;
| Parameter | Description |
|---|---|
| `value_expression` | The expression to partition by. |
| `timeCol` | The time attribute field used to sort elements. |
| `timeInterval` | The time interval of preceding rows to include in the window frame, relative to the current row. |
Example
This example finds the highest price among same-type products listed within the two minutes preceding the current product's listing time, using RANGE BETWEEN INTERVAL '2' MINUTE PRECEDING AND CURRENT ROW.
The input table is the same tmall_item table as the ROWS OVER window example above.
Sample code
CREATE TEMPORARY TABLE tmall_item (
    itemid VARCHAR,
    itemtype VARCHAR,
    eventtime VARCHAR,
    onselltime AS TO_TIMESTAMP(eventtime),
    price DOUBLE,
    WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND -- Define a watermark for the rowtime.
) WITH (
    'connector' = 'kafka',
    'topic' = '<yourTopic>',
    'properties.bootstrap.servers' = '<brokers>',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

SELECT
    itemid,
    itemtype,
    onselltime,
    price,
    MAX(price) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        RANGE BETWEEN INTERVAL '2' MINUTE PRECEDING AND CURRENT ROW
    ) AS maxprice
FROM tmall_item;
Results
| itemid | itemtype | onselltime | price | maxprice |
|---|---|---|---|---|
| ITEM001 | Electronic | 2024-11-11 10:01:00 | 20 | 20 |
| ITEM002 | Electronic | 2024-11-11 10:02:00 | 50 | 50 |
| ITEM003 | Electronic | 2024-11-11 10:03:00 | 30 | 60 |
| ITEM004 | Electronic | 2024-11-11 10:03:00 | 60 | 60 |
| ITEM005 | Electronic | 2024-11-11 10:05:00 | 40 | 60 |
| ITEM006 | Electronic | 2024-11-11 10:06:00 | 20 | 40 |
| ITEM007 | Electronic | 2024-11-11 10:07:00 | 70 | 70 |
| ITEM008 | Clothes | 2024-11-11 10:08:00 | 20 | 20 |
ITEM003 and ITEM004 have the same timestamp (10:03), so they share a calculation boundary and both report 60. Notice also that ITEM006's maxprice is 40, not 60. ITEM004 was listed at 10:03, which is more than two minutes before ITEM006's listing time of 10:06, so ITEM004 falls outside ITEM006's window. Only ITEM005 (10:05) and ITEM006 (10:06) fall within the two-minute range.
This is the key behavioral difference from a ROWS OVER window: a RANGE window uses time distance, not row count, to determine what enters the frame.
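To make the varying frame size visible, the aggregate can be swapped for a count. This is a sketch only, reusing the `tmall_item` table from the sample code; the alias `rows_in_frame` is illustrative.

```sql
-- Count the rows whose onselltime lies within the 2 minutes
-- up to and including the current row's onselltime.
SELECT
    itemid,
    itemtype,
    onselltime,
    COUNT(itemid) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        RANGE BETWEEN INTERVAL '2' MINUTE PRECEDING AND CURRENT ROW
    ) AS rows_in_frame
FROM tmall_item;
```

For ITEM006 the count should be 2 (ITEM005 and ITEM006). The equivalent ROWS frame of 2 PRECEDING would hold three rows at that point regardless of how far apart their timestamps are.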