Realtime Compute for Apache Flink は、2 種類のウィンドウ集約をサポートしています。グループウィンドウ集約とウィンドウテーブル値関数 (TVF) 集約です。このトピックでは、さまざまな種類のウィンドウ集約の構文、集約クエリでウィンドウ TVF を使用できないユースケース、およびさまざまな種類のウィンドウ集約での更新ストリームのサポートについて説明します。
背景情報
グループウィンドウ集計(古い構文): GroupWindowAggregation 演算子に対応し、TUMBLE、HOP、および SESSION ウィンドウ関数をサポートしています。
ウィンドウ TVF 集約 (新しい構文): ウィンドウ TVF、パフォーマンスの最適化で説明されている最適化、標準
GROUPING SETS構文、および Window Top-N のウィンドウ集約結果への適用をサポートしています。このタイプのウィンドウ集約は WindowAggregate オペレーターに対応し、TUMBLE、HOP、CUMULATE、および SESSION ウィンドウ関数をサポートしています。
グループウィンドウ集約は非推奨です。より効率的で汎用性の高い ウィンドウ TVF 集約を使用することをお勧めします。
更新ストリームのサポートについては、更新ストリームのサポートの比較を参照してください。
グループウィンドウ集計(古い構文)
グループウィンドウ集約は、SQL クエリの GROUP BY 句で定義されます。通常の GROUP BY 句を使用するクエリと同様に、GROUP BY 句にウィンドウ関数を含むクエリは、グループごとに 1 つの計算結果を返します。
グループウィンドウ集約の構文、例、および機能については、「グループウィンドウ集約」をご参照ください。
ウィンドウ TVF 集計(新しい構文)
ウィンドウ TVF 集約は、ウィンドウ TVF によって生成された window_start 列と window_end 列を含む GROUP BY 句で定義されます。通常の GROUP BY 句を使用するクエリと同様に、ウィンドウ TVF 集約はグループごとに 1 つの計算結果を返します。
連続テーブルの集計とは異なり、ウィンドウ TVF 集計は中間結果を生成せず、ウィンドウの最後に最終結果のみを生成します。不要な中間状態データはクリーンアップされます。
ウィンドウ TVF 集約の構文、例、および機能については、「ウィンドウ TVF 集約」をご参照ください。
SESSION ウィンドウ TVF: VVR 11 vs VVR 8
VVR 11.x (Flink 1.20) 構文
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)パラメータ:
data: 時間属性列を持つテーブル。keycols: セッションウィンドウ処理の前にデータのパーティションに使用する列を指定する列記述子。timecol: セッションウィンドウにマップする時間属性列を指定する列記述子。gap: 同じセッションウィンドウに属する 2 つのイベント間の最大時間間隔。
VVR 8.x (Flink 1.17) 構文
VVR 11.1 以後にスペックアップすることをお勧めします。
SESSION(TABLE data, DESCRIPTOR(timecol), gap)パラメータ:
data: 時間属性列を持つテーブル。timecol: セッションウィンドウにマップする時間属性列を指定する列記述子。gap: 同じセッションウィンドウに属する 2 つのイベント間の最大時間間隔。
次の表は、VVR 11.x と VVR 8.x のセッションウィンドウ TVF を比較しています。
項目 | VVR 11.x | VVR 8.x | 違い |
構文 |
|
| VVR 8.x は |
パーティションフィールドの指定方法 |
| 集約文の | VVR 8.x では、パーティションフィールドは |
パーティションフィールドの制限 | なし。 | パーティションフィールドは | VVR 8.x では、パーティションフィールドは集約ロジックで暗黙的に指定されますが、VVR 11.x はより柔軟性を提供します。 |
パラメーターの完全性 | 完全なパラメーター: | 簡略化されたパラメーター: | VVR 8.x は集約ロジックに依存してパーティション情報を推測します。 |
スタンドアロン使用のサポート | 集約なしで |
| VVR 8.x はウィンドウ関数を集約文に強制的に結合しますが、VVR 11.x はより柔軟な使用シナリオをサポートしています。 |
ウィンドウ関数と集約のマージ可能性 | ウィンドウ関数を集約文 ( | ウィンドウ TVF と集約文をマージできないユースケースはサポートしていません。つまり、集約ロジックはウィンドウ関数と一致している必要があります。 | VVR 8.x は集約をウィンドウとマージすることに制限があります。 |
次の SQL の例は同等であり、どちらも item を SESSION ウィンドウ関数のパーティションフィールドとして使用しています。
-- テーブルには時間属性が必要です (このテーブルでは `bidtime` など)
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+
-- VVR 11.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
-- VVR 8.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;SQL コードの比較:
項目 | VVR 11.x | VVR 8.x | 説明 |
SESSION ウィンドウのパーティション分割 |
|
| VVR 8.x は |
集約とウィンドウのマージ | 直接マージをサポートしています (ウィンドウ内での | 集約フィールドはウィンドウパーティションフィールドと一致している必要があります ( | VVR 8.x は集約をウィンドウとマージすることに暗黙の制約があります。 |
集計クエリにおけるウィンドウ TVF の制限
SESSION ウィンドウ TVF を例として、集約クエリでウィンドウ TVF を使用できないユースケースについて説明します。
ウィンドウ TVF をサポートしていない集計クエリで処理時間に基づいてウィンドウを作成する場合、処理時間列は具体化され、作成されたウィンドウの時間属性として使用されます。この場合、ソーステーブルのウォーターマークが集計結果に影響を与える可能性があります。たとえば、ウィンドウの集計結果が予期よりも早く生成される場合があります。さらに、遅延データレコードは破棄される可能性があり、これはイベント時間に基づいて作成されたウィンドウの場合と同様です。この問題を防ぐために、SQL ステートメントにウィンドウ TVF を含む集計クエリが次の条件を満たしていないことを確認してください。
window_start、window_end、または window_time フィールドのフィルタリングまたは計算が実行されます。例:
-- window_start に基づくフィルタリング > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) where window_start >= TIMESTAMP '2020-04-15 08:06:00.000') GROUP BY item, window_start, window_end; -- window_start の計算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end; -- window_start の計算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end;ウィンドウ TVF がユーザー定義テーブル関数(UDTF)と一緒に使用されます。例:
> SELECT window_start, window_end, category, SUM(price) AS total_price FROM (SELECT category, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)), LATERAL TABLE(category_udtf(item)) as T(category)) GROUP BY category, window_start, window_end;GROUP BY 句に window_start フィールドと window_end フィールドの両方が含まれていません。例:
SELECT window_start, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start;Python ユーザー定義集計関数(UDAF)が使用されます。
GROUP BY 句で GROUPING SETS、CUBE、または ROLLUP が使用され、window_start フィールドと window_end フィールドでデータを個別にグループ化することが指定されています。例:
> SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY GROUPING SETS((item), (window_start), (window_end)); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY CUBE (item, window_start, window_end); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY ROLLUP (item, window_start, window_end);集計関数が window_start、window_end、または window_time フィールドに適用されます。例:
> SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
更新ストリームのサポートの比較
ウィンドウ関数 | 古い構文 (GroupWindowAggregation) | 新しい構文 (WindowAggregate) | |
VVR と Apache Flink | VVR | Apache Flink | |
TUMBLE | はい | はい | いいえ |
HOP | はい | はい | いいえ |
SESSION | はい | はい 説明 VVR と Apache Flink の SESSION ウィンドウ関数の違いについては、「クエリ」をご参照ください。 | はい (Apache Flink 1.19 以降) |
CUMULATE | N/A | はい 説明 はい (VVR 8.0.6 以降) | いいえ |
古い構文では、VVR を使用するか Apache Flink を使用するかによらず、更新ストリームのサポートは同じです。新しい構文では、VVR によって提供される WindowAggregate 演算子のみが、すべてのウィンドウ関数に対して更新ストリームをサポートしています。これは、VVR が GroupWindowAggregation 演算子と WindowAggregate 演算子をサポートしており、入力ストリームに基づいて適切な演算子を自動的に選択できるためです。