Realtime Compute for Apache Flink: SESSION

Last Updated: Dec 19, 2024

This topic describes how to use a SESSION function in Realtime Compute for Apache Flink.

Definition

A SESSION function groups elements by session activity. Session windows do not overlap, and unlike tumbling and sliding windows, they have no fixed size and no fixed start or end time. If a session window receives no elements within a specified period of inactivity (the gap), the session ends and the window is closed.
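
To make the grouping rule concrete, the following Python sketch assigns event timestamps to sessions. It assumes timestamps are plain integers in seconds, and the function name split_into_sessions is hypothetical. An element joins the current session if it arrives no more than the gap after the previous element; otherwise it opens a new session. How an element that arrives exactly one gap after its predecessor is treated is a choice made for this sketch. The sketch models only the semantics described above, not how Flink merges session windows internally.

```python
def split_into_sessions(timestamps, gap):
    """Group event timestamps into sessions separated by inactivity gaps.

    Conceptual model only: an element stays in the current session if it
    arrives at most `gap` after the previous element; otherwise a new
    session starts.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # within the gap: extend the current session
        else:
            sessions.append([ts])  # first element, or gap exceeded: new session
    return sessions


# Elements at 0 s, 10 s, and 49 s with a 30-second gap (input order does not matter):
print(split_into_sessions([49, 0, 10], 30))  # [[0, 10], [49]]
```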

Syntax

You can use a SESSION function in a GROUP BY clause to define a session window.

SESSION(<time-attr>, <gap-interval>)

Input parameters

  • time-attr: A valid time attribute field of the stream. It can be either a processing-time attribute or an event-time attribute, which determines whether the session window is based on processing time or event time. For more information, see Time attributes.

  • gap-interval: The session timeout, also called the inactivity gap. If no new element arrives within <gap-interval> after the last element of a session, the session is closed, and any element that arrives later is assigned to a new session. Specify the value in the INTERVAL 'num' timeUnit format. Example: INTERVAL '10' SECOND sets the session timeout to 10 seconds.

Window identifier functions

A window identifier function returns the start time, the end time, or a time attribute of a window. A returned time attribute can be used to aggregate the results of a lower-level window in a subsequent, higher-level window.

  • SESSION_START(<time-attr>, <gap-interval>)
    Return type: TIMESTAMP
    Returns the start time of the window, which is the inclusive lower bound of the window. The value is the time of the first record in the session window. For example, if a window spans from 00:10 to 00:15, 00:10 is returned.

  • SESSION_END(<time-attr>, <gap-interval>)
    Return type: TIMESTAMP
    Returns the end time of the window, which is the exclusive upper bound of the window. The value is the time of the last record in the session window plus <gap-interval>. For example, if a window spans from 00:00 to 00:15, 00:15 is returned.

  • SESSION_ROWTIME(<time-attr>, <gap-interval>)
    Return type: TIMESTAMP (rowtime attribute)
    Returns the inclusive upper bound of the window, which is the window end time minus 1 millisecond. For example, if a window spans from 00:00 to 00:15, 00:14:59.999 is returned. The return value is a rowtime attribute, so it can be used in subsequent time-based operations, such as cascading windows. This function applies only to windows that are defined based on event time. For more information, see Cascading windows.

  • SESSION_PROCTIME(<time-attr>, <gap-interval>)
    Return type: TIMESTAMP (proctime attribute)
    Returns the window end time minus 1 millisecond. For example, if a window spans from 00:00 to 00:15, 00:14:59.999 is returned. The return value is a processing-time attribute, so it can be used in subsequent time-based operations, such as cascading windows. This function applies only to windows that are defined based on processing time. For more information, see Cascading windows.
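
The relationship between the three event-time identifier values can be illustrated with a small Python sketch, assuming a session is represented by the event timestamps of its records. The helper session_bounds is hypothetical and is not a Flink API; it only restates the rules above: start is the time of the first record, end is the time of the last record plus the gap, and the rowtime value is the end minus 1 millisecond.

```python
from datetime import datetime, timedelta


def session_bounds(event_times, gap):
    """Return (start, end, rowtime) for one event-time session (hypothetical helper).

    start   -> value described for SESSION_START (first record's time)
    end     -> value described for SESSION_END (last record's time + gap)
    rowtime -> value described for SESSION_ROWTIME (end - 1 millisecond)
    """
    start = min(event_times)
    end = max(event_times) + gap
    rowtime = end - timedelta(milliseconds=1)
    return start, end, rowtime


# A session with records at 10:00:00 and 10:00:10 and a 30-second gap:
session = [datetime(2024, 10, 10, 10, 0, 0), datetime(2024, 10, 10, 10, 0, 10)]
start, end, rowtime = session_bounds(session, timedelta(seconds=30))
print(start, end, rowtime)
# 2024-10-10 10:00:00 2024-10-10 10:00:40 2024-10-10 10:00:39.999000
```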

Example

The following example computes the number of clicks per user in each session, using a session gap of 30 seconds.

  • Test data in the user_clicks table

    username (VARCHAR) | click_url (VARCHAR)   | eventtime (VARCHAR)
    -------------------+-----------------------+----------------------
    Jark               | http://taobao.com/xxx | 2024-10-10 10:00:00.0
    Jark               | http://taobao.com/xxx | 2024-10-10 10:00:10.0
    Jark               | http://taobao.com/xxx | 2024-10-10 10:00:49.0
    Jark               | http://taobao.com/xxx | 2024-10-10 10:01:05.0
    Jark               | http://taobao.com/xxx | 2024-10-10 10:01:58.0
    Timo               | http://taobao.com/xxx | 2024-10-10 10:02:10.0

  • Test statements

    CREATE TEMPORARY TABLE user_clicks(
      username VARCHAR,
      click_url VARCHAR,
      eventtime VARCHAR,
      ts AS TO_TIMESTAMP(eventtime),                -- Computed column: parse eventtime into a TIMESTAMP.
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Declare ts as the rowtime attribute; tolerate 2 seconds of out-of-order data.
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<brokers>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );

    CREATE TEMPORARY TABLE session_output(
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );

    INSERT INTO session_output
    SELECT
      SESSION_START(ts, INTERVAL '30' SECOND) AS window_start,
      SESSION_END(ts, INTERVAL '30' SECOND) AS window_end,
      username,
      COUNT(click_url) AS clicks
    FROM user_clicks
    GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;

  • Test results

    window_start (TIMESTAMP) | window_end (TIMESTAMP) | username (VARCHAR) | clicks (BIGINT)
    -------------------------+------------------------+--------------------+----------------
    2024-10-10 10:00:00.0    | 2024-10-10 10:00:40.0  | Jark               | 2
    2024-10-10 10:00:49.0    | 2024-10-10 10:01:35.0  | Jark               | 2
    2024-10-10 10:01:58.0    | 2024-10-10 10:02:28.0  | Jark               | 1
    2024-10-10 10:02:10.0    | 2024-10-10 10:02:40.0  | Timo               | 1
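
As a cross-check, the following Python sketch recomputes the test results from the test data. It is a simplified model, not Flink's implementation: it assumes all records are available and in order, and it does not model watermarks, late data, or when Flink actually emits each window. The names parse and session_counts are hypothetical. For each user it starts a new session whenever the time since that user's previous click exceeds 30 seconds, and reports the first click time, the last click time plus the gap, and the click count.

```python
from collections import defaultdict
from datetime import datetime, timedelta

GAP = timedelta(seconds=30)  # corresponds to INTERVAL '30' SECOND

# (username, eventtime) pairs copied from the user_clicks test data.
CLICKS = [
    ("Jark", "2024-10-10 10:00:00.0"),
    ("Jark", "2024-10-10 10:00:10.0"),
    ("Jark", "2024-10-10 10:00:49.0"),
    ("Jark", "2024-10-10 10:01:05.0"),
    ("Jark", "2024-10-10 10:01:58.0"),
    ("Timo", "2024-10-10 10:02:10.0"),
]


def parse(eventtime):
    # Plays the role of TO_TIMESTAMP(eventtime) in the DDL.
    return datetime.strptime(eventtime, "%Y-%m-%d %H:%M:%S.%f")


def session_counts(clicks, gap):
    """Emulate GROUP BY SESSION(ts, gap), username with a row count per session.

    Returns (window_start, window_end, username, clicks) tuples sorted by
    window_start.
    """
    by_user = defaultdict(list)
    for user, eventtime in clicks:
        by_user[user].append(parse(eventtime))

    rows = []
    for user, times in by_user.items():
        times.sort()
        start = last = times[0]
        count = 0
        for ts in times:
            if ts - last > gap:
                # Inactivity gap exceeded: close the session, open a new one.
                rows.append((start, last + gap, user, count))
                start, count = ts, 0
            last = ts
            count += 1
        rows.append((start, last + gap, user, count))  # close the final session
    return sorted(rows)


for window_start, window_end, username, n in session_counts(CLICKS, GAP):
    print(window_start, window_end, username, n)
# 2024-10-10 10:00:00 2024-10-10 10:00:40 Jark 2
# 2024-10-10 10:00:49 2024-10-10 10:01:35 Jark 2
# 2024-10-10 10:01:58 2024-10-10 10:02:28 Jark 1
# 2024-10-10 10:02:10 2024-10-10 10:02:40 Timo 1
```

The per-user grouping mirrors the username key in the GROUP BY clause: sessions are tracked independently for each user, which is why Timo's click at 10:02:10 starts its own session even though it falls inside Jark's third session.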