Use the HOP function in the GROUP BY clause to define a hopping window (also called a sliding window) in Flink SQL. Each event can belong to multiple overlapping windows simultaneously, making HOP well suited for computing moving aggregates — for example, counting clicks per user over the last minute, recalculated every 30 seconds.
Syntax
HOP(<time-attr>, <slide-interval>, <size-interval>)
Parameters
| Parameter | Description | Example |
|---|---|---|
| <time-attr> | A time attribute column in the stream. Specifies whether to use event time or processing time. For more information, see Time attributes. | - |
| <slide-interval> | How often the window advances. Defines the time difference between the start of consecutive windows. Format: INTERVAL 'num' timeUnit. | INTERVAL '10' SECOND: the window advances every 10 seconds |
| <size-interval> | The total duration covered by each window. Format: INTERVAL 'num' timeUnit. | INTERVAL '5' MINUTE: each window spans 5 minutes |
Window overlap behavior
The relationship between <slide-interval> and <size-interval> controls how windows overlap:
| Condition | Behavior |
|---|---|
| slide-interval < size-interval | Windows overlap. Each event is assigned to multiple windows. This is the standard hopping window pattern. |
| slide-interval = size-interval | Windows are back-to-back with no gaps, equivalent to tumbling windows. |
| slide-interval > size-interval | Windows do not overlap and are separated by gaps. |
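The three cases above can be sketched in plain Python. This is a simplified model rather than Flink's implementation: the helper name `hop_windows` is hypothetical, and the sketch assumes window starts are aligned to multiples of the slide interval counted from the Unix epoch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin for window starts


def hop_windows(event_time, slide, size):
    """Return the [start, end) pairs of all hopping windows that cover event_time."""
    # Latest slide boundary at or before the event.
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    # Step back one slide at a time while the window still covers the event.
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


event = datetime(2024, 10, 10, 10, 0, 49)
for slide_s, size_s in [(30, 60), (60, 60), (60, 30)]:
    found = hop_windows(event, timedelta(seconds=slide_s), timedelta(seconds=size_s))
    print(f"slide={slide_s}s size={size_s}s -> {len(found)} window(s)")
```

Under these assumptions, the event at 10:00:49 lands in two windows when the slide is half the size, in one window when slide and size are equal, and in no window when it falls into a gap between non-overlapping windows.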
Window identifier functions
Use these functions in SELECT to retrieve the start time, end time, or time attribute of a window.
| Function | Return type | Description |
|---|---|---|
| HOP_START(<time-attr>, <slide-interval>, <size-interval>) | TIMESTAMP | Start time of the window, which is the inclusive lower bound. For example, returns 00:10 for the window [00:10, 00:15). |
| HOP_END(<time-attr>, <slide-interval>, <size-interval>) | TIMESTAMP | End time of the window, which is the exclusive upper bound. For example, returns 00:15 for the window [00:00, 00:15). |
| HOP_ROWTIME(<time-attr>, <slide-interval>, <size-interval>) | TIMESTAMP (rowtime-attr) | Inclusive upper bound of the window, 1 ms before the end time. For example, returns 00:14:59.999 for the window [00:00, 00:15). Returns a rowtime attribute that you can use in downstream window operations. Use only in event-time windows. For more information, see Cascading windows. |
| HOP_PROCTIME(<time-attr>, <slide-interval>, <size-interval>) | TIMESTAMP (proctime-attr) | Inclusive upper bound of the window, 1 ms before the end time. For example, returns 00:14:59.999 for the window [00:00, 00:15). Returns a processing-time attribute for use in downstream window operations. Use only in processing-time windows. For more information, see Cascading windows. |
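To make the relationship between these values concrete, the sketch below models what the identifier functions report for a single event-time window. It is an illustration only: `window_identifiers` is a hypothetical helper, not part of Flink, and the 1 ms gap is taken from the 00:15 versus 00:14:59.999 examples in the table.

```python
from datetime import datetime, timedelta


def window_identifiers(window_start, size):
    """Model the values the HOP_* identifier functions report for one window."""
    window_end = window_start + size
    return {
        "HOP_START": window_start,
        "HOP_END": window_end,
        # The time attribute is 1 ms before the end time.
        "HOP_ROWTIME": window_end - timedelta(milliseconds=1),
    }


ids = window_identifiers(datetime(2024, 10, 10, 0, 0), timedelta(minutes=15))
for name, value in ids.items():
    print(name, value.time())
```

Because the time attribute stays strictly inside the window, a downstream window that groups on it assigns the row to the interval the original window covered.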
Example
This example counts clicks per user over a 1-minute sliding window that advances every 30 seconds. The query uses HOP_START and HOP_END to retrieve the window boundaries.
Test data (user_clicks)
| username (VARCHAR) | click_url (VARCHAR) | eventtime (VARCHAR) |
|---|---|---|
| Jark | http://taobao.com/xxx | 2024-10-10 10:00:00.0 |
| Jark | http://taobao.com/xxx | 2024-10-10 10:00:10.0 |
| Jark | http://taobao.com/xxx | 2024-10-10 10:00:49.0 |
| Jark | http://taobao.com/xxx | 2024-10-10 10:01:05.0 |
| Jark | http://taobao.com/xxx | 2024-10-10 10:01:58.0 |
| Timo | http://taobao.com/xxx | 2024-10-10 10:02:10.0 |
SQL
CREATE TEMPORARY TABLE user_clicks (
    username VARCHAR,
    click_url VARCHAR,
    eventtime VARCHAR,
    ts AS TO_TIMESTAMP(eventtime),
    WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark for the rowtime.
) WITH (
    'connector' = 'kafka',
    'topic' = '<yourTopic>',
    'properties.bootstrap.servers' = '<brokers>',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

CREATE TEMPORARY TABLE hop_output (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    username VARCHAR,
    clicks BIGINT
) WITH (
    'connector' = 'print',
    'logger' = 'true'
);

INSERT INTO hop_output
SELECT
    HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    username,
    COUNT(click_url)
FROM user_clicks
GROUP BY HOP(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username;
Results
| window_start (TIMESTAMP) | window_end (TIMESTAMP) | username (VARCHAR) | clicks (BIGINT) |
|---|---|---|---|
| 2024-10-10 09:59:30.0 ① | 2024-10-10 10:00:30.0 | Jark | 2 |
| 2024-10-10 10:00:00.0 | 2024-10-10 10:01:00.0 | Jark | 3 |
| 2024-10-10 10:00:30.0 | 2024-10-10 10:01:30.0 | Jark | 2 |
| 2024-10-10 10:01:00.0 | 2024-10-10 10:02:00.0 | Jark | 2 |
| 2024-10-10 10:01:30.0 | 2024-10-10 10:02:30.0 ② | Jark | 1 |
| 2024-10-10 10:01:30.0 | 2024-10-10 10:02:30.0 ② | Timo | 1 |
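The counts in this table can be cross-checked with a short Python model of the query. It is a sketch under stated assumptions, not Flink's implementation: window starts are assumed to align to multiples of the slide interval from the Unix epoch, and watermarks are ignored, so the output also lists windows that the job has not emitted yet (including [10:02:00, 10:03:00) for Timo).

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin for window starts
SLIDE = timedelta(seconds=30)
SIZE = timedelta(minutes=1)

# The test data from user_clicks (username, eventtime).
clicks = [
    ("Jark", "2024-10-10 10:00:00"),
    ("Jark", "2024-10-10 10:00:10"),
    ("Jark", "2024-10-10 10:00:49"),
    ("Jark", "2024-10-10 10:01:05"),
    ("Jark", "2024-10-10 10:01:58"),
    ("Timo", "2024-10-10 10:02:10"),
]

counts = Counter()
for user, raw in clicks:
    t = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")
    # Latest slide boundary at or before the event.
    start = t - (t - EPOCH) % SLIDE
    # Count the click once in every window that covers it.
    while start + SIZE > t:
        counts[(start, start + SIZE, user)] += 1
        start -= SLIDE

for (start, end, user), n in sorted(counts.items()):
    print(start, end, user, n)
```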
① First window start time
Because windows overlap, the first window that contains an event starts before the event itself. For an event whose time falls on a slide boundary, the first window's start time is shifted backward from the event time by: window duration - sliding step.
For example:
| Window duration (seconds) | Sliding step (seconds) | Event time | First window start | First window end |
|---|---|---|---|---|
| 120 | 30 | 2024-07-31 10:00:00.0 | 2024-07-31 09:58:30.0 | 2024-07-31 10:00:30.0 |
| 60 | 10 | 2024-07-31 10:00:00.0 | 2024-07-31 09:59:10.0 | 2024-07-31 10:00:10.0 |
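The arithmetic behind these rows can be written as a small Python function. This is a sketch with assumptions: `first_window` is a hypothetical helper, the window duration is taken to be a whole multiple of the sliding step, and event times that do not fall on a slide boundary are first rounded down to one (assuming boundaries are counted from the Unix epoch).

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin for window starts


def first_window(event_time, size, slide):
    """Return (start, end) of the earliest window that contains event_time."""
    # Latest slide boundary at or before the event (the event itself if aligned).
    aligned = event_time - (event_time - EPOCH) % slide
    # Shift backward by: window duration - sliding step.
    start = aligned - (size - slide)
    return start, start + size


event = datetime(2024, 7, 31, 10, 0, 0)
for size_s, slide_s in [(120, 30), (60, 10)]:
    start, end = first_window(event, timedelta(seconds=size_s), timedelta(seconds=slide_s))
    print(size_s, slide_s, start, end)
```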
② Window not yet triggered
The rows marked ② (window_end = 2024-10-10 10:02:30.0) are listed for completeness but are not emitted yet, because that window has not been triggered. A window triggers when:
event time >= window_end + watermark offset
For example: 10:02:30.0 + 2 seconds = 10:02:32.0. An event at or after 10:02:32.0 from any user would trigger this window.
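The trigger rule can be checked against the test data with a few lines of Python. This sketch applies the rule exactly as stated above; `has_triggered` is a hypothetical helper, and the 2-second delay mirrors the WATERMARK clause in the example table definition.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(seconds=2)  # matches ts - INTERVAL '2' SECOND


def has_triggered(max_event_time, window_end, delay=WATERMARK_DELAY):
    """Apply the rule above: event time >= window_end + watermark offset."""
    return max_event_time >= window_end + delay


latest_event = datetime(2024, 10, 10, 10, 2, 10)  # Timo's click, the newest row
for end in (datetime(2024, 10, 10, 10, 2, 0), datetime(2024, 10, 10, 10, 2, 30)):
    print(end.time(), "triggered" if has_triggered(latest_event, end) else "pending")
```

With the newest event at 10:02:10, the window ending at 10:02:00 has triggered, while the window ending at 10:02:30 is still pending.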