本文將為您介紹Realtime ComputeFlink版通過BITMAP位元影像類型進行精確去重的使用方法,並結合多個SQL樣本介紹典型的應用情境。
背景資訊
在 Flink Realtime Compute中,傳統的去重方案存在以下局限:
資源開銷高昂:
COUNT DISTINCT需在狀態(State)中維護全量的去重鍵。海量資料會導致狀態體積膨脹,消耗大量記憶體與 CPU 資源。多維擴充困難: 預彙總方案缺乏靈活性。支援 N 個維度任意組合查詢需要窮舉 2N 個組,導致作業邏輯臃腫。
準確性妥協:
APPROX_COUNT_DISTINCT等近似演算法存在統計誤差,無法滿足對資料準確性要求嚴苛的業務情境。
BITMAP 解決方案
為了兼顧準確性與效能,Realtime Compute Flink 版引入了 BITMAP(位元影像) 資料類型。
技術原理: 該方案通常基於 RoaringBitmap 演算法,將整型詳細資料壓縮為位元影像對象。計算邏輯從昂貴的集合維護轉化為高效的位元影像邏輯運算(如並集、交集、差集)。
核心優勢: BITMAP 極大降低了狀態儲存壓力。同時,它允許業務方在不損失 100% 準確性的前提下,對任意維度進行靈活的上卷(Roll-up)、下鑽(Drill-down)及交叉分析。
使用限制
僅Realtime ComputeFlink版VVR 11.5及以上版本支援BITMAP位元影像類型及相關函數。
使用方法
BITMAP類型是基於RoaringBitmap標準的 32 位不帶正負號的整數集合,支援常數級的基數計算以及高效的集合代數運算。其主要有以下兩種使用方法:
即時精確去重
直接處理原始詳細資料,提供與 COUNT DISTINCT 語義完全等效的去重能力,但在大規模資料下擁有更優的記憶體效率與計算效能。(參見樣本 1)
多維分析預彙總
按業務維度產生分區位元影像(Slicing)並持久化儲存。通過集合運算函數,業務方可在不訪問詳細資料的情況下,靈活實現任意維度上卷(Roll-up)與下鑽(Drill-down)分析。(參見樣本 2、3、4)
BITMAP目前支援的內建函數及使用建議請參考BITMAP 位元影像函數。
使用樣本
為了展示 BITMAP 如何在不同業務階段解決效能與靈活性問題,我們將通過以下四個循序漸進的樣本進行說明:
基礎情境(樣本 1): 關注即時結果輸出。展示如何用 BITMAP 直接替換
COUNT DISTINCT,以更低的資源消耗擷取等效的精確去重結果。進階情境: 關注數倉分層與靈活分析。這三個樣本構成了一套完整的“預彙總—後計算”工作流程:
儲存(樣本 2): 將詳細資料轉化為位元影像切片並持久化,構建可複用的中介層。
分析(樣本 3): 基於中介層資料,通過位元影像運算實現任意維度上卷與交叉分析,無需重跑歷史資料。
融合(樣本 4): 結合即時資料流與離線歷史位元影像,低成本實現同環比、留存率等流批融合指標計算。
1. 即時精確去重
根據即時日誌,通過BITMAP_BUILD_CARDINALITY_AGG函數統計分鐘級UV。
DDL
-- 資料來源表
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- 事件類別目錄
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- 分鐘級 UV 統計結果表
CREATE TEMPORARY TABLE minute_uv
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- 事件類別目錄
,minute_uv BIGINT -- UV計數
)
WITH ('connector' = 'jdbc' ...)
;DML
核心邏輯:利用
BITMAP_BUILD_CARDINALITY_AGG函數,在視窗計算結束時同時執行“去重 + 計數”。適用情境:即時大屏、簡單的分鐘級監控指標。
注意事項:輸出結果為標量數值,不具備可加性。受限於集合去重原理,無法直接累加不同時間視窗的統計值來擷取更長時間跨度的總 UV(例如:不能簡單將兩個“1分鐘 UV”相加得出“2分鐘去重 UV”)。
INSERT INTO minute_uv
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- 核心函數:構建位元影像並直接返回基數(去重後的 count 值)
-- 此處邏輯等效於 COUNT(DISTINCT user_id),但效能更優
BITMAP_BUILD_CARDINALITY_AGG(user_id) AS uv
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;2. 分鐘級位元影像儲存
根據即時日誌,統計每分鐘的位元影像資料持久化到外部儲存。
DDL
-- 資料來源表
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- 事件類別目錄
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- 分鐘級位元影像結果表
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- 事件類別目錄
,minute_bm BYTES -- 序列化後的分鐘級位元影像
)
WITH ('connector' = 'jdbc' ...)
;DML
核心邏輯:在 Flink 側完成“明細到位元影像”的輕量級轉換,並不直接輸出計數值,而是輸出序列化後的位元據。
技術特點:利用
BITMAP_BUILD_AGG構造位元影像對象,並通過BITMAP_TO_BYTES將其序列化為符合 RoaringBitmap 標準的位元組數組(VARBINARY/BYTES)。適用情境:構建中介層資料倉儲(DWD/DWS),或對接支援位元影像的 OLAP 引擎(如 StarRocks、Hologres)。
關鍵優勢:具備可加性。輸出的二進位位元影像資料保留了集合資訊,下遊作業或查詢引擎可以隨時載入這些資料,通過“位元影像並集(OR)”運算,將多個分鐘級的位元影像合并為小時級或天級位元影像,從而實現靈活的跨時間視窗上卷。
INSERT INTO minute_bitmaps
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- 核心處理:構造位元影像並序列化
-- 1. BITMAP_BUILD_AGG:將視窗內的 user_id 彙總為記憶體中的位元影像對象
-- 2. BITMAP_TO_BYTES:將位元影像對象轉碼為標準二進位格式,以便持久化儲存
BITMAP_TO_BYTES(
BITMAP_BUILD_AGG(user_id)
) AS user_id_bitmap
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;3. 使用者畫像與群體篩選
基於分鐘級的位元影像資料,統計在每個小時中,搜尋過關鍵詞(A)且將商品加入購物車(B),但未完成支付(C)的使用者數。
DDL
-- 預計算分鐘級位元影像表
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- 事件類別目錄
,minute_bm BYTES -- 預計算的序列化分鐘級位元影像(樣本2中的分鐘級位元影像)
)
WITH ('connector' = 'jdbc' ...)
;
-- 小時級 UV 分析表
CREATE TEMPORARY TABLE hour_combined_uv
(
ts_date STRING
,ts_hour STRING
,tag STRING -- 事件類別目錄
,uv BIGINT
)
WITH ('connector' = 'jdbc' ...)
;DML
核心邏輯:
還原序列化與上卷:將儲存層的位元據還原為位元影像對象,並通過並集運算將多個分鐘級位元影像合并為小時級位元影像。
集合代數運算:在記憶體中對不同維度位元影像執行交集、差集等操作。
基數計算:統計最終結果集中的元素數量。
業務情境:使用者Portrait analysis、漏鬥分析或特定群體排除。
關鍵優勢:計算邏輯完全解耦。商務邏輯的變更(如增加過濾條件)僅需修改 SQL 查詢,無需修改上遊資料產出鏈路,實現了極高的分析靈活性。
INSERT INTO hour_combined_uv
SELECT
ts_date,
ts_hour,
'A and B andnot C' AS metric_name,
-- 第三步:計算基數
-- 統計最終位元影像中的使用者數量,輸出 BIGINT
BITMAP_CARDINALITY(
-- 第二步:執行集合代數運算
-- 邏輯表達:(Tag A ∩ Tag B) - Tag C
BITMAP_ANDNOT(
BITMAP_AND(hour_bm_a, hour_bm_b),
hour_bm_c
)
) AS uv
FROM (
SELECT
ts_date,
ts_hour,
-- 第一步:還原序列化與時間上卷
-- 將儲存的位元據(BYTES)還原為位元影像,
-- 並使用 BITMAP_OR_AGG 將該小時內所有分鐘級位元影像合并(Union)
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'A') AS hour_bm_a,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'B') AS hour_bm_b,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'C') AS hour_bm_c
FROM minute_bitmaps
WHERE tag IN ('A', 'B', 'C')
GROUP BY
ts_date,
ts_hour
);4. 即時留存分析
基於分鐘級和天級的位元影像資料,Realtime Compute某個頁面截止到當天某分鐘的回訪資料。
DDL
-- 預計算分鐘級位元影像表
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- 頁面名
,minute_bm BYTES -- 預計算的序列化分鐘級位元影像
)
WITH ('connector' = 'jdbc' ...)
;
-- 預計算天級位元影像表
CREATE TEMPORARY TABLE daily_bitmaps
(
ts_date STRING
,tag STRING -- 頁面名
,daily_bm BYTES -- 預計算的序列化天級位元影像
)
WITH ('connector' = 'jdbc' ...)
;
-- 即時留存結果表
CREATE TEMPORARY TABLE realtime_retention_metrics
(
ts_date STRING
,tag STRING -- 頁面名
,retained_users BIGINT -- 當日回訪數
,total_users BIGINT -- 昨日訪客總人數
,update_time TIMESTAMP(3)
,PRIMARY KEY (ts_date, tag) NOT ENFORCED
)
WITH ('connector' = 'jdbc' ...)
;DML
核心邏輯:
即時彙總:將當日產生的分鐘級位元影像流,即時彙總為“截至當前的當日總位元影像”。
流批關聯:通過 JOIN 操作,將當日即時位元影像與儲存層中昨天的全量位元影像進行關聯。
交集運算:對兩天的人群位元影像執行
BITMAP_AND(交集)操作,得出重疊使用者集合。
適用情境:即時監控大促活動的使用者召回效果、遊戲日活使用者的黏性分析。
關鍵優勢:
極速計算:傳統方案需關聯兩張億級大表計算
COUNT DISTINCT,開銷巨大且延遲高。BITMAP 方案僅需對少量二進位對象進行位元影像運算,計算僅在毫秒級完成。儲存解耦:歷史資料(如 daily_bitmaps)可儲存在外部系統(如 HDFS/OSS/JDBC),無需長期佔用 Flink 狀態資源。
INSERT INTO realtime_retention_metrics
SELECT
T.ts_date,
T.tag,
-- 計算指標 1:當日即時回訪數(留存使用者)
-- 邏輯:今天出現的 ∩ 昨天出現的
BITMAP_CARDINALITY(
BITMAP_AND(
T.today_bm, -- 今日即時位元影像
BITMAP_FROM_BYTES(Y.daily_bm) -- 昨日歷史位元影像(需還原序列化)
)
) AS retained_users,
-- 計算指標 2:昨日訪客基準數
-- 邏輯:直接讀取昨天的位元影像基數,作為留存率計算的分母
BITMAP_CARDINALITY(
BITMAP_FROM_BYTES(Y.daily_bm)
) AS total_users,
CURRENT_TIMESTAMP AS update_time
FROM (
-- 子查詢 T:即時資料流處理
-- 將當天的分鐘級切片,即時彙總成“截至當前”的當日位元影像
SELECT
ts_date,
tag,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) AS today_bm
FROM minute_bitmaps
GROUP BY ts_date, tag
) T
-- 核心關聯:流(今天)與 批(昨天)的結合
INNER JOIN daily_bitmaps Y
ON T.tag = Y.tag
-- 關聯條件:匹配 T 的前一天資料
AND CAST(T.ts_date AS DATE) = CAST(Y.ts_date AS DATE) + INTERVAL '1' DAY;