OVER ウィンドウは、従来型データベースで使用される標準的なウィンドウ関数です。GROUP BY ウィンドウとは異なり、OVER ウィンドウ内の各要素は一意のウィンドウフレームに対応します。これらのフレームは、要素の行またはタイムスタンプ値に基づいて定義できます。そのため、ストリーム要素が複数のウィンドウに存在することが可能です。
OVER ウィンドウを使用するストリームでは、各要素は一意のウィンドウに対応します。各要素は計算を1回トリガーし、その計算をトリガーする行は、当該要素のウィンドウの最終行です。Realtime Compute の基盤となる実装では、OVER ウィンドウのデータは一元管理され、単一のコピーのみが保存されます。論理的には、各要素に対して OVER ウィンドウが作成されます。Realtime Compute は各 OVER ウィンドウのデータを計算し、計算が完了した後に不要となったデータを削除します。詳細については、「Over 集計」をご参照ください。
構文
SELECT
agg1(col1) OVER (definition1) AS colName,
...
aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;agg1(col1):col1 列に対して計算を行う集計関数。
OVER (definition1):ウィンドウを定義する OVER 句。
AS colName:結果列のエイリアス。
agg1 から aggN までのすべての集約において、OVER 定義は同一である必要があります。
外部の SQL クエリでは、このエイリアスを使用してデータをクエリできます。
種類
Flink SQL では、OVER ウィンドウの定義は標準 SQL 構文に従います。従来の OVER ウィンドウは、より細かいタイプに分類されません。ただし、計算対象の行の定義方法に基づき、以下の 2 種類に分類できます。
ROWS OVER ウィンドウ:各行を新しい計算境界として扱います。つまり、各行が新しいウィンドウを定義します。
RANGE OVER ウィンドウ:同じ時間値を持つすべての行を単一の計算境界として扱います。つまり、同じ時間値を持つすべての行が同じウィンドウに属します。
プロパティ
直交プロパティ | 説明 | proctime | eventtime |
ROWS OVER ウィンドウ | 要素の実際の行に基づいてウィンドウを定義します。 | サポート | サポート |
RANGE OVER ウィンドウ | 要素の実際の値(タイムスタンプ)に基づいてウィンドウを定義します。 | サポート | サポート |
ROWS OVER ウィンドウのセマンティクス
ウィンドウデータ
ROWS OVER ウィンドウの各要素は、それぞれ独自のウィンドウを定義します。
ウィンドウ構文
SELECT agg1(col1) OVER( [PARTITION BY (value_expression1,..., value_expressionN)] ORDER BY timeCol ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ... FROM Tab1;value_expression:パーティション分割に使用する式。
timeCol:要素をソートするための時間属性フィールド。
rowCount:現在の行を基準として、ウィンドウフレームに含める先行行の数。
例
たとえば、有界 ROWS OVER ウィンドウのシナリオを考えてみます。プロダクト ID、プロダクトタイプ、プロダクト掲載時刻、およびプロダクト価格を含む商品一覧テーブルがあると仮定します。目的は、同じタイプの直近 3 商品の中で最も高い価格を求めることです。
tmall_item テーブルのサンプルデータ
itemid (VARCHAR)
itemtype (VARCHAR)
eventtime (VARCHAR)
price (DOUBLE)
ITEM001
Electronic
2024-11-11 10:01:0020
ITEM002
Electronic
2024-11-11 10:02:0050
ITEM003
Electronic
2024-11-11 10:03:0030
ITEM004
Electronic
2024-11-11 10:03:0060
ITEM005
Electronic
2024-11-11 10:05:0040
ITEM006
Electronic
2024-11-11 10:06:0020
ITEM007
Electronic
2024-11-11 10:07:0070
ITEM008
Clothes
2024-11-11 10:08:0020
サンプルコード
CREATE TEMPORARY TABLE tmall_item( itemid VARCHAR, itemtype VARCHAR, eventtime varchar, onselltime AS TO_TIMESTAMP(eventtime), price DOUBLE, WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND -- rowtime の Watermark を定義します。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); SELECT itemid, itemtype, onselltime, price, MAX(price) OVER ( PARTITION BY itemtype ORDER BY onselltime ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxprice FROM tmall_item;結果
itemid
itemtype
onselltime
price
maxprice
ITEM001
Electronic
2024-11-11 10:01:0020
20
ITEM002
Electronic
2024-11-11 10:02:0050
50
ITEM003
Electronic
2024-11-11 10:03:0030
50
ITEM004
Electronic
2024-11-11 10:03:0060
60
ITEM005
Electronic
2024-11-11 10:05:0040
60
ITEM006
Electronic
2024-11-11 10:06:0020
60
ITEM007
Electronic
2024-11-11 10:07:0070
70
ITEM008
Clothes
2024-11-11 10:08:0020
20
RANGE OVER ウィンドウのセマンティクス
ウィンドウデータ
RANGE OVER ウィンドウでは、ソート列(例:タイムスタンプ)の値が同一であるすべての行がウィンドウフレームを定義します。
ウィンドウ構文
SELECT agg1(col1) OVER( [PARTITION BY (value_expression1,..., value_expressionN)] ORDER BY timeCol RANGE BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, ... FROM Tab1;value_expression:パーティション分割に使用する式。
timeCol:要素をソートするための時間属性フィールド。
timeInterval:現在の行を基準として、ウィンドウフレームに含める先行行の時間間隔。
例
この例では、有界 RANGE OVER ウィンドウを示します。プロダクト ID、タイプ、掲載時刻、および価格を含む商品一覧テーブルがあると仮定します。目的は、現在の商品の掲載時刻を含め、その 2 分前に掲載された同じタイプの商品の中で最も高い価格を求めることです。
tmall_item テーブルのサンプルデータ
itemid (VARCHAR)
itemtype (VARCHAR)
eventtime (VARCHAR)
price (DOUBLE)
ITEM001
Electronic
2024-11-11 10:01:0020
ITEM002
Electronic
2024-11-11 10:02:0050
ITEM003
Electronic
2024-11-11 10:03:0030
ITEM004
Electronic
2024-11-11 10:03:0060
ITEM005
Electronic
2024-11-11 10:05:0040
ITEM006
Electronic
2024-11-11 10:06:0020
ITEM007
Electronic
2024-11-11 10:07:0070
ITEM008
Clothes
2024-11-11 10:08:0020
サンプルコード
CREATE TEMPORARY TABLE tmall_item( itemid VARCHAR, itemtype VARCHAR, eventtime varchar, onselltime AS TO_TIMESTAMP(eventtime), price DOUBLE, WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND -- rowtime の Watermark を定義します。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); SELECT itemid, itemtype, onselltime, price, MAX(price) OVER ( PARTITION BY itemtype ORDER BY onselltime RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxprice FROM tmall_item;結果
itemid
itemtype
onselltime
price
maxprice
ITEM001
Electronic
2024-11-11 10:01:0020
20
ITEM002
Electronic
2024-11-11 10:02:0050
50
ITEM003
Electronic
2024-11-11 10:03:0030
50
ITEM004
Electronic
2024-11-11 10:03:0060
60
ITEM005
Electronic
2024-11-11 10:05:0040
60
ITEM006
Electronic
2024-11-11 10:06:0020
40
ITEM007
Electronic
2024-11-11 10:07:0070
70
ITEM008
Clothes
2024-11-11 10:08:0020
20