This topic describes how to use the tumbling window function in Realtime Compute for Apache Flink.

Introduction to a tumbling window

A tumbling window assigns each element of a data stream to a window of a specified size. Tumbling windows have a fixed size and do not overlap. For example, if you define a 5-minute tumbling window, an unbounded data stream is divided by time into windows such as [0:00, 0:05), [0:05, 0:10), and [0:10, 0:15). The following figure shows a 30-second tumbling window.

Syntax

The TUMBLE function is used to define a tumbling window in a GROUP BY clause.
TUMBLE(<time-attr>, <size-interval>)
<size-interval>: INTERVAL 'string' timeUnit
Note: The <time-attr> parameter must be a valid time attribute of the data stream. It specifies whether the window is based on the processing time or the event time. For more information, see Overview, Time attributes, and Watermark.

Window identifier functions

A window identifier function returns the start time, end time, or time attribute of a window. The time attribute can be used for further aggregation in lower-level (cascading) windows.

Function | Return type | Description
TUMBLE_START(time-attr, size-interval) | TIMESTAMP | Returns the start time of a window, which is the inclusive lower boundary. For example, if the window is [00:10, 00:15), 00:10 is returned.
TUMBLE_END(time-attr, size-interval) | TIMESTAMP | Returns the end time of a window, which is the exclusive upper boundary itself. For example, if the window is [00:00, 00:15), 00:15 is returned.
TUMBLE_ROWTIME(time-attr, size-interval) | TIMESTAMP(rowtime-attr) | Returns the last timestamp that belongs to a window, which is the end time minus 1 millisecond. For example, if the window is [00:00, 00:15), 00:14:59.999 is returned. The return value is a rowtime attribute, on which further time-based operations can be performed. For example, it can be used to define a cascading window based on the event time.
TUMBLE_PROCTIME(time-attr, size-interval) | TIMESTAMP(proctime-attr) | Returns the last timestamp that belongs to a window, which is the end time minus 1 millisecond. For example, if the window is [00:00, 00:15), 00:14:59.999 is returned. The return value is a proctime attribute, on which further time-based operations can be performed. For example, it can be used to define a cascading window based on the processing time.
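
The boundary arithmetic behind these functions can be illustrated outside of SQL. The following Python sketch is not part of Realtime Compute; the function name tumble_bounds is hypothetical, and the sketch assumes that windows are aligned to multiples of the window size counted from 1970-01-01 00:00:00, which is consistent with the examples in this topic.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_bounds(ts: datetime, size: timedelta):
    """Return (start, end, rowtime) of the tumbling window that contains ts.

    Hypothetical helper for illustration only. Assumes windows are aligned
    to multiples of `size` counted from the epoch.
    """
    # Distance between ts and the most recent window boundary.
    offset = (ts - EPOCH) % size
    start = ts - offset                        # inclusive lower boundary, like TUMBLE_START
    end = start + size                         # exclusive upper boundary, like TUMBLE_END
    rowtime = end - timedelta(milliseconds=1)  # last instant inside the window, like TUMBLE_ROWTIME
    return start, end, rowtime


start, end, rowtime = tumble_bounds(
    datetime(2017, 10, 10, 10, 12, 30), timedelta(minutes=5)
)
print(start, end, rowtime)
```

A timestamp that falls exactly on a boundary, such as 10:15:00 with a 5-minute window, is assigned to the window that starts at that boundary, because the upper boundary of the previous window is exclusive.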

The following example describes how to count the number of clicks per user on a specific website every minute based on the event time.

  • Test data
    username (VARCHAR) click_url (VARCHAR) ts (TIMESTAMP)
    Jark http://taobao.com/xxx 2017-10-10 10:00:00.0
    Jark http://taobao.com/xxx 2017-10-10 10:00:10.0
    Jark http://taobao.com/xxx 2017-10-10 10:00:49.0
    Jark http://taobao.com/xxx 2017-10-10 10:01:05.0
    Jark http://taobao.com/xxx 2017-10-10 10:01:58.0
    Timo http://taobao.com/xxx 2017-10-10 10:02:10.0
  • Test statements
    CREATE TABLE user_clicks (
      username VARCHAR,
      click_url VARCHAR,
      ts TIMESTAMP,
      WATERMARK wk FOR ts AS withOffset(ts, 2000) -- Define a watermark for rowtime.
    ) WITH (
      type = 'datahub',
      ...
    );
    
    CREATE TABLE tumble_output (
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      type = 'RDS'
    );
    
    INSERT INTO tumble_output
    SELECT
      TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
      TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end,
      username,
      COUNT(click_url) AS clicks
    FROM user_clicks
    GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;
  • Test results
    window_start (TIMESTAMP) window_end (TIMESTAMP) username (VARCHAR) clicks (BIGINT)
    2017-10-10 10:00:00.0 2017-10-10 10:01:00.0 Jark 3
    2017-10-10 10:01:00.0 2017-10-10 10:02:00.0 Jark 2
    2017-10-10 10:02:00.0 2017-10-10 10:03:00.0 Timo 1
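
To see why the test data produces these three rows, the grouping can be replayed by hand. The following Python sketch is only an illustration of the GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username logic, not the way Realtime Compute executes the statement. The helper name window_start is hypothetical, and the sketch ignores watermarks, so it does not model when each window is emitted.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=1)  # corresponds to INTERVAL '1' MINUTE

# Test data from the example: (username, click_url, ts).
clicks = [
    ("Jark", "http://taobao.com/xxx", "2017-10-10 10:00:00"),
    ("Jark", "http://taobao.com/xxx", "2017-10-10 10:00:10"),
    ("Jark", "http://taobao.com/xxx", "2017-10-10 10:00:49"),
    ("Jark", "http://taobao.com/xxx", "2017-10-10 10:01:05"),
    ("Jark", "http://taobao.com/xxx", "2017-10-10 10:01:58"),
    ("Timo", "http://taobao.com/xxx", "2017-10-10 10:02:10"),
]


def window_start(ts, size):
    """Hypothetical helper: start of the epoch-aligned window that contains ts."""
    return ts - (ts - EPOCH) % size


# Count clicks per (window, username), like the GROUP BY clause.
counts = Counter()
for username, _url, ts_text in clicks:
    ts = datetime.strptime(ts_text, "%Y-%m-%d %H:%M:%S")
    counts[(window_start(ts, SIZE), username)] += 1

# One output row per group: window_start, window_end, username, clicks.
results = [
    (start, start + SIZE, user, n) for (start, user), n in sorted(counts.items())
]
for row in results:
    print(*row)
```

The three clicks by Jark before 10:01:00 fall into the first window, the two clicks between 10:01:00 and 10:02:00 fall into the second window, and the click by Timo falls into the third window.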

The following example describes how to count the number of clicks per user on a specific website every minute based on the processing time.

  • Test data
    username (VARCHAR) click_url (VARCHAR)
    Jark http://taobao.com/xxx
    Jark http://taobao.com/xxx
    Jark http://taobao.com/xxx
    Jark http://taobao.com/xxx
    Jark http://taobao.com/xxx
    Timo http://taobao.com/xxx
  • Test statements
    CREATE TABLE window_test (
      username VARCHAR,
      click_url VARCHAR,
      ts AS PROCTIME()
    ) WITH (
      type = 'datahu ...c = xxx'
    );
    
    CREATE TABLE tumble_output (
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      type = 'print'
    );
    
    INSERT INTO tumble_output
    SELECT
      TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
      TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end,
      username,
      COUNT(click_url) AS clicks
    FROM window_test
    GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;
  • Test results
    window_start (TIMESTAMP) window_end (TIMESTAMP) username (VARCHAR) clicks (BIGINT)
    2019-04-11 14:43:00.000 2019-04-11 14:44:00.000 Jark 5
    2019-04-11 14:43:00.000 2019-04-11 14:44:00.000 Timo 1
    Note: Local debugging processes the test data almost instantly, and the processing time may be less than 1 second. Therefore, if the processing time attribute is used to aggregate data in windows, local debugging may fail.
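
With the processing time, the window of a record depends on when the record is processed, not on any timestamp in the data. This is why the results above contain a single window. The following Python sketch illustrates this with two made-up arrival schedules. The function name count_by_arrival and all arrival times are hypothetical, and the sketch does not reproduce the behavior of Realtime Compute itself.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=1)  # corresponds to INTERVAL '1' MINUTE


def count_by_arrival(arrivals):
    """Count records per (window_start, username), using the arrival time
    of each record as its time attribute (hypothetical helper)."""
    counts = Counter()
    for username, arrived_at in arrivals:
        start = arrived_at - (arrived_at - EPOCH) % SIZE
        counts[(start, username)] += 1
    return dict(counts)


# The six test records: five clicks by Jark, then one by Timo.
users = ["Jark"] * 5 + ["Timo"]
base = datetime(2019, 4, 11, 14, 43, 5)  # assumed arrival time of the first record

# Hypothetical schedule A: records arrive 5 seconds apart, all within 14:43.
same_minute = [(u, base + timedelta(seconds=5 * i)) for i, u in enumerate(users)]

# Hypothetical schedule B: records arrive 20 seconds apart and cross into 14:44.
spread = [(u, base + timedelta(seconds=20 * i)) for i, u in enumerate(users)]

print(count_by_arrival(same_minute))
print(count_by_arrival(spread))
```

Under schedule A, all records land in the window that starts at 14:43:00, which matches the test results. Under schedule B, the same records are split across two windows, so the output of a processing-time job is not reproducible from the input data alone.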