OVER ウィンドウは、従来のデータベースで使用される標準ウィンドウです。 GROUP BY ウィンドウとは異なります。 OVER ウィンドウを適用するストリームでは、各要素は OVER ウィンドウに対応します。その要素は、現在の要素に隣接する一連の要素です。 ストリームの要素は複数のウィンドウに分散されます。 Flink SQL ウィンドウの実装では、計算をトリガーする各要素によって決定される行は、要素が配置されているウィンドウの最後の行です。

OVER ウィンドウを適用するストリームでは、各要素が OVER ウィンドウに対応し、データコンピューティングを 1 回トリガーします。 Realtime Compute の基盤となる実装では、OVER ウィンドウデータはグローバルかつ統一された方法で管理されます (データのコピーは 1 つだけ保存されます)。 論理的には、ウィンドウコンピューティングを実行するために要素ごとに OVER ウィンドウが維持されます。 計算が完了すると、期限切れのデータは消去されます。

構文

SELECT
    agg1(col1) OVER (definition1) AS colName,
    ...
    aggN(colN) OVER (definition1) AS colNameN
FROM Tab1
			
  • OVER (definition1) は agg1 から aggN で同じである必要があります。
  • AS で指定されたエイリアスは、外部 SQL 文を使用して照会できます。

データ型

Flink SQL では、OVER ウィンドウの定義は標準 SQL 構文に従います。 従来より、OVER ウィンドウは、より細かいウィンドウタイプに分類されていません。 OVER ウィンドウのセマンティクスをよりよく理解するために、計算された行を決定する方法に基づいて、OVER ウィンドウを次の 2 つのタイプに分類します。

  • ROWS OVER ウィンドウ:要素の各行は、新しい計算された行として扱われます。 つまり、各行が新しいウィンドウに対応します。
  • RANGE OVER ウィンドウ:同じタイムスタンプ値を持つすべての要素行は、同じ計算された行として扱われ、同じウィンドウに属します。

属性

直交属性 proctime eventtime
rows
range
  • rows: ウィンドウは、要素の実際の行に基づいて決定されます。
  • range: ウィンドウは、要素の実際の値 (タイムスタンプ値) に基づいて決定されます。

ROWS OVER ウィンドウのセマンティクス

  • ウィンドウデータ

    ROWS OVER ウィンドウでは、各要素がウィンドウを決定します。 ROWS OVER ウィンドウは、Unbounded と Bounded の ROWS OVER ウィンドウに分類されます。

    次の図は、Unbounded の ROWS OVER ウィンドウデータを示しています。

    前の図に示すように、ユーザー 1 のウィンドウ w7 と w8 の要素が同時に到着し、ユーザー 2 のウィンドウ w3 と w4 の要素も同時に到着します。 ただし、要素は別のウィンドウにあります。 この点で、ROWS OVER ウィンドウは RANGE OVER ウィンドウとは異なります。

    次の図は、Bounded の ROWS OVER ウィンドウのデータを示しています。ウィンドウには 3 つの要素 (PRECEDING 状態では 2 つの要素) があります。

    前の図に示すように、ユーザー 1 のウィンドウ w5 と w6 の要素が同時に到着し、ユーザー 2 のウィンドウ w2 と w3 の要素も同時に到着します。 ただし、要素は別のウィンドウにあります。 この点で、ROWS OVER ウィンドウは RANGE OVER ウィンドウとは異なります。
  • ウィンドウ構文
    SELECT 
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)] 
         ORDER BY timeCol
         ROWS 
         BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ... 
    FROM Tab1
    					
    • value_expression: パーティション化に使用される値式を指定します。
    • timeCol: 要素のソートに使用される時間フィールドを指定します。
    • rowCount: 現在の行の前にトレースする行数を指定します。
  • 例として、Bounded ROWS OVER ウィンドウを使用します。 これは、商品の ID、タイプ、棚付け時間、および商品の価格をリストする商品棚付けテーブルです。 現在の商品が棚に並ぶ前に、3 つの類似した商品の中で最も高い価格を計算します。

    テストデータ

    itemID itemType onSellTime price
    ITEM001 Electronic 2017-11-11 10:01:00 20
    ITEM002 Electronic 2017-11-11 10:02:00 50
    ITEM003 Electronic 2017-11-11 10:03:00 30
    ITEM004 Electronic 2017-11-11 10:03:00 60
    ITEM005 Electronic 2017-11-11 10:05:00 40
    ITEM006 Electronic 2017-11-11 10:06:00 20
    ITEM007 Electronic 2017-11-11 10:07:00 70
    ITEM008 Clothes 2017-11-11 10:08:00 20

    テストコード

    CREATE TABLE tmall_item(
       itemID VARCHAR,
       itemType VARCHAR,
       onSellTime TIMESTAMP,
       price DOUBLE,
       WATERMARK onSellTime FOR onSellTime as withOffset(onSellTime, 0) 
    ) 
    WITH (
      type = 'sls',
       ...
    ) ;
    
    SELECT  
        itemID,
        itemType, 
        onSellTime, 
        price,  
        MAX(price) OVER (
            PARTITION BY itemType 
            ORDER BY onSellTime 
            ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
      FROM tmall_item
    					

    テスト結果

    itemID itemType onSellTime price maxPrice
    ITEM001 Electronic 2017-11-11 10:01:00 20 20
    ITEM002 Electronic 2017-11-11 10:02:00 50 50
    ITEM003 Electronic 2017-11-11 10:03:00 30 50
    ITEM004 Electronic 2017-11-11 10:03:00 60 60
    ITEM005 Electronic 2017-11-11 10:05:00 40 60
    ITEM006 Electronic 2017-11-11 10:06:00 20 60
    ITEM007 Electronic 2017-11-11 10:07:00 70 70
    ITEM008 Clothes 2017-11-11 10:08:00 20 20

RANGE OVER ウィンドウのセマンティクス

  • ウィンドウデータ

    RANGE OVER ウィンドウでは、同じ要素値 (要素タイムスタンプ) を持つすべての要素行がウィンドウを決定します。 RANGE OVER ウィンドウは、Unbounded と Bounded の RANGE OVER ウィンドウに分類されます。

    次の図は、Unbounded RANGE OVER ウィンドウのデータを示しています。

    前の図に示すように、ユーザー 1 の 2 つのウィンドウ w7 の要素が同時に到着し、ユーザー 2 の 2 つのウィンドウ w3 の要素も同時に到着します。 要素はそれぞれ同じウィンドウにあります。 この点で、RANGE OVER ウィンドウは ROWS OVER ウィンドウとは異なります。

    次の図は、Bounded RANGE OVER ウィンドウデータを示しています。3 秒のウィンドウの間隔は 2 秒です。

    前の図に示すように、ユーザー 1 の 2 つのウィンドウ w6 の要素が同時に到着し、ユーザー 2 の 2 つのウィンドウ w3 の要素も同時に到着します。 要素はそれぞれ同じウィンドウにあります。 この点で、RANGE OVER ウィンドウは ROWS OVER ウィンドウとは異なります。
  • ウィンドウ構文
    SELECT 
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)] 
         ORDER BY timeCol
         RANGE 
         BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, 
    ... 
    FROM Tab1
    						
    • value_expression: パーティション化に使用される値式を指定します。
    • timeCol: 要素のソートに使用される時間フィールドを指定します。
    • timeInterval: 現在の要素行から、逆方向に追跡される最も古い要素行までの期間を指定します。
  • 例として、Bounded RANGE OVER ウィンドウを使用します。 これは、商品の ID、タイプ、棚付け時間、および商品の価格をリストする商品棚付けテーブルです。 現在の商品より 2 分早く棚に並んだ類似の商品の中で最も高い価格を計算します。

    テストデータ

    itemID itemType onSellTime price
    ITEM001 Electronic 2017-11-11 10:01:00 20
    ITEM002 Electronic 2017-11-11 10:02:00 50
    ITEM003 Electronic 2017-11-11 10:03:00 30
    ITEM004 Electronic 2017-11-11 10:03:00 60
    ITEM005 Electronic 2017-11-11 10:05:00 40
    ITEM006 Electronic 2017-11-11 10:06:00 20
    ITEM007 Electronic 2017-11-11 10:07:00 70
    ITEM008 Clothes 2017-11-11 10:08:00 20

    テストコード

    CREATE TABLE tmall_item(
       itemID VARCHAR,
       itemType VARCHAR,
       onSellTime TIMESTAMP,
       price DOUBLE,
       WATERMARK onSellTime FOR onSellTime as withOffset(onSellTime, 0) 
    ) 
    WITH (
      type = 'sls',
       ...
    ) ;
    
    SELECT  
        itemID,
        itemType, 
        onSellTime, 
        price,  
        MAX(price) OVER (
            PARTITION BY itemType 
            ORDER BY onSellTime 
            RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
      FROM tmall_item
    
    						

    テスト結果

    itemID itemType onSellTime price maxPrice
    ITEM001 Electronic 2017-11-11 10:01:00 20 20
    ITEM002 Electronic 2017-11-11 10:02:00 50 50
    ITEM003 Electronic 2017-11-11 10:03:00 30 50
    ITEM004 Electronic 2017-11-11 10:03:00 60 60
    ITEM005 Electronic 2017-11-11 10:05:00 40 60
    ITEM006 Electronic 2017-11-11 10:06:00 20 40
    ITEM007 Electronic 2017-11-11 10:07:00 70 70
    ITEM008 Clothes 2017-11-11 10:08:00 20 20