Realtime ComputeFlink版的視窗彙總支援老文法分組視窗彙總(Group Window Aggregation)和新文法視窗資料表值函式彙總(Window TVF Aggregation)兩種形式。本文為您介紹視窗彙總新老文法詳情、視窗資料表值函式和彙總語句無法合并的情境、以及新老文法對更新流的支援情況。
背景資訊
分組視窗彙總(老文法):對應GroupWindowAggregation運算元,支援TUMBLE、HOP、SESSION視窗類別型。
視窗資料表值函式彙總(新文法):基於Window TVF新文法的視窗彙總,具有所有效能調優中提到的效能最佳化措施、支援標準的
GROUPING SETS文法、可以在視窗彙總結果上使用視窗Top等優勢。對應WindowAggregate運算元,支援TUMBLE、HOP、CUMULATE和SESSION視窗函數。
分組視窗彙總已淘汰,推薦您使用更高效且功能更豐富的視窗資料表值函式彙總。
它們對於更新流的支援情況,請參見新老文法對更新流的支援情況。
分組視窗彙總(老文法)
分組視窗彙總定義在SQL的GROUP BY子句中,和普通的GROUP BY子句一樣,包含分組視窗函數的GROUP BY子句的查詢會對各組分別計算,各自產生一個結果行。
分組視窗彙總的文法、範例及特性等詳情,請參見Group Window Aggregation。
視窗資料表值函式彙總(新文法)
視窗彙總是通過GROUP BY子句定義的,其特徵是包含由視窗資料表值函式產生的window_start和 window_end列。和普通的GROUP BY子句一樣,視窗彙總會為每個組計算出一行資料。
和其他連續表上的彙總不同,視窗彙總不產生中間結果,只在視窗結束產生一個總的彙總結果,另外,視窗彙總會清除不需要的中間狀態。
視窗資料表值函式彙總的文法、範例及特性等,請參見Window TVF Aggregation。
SESSION視窗資料表值函式彙總在不同VVR版本中的區別
VVR 11.x(對應Flink 1.20版本)文法
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)參數含義如下:
data:擁有時間屬性列的表。
keycols:列描述符,決定會話視窗應該使用哪些列來分區資料。
timecol:列描述符,決定資料的哪個時間屬性列應該映射到視窗。
gap:兩個事件被認為屬於同一個會話視窗的最大時間間隔。
(建議棄用)VVR 8.x(對應Flink 1.17版本)文法
SESSION(TABLE data, DESCRIPTOR(timecol), gap)參數含義如下:
data:擁有時間屬性列的表。
timecol:列描述符,決定資料的哪個時間屬性列應該映射到視窗。
gap:兩個事件被認為屬於同一個會話視窗的最大時間間隔。
Realtime Compute引擎VVR 8.x與VVR 11.x的SESSION視窗資料表值函式使用區別如下:
特性 | VVR 11.x | VVR 8.x(建議棄用) | 差異說明 |
文法結構 |
|
| VVR 8.x缺少 |
分區欄位指定方式 | 支援顯式 | 必須通過彙總語句的 | VVR 8.x強制要求分區欄位為彙總語句中非 |
分區欄位限制 | 無強制限制,可自由選擇分區欄位。 | 分區欄位必須與彙總語句的 | VVR 8.x通過彙總邏輯隱式綁定分區欄位,VVR 11.x文法更靈活。 |
參數完整性 | 完整參數: | 簡化參數: | VVR 8.x移除了 |
單獨使用支援性 | 支援單獨調用 | 不支援單獨使用,必須與彙總語句(如 | VVR 8.x強制要求函數與彙總語句耦合,VVR 11.x支援更靈活的使用情境。 |
視窗函數與彙總合并性 | 支援視窗函數與彙總語句合并(如 | 不支援視窗資料表值函式和彙總語句無法合并的情境(需保持彙總邏輯與視窗函數一致)。 | VVR 8.x對彙總與視窗的合并使用有嚴格限制。 |
以下樣本中SQL文法是等價的,都將使用item欄位作為SESSION視窗函數的分區欄位:
-- tables must have time attribute, e.g. `bidtime` in this table
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+
-- VVR 11.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
-- VVR 8.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;樣本對比說明:
情境 | VVR 11.x SQL | VVR 8.x SQL(建議棄用) | 說明 |
SESSION視窗分區 |
|
| 兩者均通過 |
彙總與視窗合并 | 支援直接合并(如 | 僅當彙總欄位與視窗分區欄位一致時支援(如 | VVR 8.x對彙總與視窗的合并使用有隱性約束。 |
視窗資料表值函式和彙總語句無法合并的情境
以下情境均以SESSION視窗為例,同樣適用於其他視窗資料表值函式。
當視窗資料表值函式和彙總語句無法合并時,如果使用Processing Time作為視窗列來劃分視窗,則會導致視窗彙總(Window Aggregation)節點使用被視窗資料表值函式(Window TVF)節點物化的Processing Time列作為視窗的時間屬性,在彙總計算時會受到來自於源表浮水印(Watermark)的幹擾,從而導致視窗提前輸出並可能出現類似事件時間視窗的延遲資料丟棄。請改寫您的SQL,盡量避免視窗資料表值函式和彙總語句無法合并的情況發生。
在視窗資料表值函式和彙總語句之間,包含對window_start、window_end和window_time欄位的過濾或計算。例如:
-- 包含對window_start的過濾 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) where window_start >= TIMESTAMP '2020-04-15 08:06:00.000') GROUP BY item, window_start, window_end; -- 包含對window_start的計算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end; -- 包含對window_start的計算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end;視窗資料表值函式和UDTF同時使用。例如:
> SELECT window_start, window_end, category, SUM(price) AS total_price FROM (SELECT category, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)), LATERAL TABLE(category_udtf(item)) as T(category)) GROUP BY category, window_start, window_end;彙總語句的GROUP KEY中未同時包含window_start和window_end。例如:
SELECT window_start, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start;彙總函式使用python UDAF。
彙總函式使用GROUPING SETS、CUBE和ROLLUP文法,導致window_start和window_end不在同一組GROUP KEY中。例如:
> SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY GROUPING SETS((item), (window_start), (window_end)); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY CUBE (item, window_start, window_end); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY ROLLUP (item, window_start, window_end);彙總語句中,彙總函式使用視窗列window_start、window_end和window_time進行計算。例如:
> SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
新老文法對更新流的支援情況
視窗函數 | 老文法 (GroupWindowAggregation運算元) | 新文法 (WindowAggregate運算元) | |
VVR、社區Flink | VVR | 社區Flink | |
TUMBLE | 支援 | 支援 | 不支援 |
HOP | 支援 | 支援 | 不支援 |
SESSION | 支援 | 支援 說明 VVR和社區Flink關於Session視窗區別請參見Queries語句。 | Flink 1.19支援 |
CUMULATE | N/A | 支援 說明 Realtime Compute引擎VVR 8.0.6及以上版本支援。 | 不支援 |
在對更新流的支援上,老文法視窗彙總(GroupWindowAggregation運算元)支援更新流(VVR和社區Flink保持一致),新文法(WindowAggregate運算元)社區Flink(1.16~1.18)不支援更新流,而VVR實現了新老文法的內部融合,可以自動根據輸入資料流的情況選擇支援的運算元,實現社區Flink新文法中不支援更新流的TUMBLE、HOP視窗彙總對更新流的支援。