This topic describes how to use a SESSION function in Realtime Compute for Apache Flink.
Definition
The SESSION function groups elements into windows by session activity. Session windows do not overlap and, unlike tumbling and sliding windows, do not have a fixed size. If a session window receives no elements within a specified period of time (the session gap), the session ends and the window is closed.
Syntax
You can use a SESSION function in a GROUP BY clause to define a session window.
SESSION(<time-attr>, <gap-interval>)
Input parameters
Parameter | Description | Example |
time-attr | The parameter must be a valid time attribute field in a stream. This parameter specifies whether the time is the processing time or the event time. For more information, see Time attributes. | - |
gap-interval | The timeout period or inactivity interval for the session. If no new elements arrive within the period of time specified by gap-interval, the session window is closed. | INTERVAL '30' SECOND |
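As a minimal sketch of how the two parameters fit together, the following statements define a session window on a processing time attribute with a 10-minute gap. The table names clicks_src and clicks_per_session, the proctime column, and the choice of the datagen and print connectors are assumptions made for illustration only. The sketch shows the syntax, not a realistic workload.

```sql
-- Hypothetical source table. The datagen connector produces random rows for testing.
CREATE TEMPORARY TABLE clicks_src (
  username VARCHAR,
  click_url VARCHAR,
  proctime AS PROCTIME()  -- Processing time attribute, passed as <time-attr>.
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Hypothetical sink table that prints each result row.
CREATE TEMPORARY TABLE clicks_per_session (
  username VARCHAR,
  clicks BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO clicks_per_session
SELECT
  username,
  COUNT(click_url)
FROM clicks_src
GROUP BY
  SESSION(proctime, INTERVAL '10' MINUTE),  -- <gap-interval> of 10 minutes.
  username;
```

Because username is also in the GROUP BY clause, each user has an independent series of sessions. A user's session closes only after that user produces no rows for 10 minutes.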
Window identifier functions
A window identifier function returns the start time, end time, or time attribute of a window. The time attribute can be used in a subsequent time-based operation, such as a higher-level window aggregation over the results of this window.
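Function | Return value type | Description |
SESSION_START(<time-attr>, <gap-interval>) | TIMESTAMP | Returns the start time of a window, which is the inclusive lower boundary. For example, if the time span of a window is [10:00:00, 10:00:40), 10:00:00 is returned. |
SESSION_END(<time-attr>, <gap-interval>) | TIMESTAMP | Returns the end time of a window, which is the upper boundary value itself. For example, if the time span of a window is [10:00:00, 10:00:40), 10:00:40 is returned. |
SESSION_ROWTIME(<time-attr>, <gap-interval>) | TIMESTAMP (rowtime-attr) | Returns the end time of a window, excluding the boundary value. For example, if the time span of a window is [10:00:00, 10:00:40), 10:00:39.999 is returned. The return value is an event time attribute that can be used in subsequent time-based operations. This function applies only to windows that are defined on an event time attribute. |
SESSION_PROCTIME(<time-attr>, <gap-interval>) | TIMESTAMP (proctime-attr) | Returns a processing time attribute that can be used in subsequent time-based operations. This function applies only to windows that are defined on a processing time attribute. |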
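To illustrate how the time attribute returned by a window identifier function can feed a later aggregation, the following sketch first counts clicks per session and then counts sessions per user per hour. It assumes the user_clicks table that is defined in the Example section of this topic. The view name session_clicks and all column aliases are hypothetical.

```sql
-- First level: one row per user session.
-- SESSION_ROWTIME keeps an event time attribute on the result.
CREATE TEMPORARY VIEW session_clicks AS
SELECT
  username,
  SESSION_START(ts, INTERVAL '30' SECOND) AS window_start,
  SESSION_END(ts, INTERVAL '30' SECOND) AS window_end,
  SESSION_ROWTIME(ts, INTERVAL '30' SECOND) AS session_rowtime,
  COUNT(click_url) AS clicks
FROM user_clicks
GROUP BY
  SESSION(ts, INTERVAL '30' SECOND),
  username;

-- Second level: a one-hour tumbling window over the session results.
SELECT
  TUMBLE_START(session_rowtime, INTERVAL '1' HOUR) AS hour_start,
  username,
  COUNT(*) AS sessions,
  SUM(clicks) AS total_clicks
FROM session_clicks
GROUP BY
  TUMBLE(session_rowtime, INTERVAL '1' HOUR),
  username;
```

The second query can group by TUMBLE only because session_rowtime is a time attribute. The values returned by SESSION_START and SESSION_END are ordinary timestamps and cannot be used to define another window. In a deployed job, the final SELECT would normally be written to a sink table with INSERT INTO.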
Example
The following example describes how to compute the number of clicks per user during each active session. The session timeout interval is 30 seconds.
Test data in the user_clicks table
username (VARCHAR) | click_url (VARCHAR) | eventtime (VARCHAR) |
Jark | http://taobao.com/xxx | 2024-10-10 10:00:00.0 |
Jark | http://taobao.com/xxx | 2024-10-10 10:00:10.0 |
Jark | http://taobao.com/xxx | 2024-10-10 10:00:49.0 |
Jark | http://taobao.com/xxx | 2024-10-10 10:01:05.0 |
Jark | http://taobao.com/xxx | 2024-10-10 10:01:58.0 |
Timo | http://taobao.com/xxx | 2024-10-10 10:02:10.0 |
Test statements
CREATE TEMPORARY TABLE user_clicks(
  username varchar,
  click_url varchar,
  eventtime varchar,
  ts AS TO_TIMESTAMP(eventtime),
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark for the rowtime.
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<brokers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE session_output(
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  username VARCHAR,
  clicks BIGINT
) WITH (
  'connector'='print',
  'logger'='true'
);

INSERT INTO session_output
SELECT
  SESSION_START(ts, INTERVAL '30' SECOND),
  SESSION_END(ts, INTERVAL '30' SECOND),
  username,
  COUNT(click_url)
FROM user_clicks
GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
Test results
window_start (TIMESTAMP) | window_end (TIMESTAMP) | username (VARCHAR) | clicks (BIGINT) |
2024-10-10 10:00:00.0 | 2024-10-10 10:00:40.0 | Jark | 2 |
2024-10-10 10:00:49.0 | 2024-10-10 10:01:35.0 | Jark | 2 |
2024-10-10 10:01:58.0 | 2024-10-10 10:02:28.0 | Jark | 1 |
2024-10-10 10:02:10.0 | 2024-10-10 10:02:40.0 | Timo | 1 |