全部產品
Search
文件中心

Realtime Compute for Apache Flink:Flink通過BITMAP類型實現精確去重

更新時間:Jan 10, 2026

本文將為您介紹Realtime ComputeFlink版通過BITMAP位元影像類型進行精確去重的使用方法,並結合多個SQL樣本介紹典型的應用情境。

背景資訊

在 Flink Realtime Compute中,傳統的去重方案存在以下局限:

  • 資源開銷高昂: COUNT DISTINCT 需在狀態(State)中維護全量的去重鍵。海量資料會導致狀態體積膨脹,消耗大量記憶體與 CPU 資源。

  • 多維擴充困難: 預彙總方案缺乏靈活性。支援 N 個維度任意組合查詢需要窮舉 2N 個組,導致作業邏輯臃腫。

  • 準確性妥協: APPROX_COUNT_DISTINCT 等近似演算法存在統計誤差,無法滿足對資料準確性要求嚴苛的業務情境。

BITMAP 解決方案

為了兼顧準確性與效能,Realtime Compute Flink 版引入了 BITMAP(位元影像) 資料類型。

  • 技術原理: 該方案通常基於 RoaringBitmap 演算法,將整型詳細資料壓縮為位元影像對象。計算邏輯從昂貴的集合維護轉化為高效的位元影像邏輯運算(如並集、交集、差集)。

  • 核心優勢: BITMAP 極大降低了狀態儲存壓力。同時,它允許業務方在不損失 100% 準確性的前提下,對任意維度進行靈活的上卷(Roll-up)、下鑽(Drill-down)及交叉分析。

使用限制

僅Realtime ComputeFlink版VVR 11.5及以上版本支援BITMAP位元影像類型及相關函數。

使用方法

BITMAP類型是基於RoaringBitmap標準的 32 位不帶正負號的整數集合,支援常數級的基數計算以及高效的集合代數運算。其主要有以下兩種使用方法:

即時精確去重
直接處理原始詳細資料,提供與 COUNT DISTINCT 語義完全等效的去重能力,但在大規模資料下擁有更優的記憶體效率與計算效能。(參見樣本 1)

多維分析預彙總
按業務維度產生分區位元影像(Slicing)並持久化儲存。通過集合運算函數,業務方可在不訪問詳細資料的情況下,靈活實現任意維度上卷(Roll-up)與下鑽(Drill-down)分析。(參見樣本 2、3、4)

BITMAP目前支援的內建函數及使用建議請參考BITMAP 位元影像函數

使用樣本

為了展示 BITMAP 如何在不同業務階段解決效能與靈活性問題,我們將通過以下四個循序漸進的樣本進行說明:

  1. 基礎情境(樣本 1): 關注即時結果輸出。展示如何用 BITMAP 直接替換 COUNT DISTINCT,以更低的資源消耗擷取等效的精確去重結果。

  2. 進階情境: 關注數倉分層與靈活分析。這三個樣本構成了一套完整的“預彙總—後計算”工作流程:

    • 儲存(樣本 2): 將詳細資料轉化為位元影像切片並持久化,構建可複用的中介層。

    • 分析(樣本 3): 基於中介層資料,通過位元影像運算實現任意維度上卷與交叉分析,無需重跑歷史資料。

    • 融合(樣本 4): 結合即時資料流與離線歷史位元影像,低成本實現同環比、留存率等流批融合指標計算。

1. 即時精確去重

根據即時日誌,通過BITMAP_BUILD_CARDINALITY_AGG函數統計分鐘級UV。

DDL

-- 資料來源表
CREATE TEMPORARY TABLE user_events
(
      user_id     INT
      ,tag        STRING  --  事件類別目錄
      ,event_time TIMESTAMP(3)
      ,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;

-- 分鐘級 UV 統計結果表
CREATE TEMPORARY TABLE minute_uv
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING  --  事件類別目錄
      ,minute_uv BIGINT  -- UV計數
)
WITH ('connector' = 'jdbc' ...)
;

DML

  • 核心邏輯:利用BITMAP_BUILD_CARDINALITY_AGG函數,在視窗計算結束時同時執行“去重 + 計數”。

  • 適用情境:即時大屏、簡單的分鐘級監控指標。

  • 注意事項:輸出結果為標量數值,不具備可加性。受限於集合去重原理,無法直接累加不同時間視窗的統計值來擷取更長時間跨度的總 UV(例如:不能簡單將兩個“1分鐘 UV”相加得出“2分鐘去重 UV”)。

INSERT INTO minute_uv
SELECT
    DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
    DATE_FORMAT(window_start, 'HH') AS ts_hour,
    DATE_FORMAT(window_start, 'mm') AS ts_minute,
    tag,
    -- 核心函數:構建位元影像並直接返回基數(去重後的 count 值)
    -- 此處邏輯等效於 COUNT(DISTINCT user_id),但效能更優
    BITMAP_BUILD_CARDINALITY_AGG(user_id) AS uv
FROM TABLE(
    TUMBLE(
        TABLE user_events, 
        DESCRIPTOR(event_time), 
        INTERVAL '1' MINUTE
    )
)
GROUP BY
    window_start,
    window_end,
    tag;

2. 分鐘級位元影像儲存

根據即時日誌,統計每分鐘的位元影像資料持久化到外部儲存。

DDL

-- 資料來源表
CREATE TEMPORARY TABLE user_events
(
      user_id     INT
      ,tag        STRING  --  事件類別目錄
      ,event_time TIMESTAMP(3)
      ,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;

-- 分鐘級位元影像結果表
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING  --  事件類別目錄
      ,minute_bm BYTES   --  序列化後的分鐘級位元影像
)
WITH ('connector' = 'jdbc' ...)
;

DML

  • 核心邏輯:在 Flink 側完成“明細到位元影像”的輕量級轉換,並不直接輸出計數值,而是輸出序列化後的位元據。

  • 技術特點:利用 BITMAP_BUILD_AGG 構造位元影像對象,並通過 BITMAP_TO_BYTES 將其序列化為符合 RoaringBitmap 標準的位元組數組(VARBINARY/BYTES)。

  • 適用情境:構建中介層資料倉儲(DWD/DWS),或對接支援位元影像的 OLAP 引擎(如 StarRocks、Hologres)。

  • 關鍵優勢具備可加性。輸出的二進位位元影像資料保留了集合資訊,下遊作業或查詢引擎可以隨時載入這些資料,通過“位元影像並集(OR)”運算,將多個分鐘級的位元影像合并為小時級或天級位元影像,從而實現靈活的跨時間視窗上卷。

INSERT INTO minute_bitmaps
SELECT
    DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
    DATE_FORMAT(window_start, 'HH') AS ts_hour,
    DATE_FORMAT(window_start, 'mm') AS ts_minute,
    tag,
    -- 核心處理:構造位元影像並序列化
    -- 1. BITMAP_BUILD_AGG:將視窗內的 user_id 彙總為記憶體中的位元影像對象
    -- 2. BITMAP_TO_BYTES:將位元影像對象轉碼為標準二進位格式,以便持久化儲存
    BITMAP_TO_BYTES(
        BITMAP_BUILD_AGG(user_id)
    ) AS user_id_bitmap
FROM TABLE(
    TUMBLE(
        TABLE user_events, 
        DESCRIPTOR(event_time), 
        INTERVAL '1' MINUTE
    )
)
GROUP BY
    window_start,
    window_end,
    tag;

3. 使用者畫像與群體篩選

基於分鐘級的位元影像資料,統計在每個小時中,搜尋過關鍵詞(A)且將商品加入購物車(B),但未完成支付(C)的使用者數。

DDL

-- 預計算分鐘級位元影像表
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING  -- 事件類別目錄
      ,minute_bm BYTES   -- 預計算的序列化分鐘級位元影像(樣本2中的分鐘級位元影像)
)
WITH ('connector' = 'jdbc' ...)
;

-- 小時級 UV 分析表
CREATE TEMPORARY TABLE hour_combined_uv
(
      ts_date  STRING
      ,ts_hour STRING
      ,tag     STRING  -- 事件類別目錄
      ,uv      BIGINT
)
WITH ('connector' = 'jdbc' ...)
;

DML

  • 核心邏輯

    1. 還原序列化與上卷:將儲存層的位元據還原為位元影像對象,並通過並集運算將多個分鐘級位元影像合并為小時級位元影像。

    2. 集合代數運算:在記憶體中對不同維度位元影像執行交集、差集等操作。

    3. 基數計算:統計最終結果集中的元素數量。

  • 業務情境:使用者Portrait analysis、漏鬥分析或特定群體排除。

  • 關鍵優勢:計算邏輯完全解耦。商務邏輯的變更(如增加過濾條件)僅需修改 SQL 查詢,無需修改上遊資料產出鏈路,實現了極高的分析靈活性。

INSERT INTO hour_combined_uv
SELECT
    ts_date,
    ts_hour,
    'A and B andnot C' AS metric_name,
    -- 第三步:計算基數
    -- 統計最終位元影像中的使用者數量,輸出 BIGINT
    BITMAP_CARDINALITY(
        -- 第二步:執行集合代數運算
        -- 邏輯表達:(Tag A ∩ Tag B) - Tag C
        BITMAP_ANDNOT(
            BITMAP_AND(hour_bm_a, hour_bm_b),
            hour_bm_c
        )
    ) AS uv
FROM (
    SELECT
        ts_date,
        ts_hour,
        -- 第一步:還原序列化與時間上卷
        -- 將儲存的位元據(BYTES)還原為位元影像,
        -- 並使用 BITMAP_OR_AGG 將該小時內所有分鐘級位元影像合并(Union)
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) 
            FILTER (WHERE tag = 'A') AS hour_bm_a,
            
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) 
            FILTER (WHERE tag = 'B') AS hour_bm_b,
            
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) 
            FILTER (WHERE tag = 'C') AS hour_bm_c
    FROM minute_bitmaps
    WHERE tag IN ('A', 'B', 'C')
    GROUP BY
        ts_date,
        ts_hour
);

4. 即時留存分析

基於分鐘級和天級的位元影像資料,Realtime Compute某個頁面截止到當天某分鐘的回訪資料。

DDL

-- 預計算分鐘級位元影像表
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING  -- 頁面名
      ,minute_bm BYTES   -- 預計算的序列化分鐘級位元影像
)
WITH ('connector' = 'jdbc' ...)
;

-- 預計算天級位元影像表
CREATE TEMPORARY TABLE daily_bitmaps
(
      ts_date   STRING
      ,tag      STRING  -- 頁面名
      ,daily_bm BYTES   -- 預計算的序列化天級位元影像
)
WITH ('connector' = 'jdbc' ...)
;

-- 即時留存結果表
CREATE TEMPORARY TABLE realtime_retention_metrics
(
      ts_date         STRING
      ,tag            STRING  -- 頁面名
      ,retained_users BIGINT  -- 當日回訪數
      ,total_users    BIGINT  -- 昨日訪客總人數
      ,update_time    TIMESTAMP(3)
      ,PRIMARY KEY (ts_date, tag) NOT ENFORCED
)
WITH ('connector' = 'jdbc' ...)
;

DML

  • 核心邏輯

    1. 即時彙總:將當日產生的分鐘級位元影像流,即時彙總為“截至當前的當日總位元影像”。

    2. 流批關聯:通過 JOIN 操作,將當日即時位元影像與儲存層中昨天的全量位元影像進行關聯。

    3. 交集運算:對兩天的人群位元影像執行 BITMAP_AND(交集)操作,得出重疊使用者集合。

  • 適用情境:即時監控大促活動的使用者召回效果、遊戲日活使用者的黏性分析。

  • 關鍵優勢

    • 極速計算:傳統方案需關聯兩張億級大表計算 COUNT DISTINCT,開銷巨大且延遲高。BITMAP 方案僅需對少量二進位對象進行位元影像運算,計算僅在毫秒級完成。

    • 儲存解耦:歷史資料(如 daily_bitmaps)可儲存在外部系統(如 HDFS/OSS/JDBC),無需長期佔用 Flink 狀態資源。

INSERT INTO realtime_retention_metrics
SELECT
    T.ts_date,
    T.tag,
    -- 計算指標 1:當日即時回訪數(留存使用者)
    -- 邏輯:今天出現的 ∩ 昨天出現的
    BITMAP_CARDINALITY(
        BITMAP_AND(
            T.today_bm,                 -- 今日即時位元影像
            BITMAP_FROM_BYTES(Y.daily_bm) -- 昨日歷史位元影像(需還原序列化)
        )
    ) AS retained_users,
    
    -- 計算指標 2:昨日訪客基準數
    -- 邏輯:直接讀取昨天的位元影像基數,作為留存率計算的分母
    BITMAP_CARDINALITY(
        BITMAP_FROM_BYTES(Y.daily_bm)
    ) AS total_users,
    
    CURRENT_TIMESTAMP AS update_time
FROM (
    -- 子查詢 T:即時資料流處理
    -- 將當天的分鐘級切片,即時彙總成“截至當前”的當日位元影像
    SELECT
        ts_date,
        tag,
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) AS today_bm
    FROM minute_bitmaps
    GROUP BY ts_date, tag
) T
-- 核心關聯:流(今天)與 批(昨天)的結合
INNER JOIN daily_bitmaps Y
    ON T.tag = Y.tag
    -- 關聯條件:匹配 T 的前一天資料
    AND CAST(T.ts_date AS DATE) = CAST(Y.ts_date AS DATE) + INTERVAL '1' DAY;