OVER ウィンドウは、従来のデータベースで使用される標準ウィンドウです。 GROUP BY ウィンドウとは異なります。 OVER ウィンドウを適用するストリームでは、各要素は OVER ウィンドウに対応します。その要素は、現在の要素に隣接する一連の要素です。 ストリームの要素は複数のウィンドウに分散されます。 Flink SQL ウィンドウの実装では、計算をトリガーする各要素によって決定される行は、要素が配置されているウィンドウの最後の行です。
OVER ウィンドウを適用するストリームでは、各要素が OVER ウィンドウに対応し、データコンピューティングを 1 回トリガーします。 Realtime Compute の基盤となる実装では、OVER ウィンドウデータはグローバルかつ統一された方法で管理されます (データのコピーは 1 つだけ保存されます)。 論理的には、ウィンドウコンピューティングを実行するために要素ごとに OVER ウィンドウが維持されます。 計算が完了すると、期限切れのデータは消去されます。
構文
SELECT
agg1(col1) OVER (definition1) AS colName,
...
aggN(colN) OVER (definition1) AS colNameN
FROM Tab1
- OVER (definition1) は agg1 から aggN で同じである必要があります。
- AS で指定されたエイリアスは、外部 SQL 文を使用して照会できます。
データ型
Flink SQL では、OVER ウィンドウの定義は標準 SQL 構文に従います。 従来より、OVER ウィンドウは、より細かいウィンドウタイプに分類されていません。 OVER ウィンドウのセマンティクスをよりよく理解するために、計算された行を決定する方法に基づいて、OVER ウィンドウを次の 2 つのタイプに分類します。
- ROWS OVER ウィンドウ:要素の各行は、新しい計算された行として扱われます。 つまり、各行が新しいウィンドウに対応します。
- RANGE OVER ウィンドウ:同じタイムスタンプ値を持つすべての要素行は、同じ計算された行として扱われ、同じウィンドウに属します。
属性
直交属性 | proctime | eventtime |
---|---|---|
rows | √ | √ |
range | √ | √ |
- rows: ウィンドウは、要素の実際の行に基づいて決定されます。
- range: ウィンドウは、要素の実際の値 (タイムスタンプ値) に基づいて決定されます。
ROWS OVER ウィンドウのセマンティクス
- ウィンドウデータ
ROWS OVER ウィンドウでは、各要素がウィンドウを決定します。 ROWS OVER ウィンドウは、Unbounded と Bounded の ROWS OVER ウィンドウに分類されます。
次の図は、Unbounded の ROWS OVER ウィンドウデータを示しています。
注 前の図に示すように、ユーザー 1 のウィンドウ w7 と w8 の要素が同時に到着し、ユーザー 2 のウィンドウ w3 と w4 の要素も同時に到着します。 ただし、要素は別のウィンドウにあります。 この点で、ROWS OVER ウィンドウは RANGE OVER ウィンドウとは異なります。次の図は、Bounded の ROWS OVER ウィンドウのデータを示しています。ウィンドウには 3 つの要素 (PRECEDING 状態では 2 つの要素) があります。
注 前の図に示すように、ユーザー 1 のウィンドウ w5 と w6 の要素が同時に到着し、ユーザー 2 のウィンドウ w2 と w3 の要素も同時に到着します。 ただし、要素は別のウィンドウにあります。 この点で、ROWS OVER ウィンドウは RANGE OVER ウィンドウとは異なります。 - ウィンドウ構文
SELECT agg1(col1) OVER( [PARTITION BY (value_expression1,..., value_expressionN)] ORDER BY timeCol ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ... FROM Tab1
- value_expression: パーティション化に使用される値式を指定します。
- timeCol: 要素のソートに使用される時間フィールドを指定します。
- rowCount: 現在の行の前にトレースする行数を指定します。
- 例
例として、Bounded ROWS OVER ウィンドウを使用します。 これは、商品の ID、タイプ、棚付け時間、および商品の価格をリストする商品棚付けテーブルです。 現在の商品が棚に並ぶ前に、3 つの類似した商品の中で最も高い価格を計算します。
テストデータ
itemID itemType onSellTime price ITEM001 Electronic 2017-11-11 10:01:00
20 ITEM002 Electronic 2017-11-11 10:02:00
50 ITEM003 Electronic 2017-11-11 10:03:00
30 ITEM004 Electronic 2017-11-11 10:03:00
60 ITEM005 Electronic 2017-11-11 10:05:00
40 ITEM006 Electronic 2017-11-11 10:06:00
20 ITEM007 Electronic 2017-11-11 10:07:00
70 ITEM008 Clothes 2017-11-11 10:08:00
20 テストコード
CREATE TABLE tmall_item( itemID VARCHAR, itemType VARCHAR, onSellTime TIMESTAMP, price DOUBLE, WATERMARK onSellTime FOR onSellTime as withOffset(onSellTime, 0) ) WITH ( type = 'sls', ... ) ; SELECT itemID, itemType, onSellTime, price, MAX(price) OVER ( PARTITION BY itemType ORDER BY onSellTime ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice FROM tmall_item
テスト結果
itemID itemType onSellTime price maxPrice ITEM001 Electronic 2017-11-11 10:01:00
20 20 ITEM002 Electronic 2017-11-11 10:02:00
50 50 ITEM003 Electronic 2017-11-11 10:03:00
30 50 ITEM004 Electronic 2017-11-11 10:03:00 60 60 ITEM005 Electronic 2017-11-11 10:05:00
40 60 ITEM006 Electronic 2017-11-11 10:06:00
20 60 ITEM007 Electronic 2017-11-11 10:07:00
70 70 ITEM008 Clothes 2017-11-11 10:08:00
20 20
RANGE OVER ウィンドウのセマンティクス
- ウィンドウデータ
RANGE OVER ウィンドウでは、同じ要素値 (要素タイムスタンプ) を持つすべての要素行がウィンドウを決定します。 RANGE OVER ウィンドウは、Unbounded と Bounded の RANGE OVER ウィンドウに分類されます。
次の図は、Unbounded RANGE OVER ウィンドウのデータを示しています。前の図に示すように、ユーザー 1 の 2 つのウィンドウ w7 の要素が同時に到着し、ユーザー 2 の 2 つのウィンドウ w3 の要素も同時に到着します。 要素はそれぞれ同じウィンドウにあります。 この点で、RANGE OVER ウィンドウは ROWS OVER ウィンドウとは異なります。
次の図は、Bounded RANGE OVER ウィンドウデータを示しています。3 秒のウィンドウの間隔は 2 秒です。
注 前の図に示すように、ユーザー 1 の 2 つのウィンドウ w6 の要素が同時に到着し、ユーザー 2 の 2 つのウィンドウ w3 の要素も同時に到着します。 要素はそれぞれ同じウィンドウにあります。 この点で、RANGE OVER ウィンドウは ROWS OVER ウィンドウとは異なります。 - ウィンドウ構文
SELECT agg1(col1) OVER( [PARTITION BY (value_expression1,..., value_expressionN)] ORDER BY timeCol RANGE BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, ... FROM Tab1
- value_expression: パーティション化に使用される値式を指定します。
- timeCol: 要素のソートに使用される時間フィールドを指定します。
- timeInterval: 現在の要素行から、逆方向に追跡される最も古い要素行までの期間を指定します。
- 例
例として、Bounded RANGE OVER ウィンドウを使用します。 これは、商品の ID、タイプ、棚付け時間、および商品の価格をリストする商品棚付けテーブルです。 現在の商品より 2 分早く棚に並んだ類似の商品の中で最も高い価格を計算します。
テストデータ
itemID itemType onSellTime price ITEM001 Electronic 2017-11-11 10:01:00
20 ITEM002 Electronic 2017-11-11 10:02:00
50 ITEM003 Electronic 2017-11-11 10:03:00
30 ITEM004 Electronic 2017-11-11 10:03:00
60 ITEM005 Electronic 2017-11-11 10:05:00
40 ITEM006 Electronic 2017-11-11 10:06:00
20 ITEM007 Electronic 2017-11-11 10:07:00
70 ITEM008 Clothes 2017-11-11 10:08:00
20 テストコード
CREATE TABLE tmall_item( itemID VARCHAR, itemType VARCHAR, onSellTime TIMESTAMP, price DOUBLE, WATERMARK onSellTime FOR onSellTime as withOffset(onSellTime, 0) ) WITH ( type = 'sls', ... ) ; SELECT itemID, itemType, onSellTime, price, MAX(price) OVER ( PARTITION BY itemType ORDER BY onSellTime RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice FROM tmall_item
テスト結果
itemID itemType onSellTime price maxPrice ITEM001 Electronic 2017-11-11 10:01:00
20 20 ITEM002 Electronic 2017-11-11 10:02:00
50 50 ITEM003 Electronic 2017-11-11 10:03:00
30 50 ITEM004 Electronic 2017-11-11 10:03:00
60 60 ITEM005 Electronic 2017-11-11 10:05:00
40 60 ITEM006 Electronic 2017-11-11 10:06:00
20 40 ITEM007 Electronic 2017-11-11 10:07:00
70 70 ITEM008 Clothes 2017-11-11 10:08:00
20 20