An OVER window is a standard window used in traditional databases. OVER aggregation differs from window aggregation. In streaming data that uses OVER windows, each element corresponds to its own OVER window, which consists of the current element and a set of adjacent elements that precede it. As a result, an element of the stream can belong to multiple windows. In the Flink SQL implementation of OVER windows, the element that triggers a calculation is always the last row of its window.

Because each element corresponds to an OVER window, a calculation is triggered for every OVER window. In the underlying implementation of Realtime Compute, OVER window data is managed globally in a unified manner: only one copy of the data is stored, and an OVER window is created for each element only logically. Realtime Compute calculates the result of each OVER window and then deletes the data that is no longer needed.
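As an illustration of this storage model, the following Python sketch keeps one shared buffer for a single partition, evaluates a logical window each time an element arrives, and then evicts rows that no later window can reach. It assumes a bounded ROWS window (`ROWS BETWEEN n PRECEDING AND CURRENT ROW`) with MAX as the aggregate. The class `SharedOverBuffer` is hypothetical and does not reflect the actual Realtime Compute internals.

```python
from collections import deque


class SharedOverBuffer:
    """Hypothetical sketch: one shared buffer serving every logical OVER
    window of a single partition, for ROWS BETWEEN n PRECEDING AND CURRENT ROW."""

    def __init__(self, preceding: int):
        self.preceding = preceding
        self.rows = deque()  # the single stored copy of the data
        self.peak_size = 0   # largest number of rows held at any time

    def on_element(self, value: float) -> float:
        # The arriving element is the last row of its own logical window.
        self.rows.append(value)
        self.peak_size = max(self.peak_size, len(self.rows))
        result = max(self.rows)  # calculate the window for this element
        # Evict rows that no future window can include: the next window
        # needs only the `preceding` most recent rows plus the next element.
        while len(self.rows) > self.preceding:
            self.rows.popleft()
        return result


buf = SharedOverBuffer(preceding=2)
print([buf.on_element(p) for p in [20, 50, 30, 60, 40, 20, 70]])  # [20, 50, 50, 60, 60, 60, 70]
print(buf.peak_size)  # 3
```

Because this buffer never holds more than n + 1 rows, its memory stays bounded even though every element logically has its own window.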

Syntax

SELECT
    agg1(col1) OVER (definition1) AS colName1,
    ...
    aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;
Note
  • OVER (definition1) for agg1 through aggN must be the same.
  • The aliases specified by AS can be referenced in an outer SQL statement.

Types

In Flink SQL, OVER windows are defined in compliance with the standard SQL syntax, which does not divide OVER windows into finer-grained window types. To make the semantics of OVER windows easier to understand, this topic classifies them into the following two types based on how computed rows are determined:
  • ROWS OVER window: Each element is treated as a separate computed row, and a new window is generated for each row.
  • RANGE OVER window: All elements with the same timestamp are treated as one computed row and are assigned to the same window.
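The difference between the two types can be sketched in Python as follows, assuming the timestamps of one partition are already in ascending order. The helper `computed_rows` is hypothetical. Each returned group is one computed row: ROWS mode yields one group per element, whereas RANGE mode merges elements that share a timestamp.

```python
from itertools import groupby


def computed_rows(timestamps, mode):
    """Group row indexes into computed rows (hypothetical helper).

    timestamps: timestamps of one partition, in ascending order.
    mode: "ROWS" or "RANGE".
    """
    indexes = list(range(len(timestamps)))
    if mode == "ROWS":
        # Every element is its own computed row and gets its own window.
        return [[i] for i in indexes]
    if mode == "RANGE":
        # Consecutive elements with the same timestamp form one computed row.
        return [list(group) for _, group in groupby(indexes, key=lambda i: timestamps[i])]
    raise ValueError(f"unknown mode: {mode}")


ts = ["10:01", "10:02", "10:03", "10:03", "10:05"]
print(computed_rows(ts, "ROWS"))   # [[0], [1], [2], [3], [4]]
print(computed_rows(ts, "RANGE"))  # [[0], [1], [2, 3], [4]]
```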

Attributes

Orthogonal attribute    Processing time    Event time
ROWS OVER window        Supported          Supported
RANGE OVER window       Supported          Supported
  • ROWS OVER window: The window boundary is defined by a number of rows.
  • RANGE OVER window: The window boundary is defined by a time range relative to the timestamp of the current element.
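The following Python sketch contrasts the two kinds of frame boundaries for one partition whose rows are already sorted by time. The helpers `rows_frame` and `range_frame` are hypothetical. The RANGE lower bound is assumed to be inclusive, which is consistent with the RANGE example later in this topic (ITEM005 at 10:05:00 still sees the 10:03:00 prices).

```python
from datetime import datetime, timedelta


def rows_frame(current, preceding):
    """Indexes in ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW.

    The frame is defined purely by row positions.
    """
    return list(range(max(0, current - preceding), current + 1))


def range_frame(timestamps, current, interval):
    """Indexes in RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW.

    Every row whose timestamp lies in
    [timestamps[current] - interval, timestamps[current]] is included,
    including rows that share the current timestamp.
    """
    upper = timestamps[current]
    lower = upper - interval
    return [i for i, ts in enumerate(timestamps) if lower <= ts <= upper]


# One partition, ordered by time.
times = [datetime(2017, 11, 11, 10, m) for m in (1, 2, 3, 3, 5, 6, 7)]
print(rows_frame(2, preceding=2))                   # [0, 1, 2]
print(range_frame(times, 2, timedelta(minutes=2)))  # [0, 1, 2, 3]
print(rows_frame(5, preceding=2))                   # [3, 4, 5]
print(range_frame(times, 5, timedelta(minutes=2)))  # [4, 5]
```

For the row at index 2, the RANGE frame also contains index 3 because both rows share the 10:03 timestamp. The ROWS frame ignores timestamps and counts positions.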

ROWS OVER window

  • Description

    For a ROWS OVER window, each element is assigned to its own window. ROWS OVER windows have two types: unbounded and bounded.

    The following figure shows unbounded ROWS OVER windows.
    Note: As shown in the preceding figure, the elements of user 1 in windows w7 and w8 arrive at the same time, as do the elements of user 2 in windows w3 and w4. However, these elements are still assigned to different windows. This is how ROWS OVER windows differ from RANGE OVER windows.
    The following figure shows bounded ROWS OVER windows, in which each window contains at most three elements: the current element and the two elements that precede it.
    Note: As shown in the preceding figure, the elements of user 1 in windows w5 and w6 arrive at the same time, as do the elements of user 2 in windows w2 and w3. However, these elements are still assigned to different windows. This is how ROWS OVER windows differ from RANGE OVER windows.
  • Syntax
    SELECT 
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)] 
         ORDER BY timeCol
         ROWS 
         BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ... 
    FROM Tab1
    • value_expression: specifies the value expression used for partitioning.
    • timeCol: specifies the time field used to sort elements.
    • rowCount: specifies the number of rows that precede the current row.
  • Example

    This example uses bounded ROWS OVER windows. Assume that a table of products on sale contains item IDs, item types, on-sale times, and prices. For each product, calculate the highest price among the current product and the two products of the same type that went on sale immediately before it.

    Test data

    itemID itemType onSellTime price
    ITEM001 Electronic 2017-11-11 10:01:00 20
    ITEM002 Electronic 2017-11-11 10:02:00 50
    ITEM003 Electronic 2017-11-11 10:03:00 30
    ITEM004 Electronic 2017-11-11 10:03:00 60
    ITEM005 Electronic 2017-11-11 10:05:00 40
    ITEM006 Electronic 2017-11-11 10:06:00 20
    ITEM007 Electronic 2017-11-11 10:07:00 70
    ITEM008 Clothes 2017-11-11 10:08:00 20

    Test statements

    CREATE TABLE tmall_item(
       itemID VARCHAR,
       itemType VARCHAR,
       onSellTime TIMESTAMP,
       price DOUBLE,
       WATERMARK onSellTime FOR onSellTime as withOffset(onSellTime, 0) 
    ) 
    WITH (
      type = 'sls',
       ...
    ) ;
    
    SELECT  
        itemID,
        itemType, 
        onSellTime, 
        price,  
        MAX(price) OVER (
            PARTITION BY itemType 
            ORDER BY onSellTime 
            ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
      FROM tmall_item

    Test results

    itemID itemType onSellTime price maxPrice
    ITEM001 Electronic 2017-11-11 10:01:00 20 20
    ITEM002 Electronic 2017-11-11 10:02:00 50 50
    ITEM003 Electronic 2017-11-11 10:03:00 30 50
    ITEM004 Electronic 2017-11-11 10:03:00 60 60
    ITEM005 Electronic 2017-11-11 10:05:00 40 60
    ITEM006 Electronic 2017-11-11 10:06:00 20 60
    ITEM007 Electronic 2017-11-11 10:07:00 70 70
    ITEM008 Clothes 2017-11-11 10:08:00 20 20
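The results above can be cross-checked with a small offline Python model of the query. It is a sketch for checking the semantics, not how Realtime Compute executes the job: it partitions the test data by `itemType`, keeps the arrival order of ITEM003 and ITEM004 (which share a timestamp), and takes the maximum over the current row and up to two preceding rows. The function name `bounded_rows_max` is hypothetical.

```python
from collections import defaultdict

# Test data from the table above: (itemID, itemType, onSellTime, price).
ITEMS = [
    ("ITEM001", "Electronic", "2017-11-11 10:01:00", 20),
    ("ITEM002", "Electronic", "2017-11-11 10:02:00", 50),
    ("ITEM003", "Electronic", "2017-11-11 10:03:00", 30),
    ("ITEM004", "Electronic", "2017-11-11 10:03:00", 60),
    ("ITEM005", "Electronic", "2017-11-11 10:05:00", 40),
    ("ITEM006", "Electronic", "2017-11-11 10:06:00", 20),
    ("ITEM007", "Electronic", "2017-11-11 10:07:00", 70),
    ("ITEM008", "Clothes", "2017-11-11 10:08:00", 20),
]


def bounded_rows_max(items, preceding):
    """MAX(price) OVER (PARTITION BY itemType ORDER BY onSellTime
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW)."""
    history = defaultdict(list)  # prices seen so far, per itemType
    results = {}
    # sorted() is stable, so rows with equal onSellTime keep arrival order.
    for item_id, item_type, sell_time, price in sorted(items, key=lambda r: r[2]):
        rows = history[item_type]
        rows.append(price)
        # The frame is the current row plus up to `preceding` earlier rows.
        results[item_id] = max(rows[-(preceding + 1):])
    return results


for item_id, max_price in bounded_rows_max(ITEMS, preceding=2).items():
    print(item_id, max_price)
```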

RANGE OVER window

  • Description

    For a RANGE OVER window, all elements with the same timestamp are assigned to the same window. RANGE OVER windows have two types: unbounded and bounded.

    The following figure shows unbounded RANGE OVER windows.
    Note: As shown in the preceding figure, the elements of user 1 in the two w7 windows arrive at the same time, as do the elements of user 2 in the two w3 windows. Elements with the same timestamp are assigned to the same window. This is how RANGE OVER windows differ from ROWS OVER windows.
    The following figure shows bounded RANGE OVER windows defined with INTERVAL '2' SECOND PRECEDING, so each window spans 3 seconds: the second of the current element plus the two preceding seconds.
    Note: As shown in the preceding figure, the elements of user 1 in the two w6 windows arrive at the same time, as do the elements of user 2 in the two w3 windows. Elements with the same timestamp are assigned to the same window. This is how RANGE OVER windows differ from ROWS OVER windows.
  • Syntax
    SELECT 
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)] 
         ORDER BY timeCol
         RANGE 
         BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, ... 
    FROM Tab1
    • value_expression: specifies the value expression used for partitioning.
    • timeCol: specifies the time field used to sort elements.
    • timeInterval: specifies how far back from the timestamp of the current row the window extends, for example INTERVAL '2' MINUTE.
  • Example

    This example uses bounded RANGE OVER windows. Assume that a table of products on sale contains item IDs, item types, on-sale times, and prices. For each product, calculate the highest price among the products of the same type that went on sale within the two minutes up to and including the on-sale time of the current product.

    Test data

    itemID itemType onSellTime price
    ITEM001 Electronic 2017-11-11 10:01:00 20
    ITEM002 Electronic 2017-11-11 10:02:00 50
    ITEM003 Electronic 2017-11-11 10:03:00 30
    ITEM004 Electronic 2017-11-11 10:03:00 60
    ITEM005 Electronic 2017-11-11 10:05:00 40
    ITEM006 Electronic 2017-11-11 10:06:00 20
    ITEM007 Electronic 2017-11-11 10:07:00 70
    ITEM008 Clothes 2017-11-11 10:08:00 20

    Test statements

    CREATE TABLE tmall_item(
       itemID VARCHAR,
       itemType VARCHAR,
       onSellTime TIMESTAMP,
       price DOUBLE,
       WATERMARK onSellTime FOR onSellTime as withOffset(onSellTime, 0) 
    ) 
    WITH (
      type = 'sls',
       ...
    ) ;
    
    SELECT  
        itemID,
        itemType, 
        onSellTime, 
        price,  
        MAX(price) OVER (
            PARTITION BY itemType 
            ORDER BY onSellTime 
            RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
      FROM tmall_item

    Test results

    itemID itemType onSellTime price maxPrice
    ITEM001 Electronic 2017-11-11 10:01:00 20 20
    ITEM002 Electronic 2017-11-11 10:02:00 50 50
    ITEM003 Electronic 2017-11-11 10:03:00 30 60
    ITEM004 Electronic 2017-11-11 10:03:00 60 60
    ITEM005 Electronic 2017-11-11 10:05:00 40 60
    ITEM006 Electronic 2017-11-11 10:06:00 20 40
    ITEM007 Electronic 2017-11-11 10:07:00 70 70
    ITEM008 Clothes 2017-11-11 10:08:00 20 20
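A small offline Python model of this RANGE query follows. It is a sketch for checking the semantics, not how Realtime Compute runs the job. For each item, it takes the maximum price over all items of the same `itemType` whose `onSellTime` lies within the two minutes up to and including the time of the current item. The lower bound is assumed to be inclusive. Because ITEM003 and ITEM004 share a timestamp, they fall into the same window and get the same `maxPrice`. The function name `bounded_range_max` is hypothetical.

```python
from datetime import datetime, timedelta

# Test data from the table above: (itemID, itemType, onSellTime, price).
ITEMS = [
    ("ITEM001", "Electronic", "2017-11-11 10:01:00", 20),
    ("ITEM002", "Electronic", "2017-11-11 10:02:00", 50),
    ("ITEM003", "Electronic", "2017-11-11 10:03:00", 30),
    ("ITEM004", "Electronic", "2017-11-11 10:03:00", 60),
    ("ITEM005", "Electronic", "2017-11-11 10:05:00", 40),
    ("ITEM006", "Electronic", "2017-11-11 10:06:00", 20),
    ("ITEM007", "Electronic", "2017-11-11 10:07:00", 70),
    ("ITEM008", "Clothes", "2017-11-11 10:08:00", 20),
]


def bounded_range_max(items, interval):
    """MAX(price) OVER (PARTITION BY itemType ORDER BY onSellTime
    RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW)."""
    parsed = [
        (item_id, item_type, datetime.strptime(sell_time, "%Y-%m-%d %H:%M:%S"), price)
        for item_id, item_type, sell_time, price in items
    ]
    results = {}
    for item_id, item_type, ts, _ in parsed:
        # Same partition and timestamp in [ts - interval, ts]; peers included.
        frame = [
            other_price
            for _, other_type, other_ts, other_price in parsed
            if other_type == item_type and ts - interval <= other_ts <= ts
        ]
        results[item_id] = max(frame)
    return results


for item_id, max_price in bounded_range_max(ITEMS, timedelta(minutes=2)).items():
    print(item_id, max_price)
```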