全部产品
Search
文档中心

实时计算Flink版:Flink通过BITMAP类型实现精确去重

更新时间:Feb 02, 2026

本文将为您介绍实时计算Flink版通过BITMAP位图类型进行精确去重的使用方法,并结合多个SQL示例介绍典型的应用场景。

背景信息

在 Flink 实时计算中,传统的去重方案存在以下局限:

  • 资源开销高昂: COUNT DISTINCT 需在状态(State)中维护全量的去重键。海量数据会导致状态体积膨胀,消耗大量内存与 CPU 资源。

  • 多维扩展困难: 预聚合方案缺乏灵活性。支持 N 个维度的任意组合查询需要穷举 2N 个组,导致作业逻辑臃肿。

  • 准确性妥协: APPROX_COUNT_DISTINCT 等近似算法存在统计误差,无法满足对数据准确性要求严苛的业务场景。

BITMAP 解决方案

为了兼顾准确性与性能,实时计算 Flink 版引入了 BITMAP(位图) 数据类型。

  • 技术原理: 该方案通常基于 RoaringBitmap 算法,将整型明细数据压缩为位图对象。计算逻辑从昂贵的集合维护转化为高效的位图逻辑运算(如并集、交集、差集)。

  • 核心优势: BITMAP 极大降低了状态存储压力。同时,它允许业务方在不损失 100% 准确性的前提下,对任意维度进行灵活的上卷(Roll-up)、下钻(Drill-down)及交叉分析。

使用限制

仅实时计算Flink版VVR 11.5及以上版本支持BITMAP位图类型及相关函数。

使用方法

BITMAP类型是基于RoaringBitmap标准的 32 位无符号整数集合,支持常数级的基数计算以及高效的集合代数运算。其主要有以下两种使用方法:

实时精确去重
直接处理原始明细数据,提供与 COUNT DISTINCT 语义完全等效的去重能力,但在大规模数据下拥有更优的内存效率与计算性能。(参见示例 1)

多维分析预聚合
按业务维度生成分片位图(Slicing)并持久化存储。通过集合运算函数,业务方可在不访问明细数据的情况下,灵活实现任意维度的上卷(Roll-up)与下钻(Drill-down)分析。(参见示例 2、3、4)

BITMAP目前支持的内置函数及使用建议请参考BITMAP 位图函数

使用示例

为了展示 BITMAP 如何在不同业务阶段解决性能与灵活性问题,我们将通过以下四个循序渐进的示例进行说明:

  1. 基础场景(示例 1): 关注实时结果输出。展示如何用 BITMAP 直接替换 COUNT DISTINCT,以更低的资源消耗获取等效的精确去重结果。

  2. 进阶场景: 关注数仓分层与灵活分析。这三个示例构成了一套完整的“预聚合—后计算”工作流:

    • 存储(示例 2): 将明细数据转化为位图切片并持久化,构建可复用的中间层。

    • 分析(示例 3): 基于中间层数据,通过位图运算实现任意维度的上卷与交叉分析,无需重跑历史数据。

    • 融合(示例 4): 结合实时流与离线历史位图,低成本实现同环比、留存率等流批融合指标计算。

1. 实时精确去重

根据实时日志,通过BITMAP_BUILD_CARDINALITY_AGG函数统计分钟级UV。

DDL

-- 数据源表
CREATE TEMPORARY TABLE user_events
(
      user_id     INT
      ,tag        STRING  --  事件类别
      ,event_time TIMESTAMP(3)
      ,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;

-- 分钟级 UV 统计结果表
CREATE TEMPORARY TABLE minute_uv
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING  --  事件类别
      ,minute_uv BIGINT  -- UV计数
)
WITH ('connector' = 'jdbc' ...)
;

DML

  • 核心逻辑:利用BITMAP_BUILD_CARDINALITY_AGG函数,在窗口计算结束时同时执行“去重 + 计数”。

  • 适用场景:实时大屏、简单的分钟级监控指标。

  • 注意事项:输出结果为标量数值,不具备可加性。受限于集合去重原理,无法直接累加不同时间窗口的统计值来获取更长时间跨度的总 UV(例如:不能简单将两个“1分钟 UV”相加得出“2分钟去重 UV”)。

INSERT INTO minute_uv
SELECT
    DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
    DATE_FORMAT(window_start, 'HH') AS ts_hour,
    DATE_FORMAT(window_start, 'mm') AS ts_minute,
    tag,
    -- 核心函数:构建位图并直接返回基数(去重后的 count 值)
    -- 此处逻辑等效于 COUNT(DISTINCT user_id),但性能更优
    BITMAP_BUILD_CARDINALITY_AGG(user_id) AS uv
FROM TABLE(
    TUMBLE(
        TABLE user_events, 
        DESCRIPTOR(event_time), 
        INTERVAL '1' MINUTE
    )
)
GROUP BY
    window_start,
    window_end,
    tag;

2. 分钟级位图存储

根据实时日志,统计每分钟的位图数据持久化到外部存储。

DDL

-- 数据源表
CREATE TEMPORARY TABLE user_events
(
      user_id     INT
      ,tag        STRING  --  事件类别
      ,event_time TIMESTAMP(3)
      ,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;

-- 分钟级位图结果表
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING  --  事件类别
      ,minute_bm BYTES   --  序列化后的分钟级位图
)
WITH ('connector' = 'jdbc' ...)
;

DML

  • 核心逻辑:在 Flink 侧完成“明细到位图”的轻量级转换,并不直接输出计数值,而是输出序列化后的二进制数据。

  • 技术特点:利用 BITMAP_BUILD_AGG 构造位图对象,并通过 BITMAP_TO_BYTES 将其序列化为符合 RoaringBitmap 标准的字节数组(VARBINARY/BYTES)。

  • 适用场景:构建中间层数据仓库(DWD/DWS),或对接支持位图的 OLAP 引擎(如 StarRocks、Hologres)。

  • 关键优势具备可加性。输出的二进制位图数据保留了集合信息,下游作业或查询引擎可以随时加载这些数据,通过“位图并集(OR)”运算,将多个分钟级的位图合并为小时级或天级位图,从而实现灵活的跨时间窗口上卷。

INSERT INTO minute_bitmaps
SELECT
    DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
    DATE_FORMAT(window_start, 'HH') AS ts_hour,
    DATE_FORMAT(window_start, 'mm') AS ts_minute,
    tag,
    -- 核心处理:构造位图并序列化
    -- 1. BITMAP_BUILD_AGG:将窗口内的 user_id 聚合为内存中的位图对象
    -- 2. BITMAP_TO_BYTES:将位图对象转码为标准二进制格式,以便持久化存储
    BITMAP_TO_BYTES(
        BITMAP_BUILD_AGG(user_id)
    ) AS user_id_bitmap
FROM TABLE(
    TUMBLE(
        TABLE user_events, 
        DESCRIPTOR(event_time), 
        INTERVAL '1' MINUTE
    )
)
GROUP BY
    window_start,
    window_end,
    tag;

3. 用户画像与群体筛选

基于分钟级的位图数据,统计在每个小时中,搜索过关键词(A)且将商品加入购物车(B),但未完成支付(C)的用户数。

DDL

-- 预计算分钟级位图表
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING  -- 事件类别
      ,minute_bm BYTES   -- 预计算的序列化分钟级位图(示例2中的分钟级位图)
)
WITH ('connector' = 'jdbc' ...)
;

-- 小时级 UV 分析表
CREATE TEMPORARY TABLE hour_combined_uv
(
      ts_date  STRING
      ,ts_hour STRING
      ,tag     STRING  -- 事件类别
      ,uv      BIGINT
)
WITH ('connector' = 'jdbc' ...)
;

DML

  • 核心逻辑

    1. 反序列化与上卷:将存储层的二进制数据还原为位图对象,并通过并集运算将多个分钟级位图合并为小时级位图。

    2. 集合代数运算:在内存中对不同维度的位图执行交集、差集等操作。

    3. 基数计算:统计最终结果集中的元素数量。

  • 业务场景:用户画像分析、漏斗分析或特定群体排除。

  • 关键优势:计算逻辑完全解耦。业务逻辑的变更(如增加过滤条件)仅需修改 SQL 查询,无需修改上游数据产出链路,实现了极高的分析灵活性。

INSERT INTO hour_combined_uv
SELECT
    ts_date,
    ts_hour,
    'A and B andnot C' AS metric_name,
    -- 第三步:计算基数
    -- 统计最终位图中的用户数量,输出 BIGINT
    BITMAP_CARDINALITY(
        -- 第二步:执行集合代数运算
        -- 逻辑表达:(Tag A ∩ Tag B) - Tag C
        BITMAP_ANDNOT(
            BITMAP_AND(hour_bm_a, hour_bm_b),
            hour_bm_c
        )
    ) AS uv
FROM (
    SELECT
        ts_date,
        ts_hour,
        -- 第一步:反序列化与时间上卷
        -- 将存储的二进制数据(BYTES)还原为位图,
        -- 并使用 BITMAP_OR_AGG 将该小时内所有分钟级位图合并(Union)
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) 
            FILTER (WHERE tag = 'A') AS hour_bm_a,
            
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) 
            FILTER (WHERE tag = 'B') AS hour_bm_b,
            
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) 
            FILTER (WHERE tag = 'C') AS hour_bm_c
    FROM minute_bitmaps
    WHERE tag IN ('A', 'B', 'C')
    GROUP BY
        ts_date,
        ts_hour
);

4. 实时留存分析

基于分钟级和天级的位图数据,实时计算某个页面截止到当天某分钟的回访数据。

DDL

-- 预计算分钟级位图表
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING  -- 页面名
      ,minute_bm BYTES   -- 预计算的序列化分钟级位图
)
WITH ('connector' = 'jdbc' ...)
;

-- 预计算天级位图表
CREATE TEMPORARY TABLE daily_bitmaps
(
      ts_date   STRING
      ,tag      STRING  -- 页面名
      ,daily_bm BYTES   -- 预计算的序列化天级位图
)
WITH ('connector' = 'jdbc' ...)
;

-- 实时留存结果表
CREATE TEMPORARY TABLE realtime_retention_metrics
(
      ts_date         STRING
      ,tag            STRING  -- 页面名
      ,retained_users BIGINT  -- 当日回访数
      ,total_users    BIGINT  -- 昨日访客总人数
      ,update_time    TIMESTAMP(3)
      ,PRIMARY KEY (ts_date, tag) NOT ENFORCED
)
WITH ('connector' = 'jdbc' ...)
;

DML

  • 核心逻辑

    1. 实时聚合:将当日生成的分钟级位图流,实时聚合为“截至当前的当日总位图”。

    2. 流批关联:通过 JOIN 操作,将当日实时位图与存储层中昨天的全量位图进行关联。

    3. 交集运算:对两天的人群位图执行 BITMAP_AND(交集)操作,得出重叠用户集合。

  • 适用场景:实时监控大促活动的用户召回效果、游戏日活用户的黏性分析。

  • 关键优势

    • 极速计算:传统方案需关联两张亿级大表计算 COUNT DISTINCT,开销巨大且延迟高。BITMAP 方案仅需对少量二进制对象进行位图运算,计算仅在毫秒级完成。

    • 存储解耦:历史数据(如 daily_bitmaps)可存储在外部系统(如 HDFS/OSS/JDBC),无需长期占用 Flink 状态资源。

INSERT INTO realtime_retention_metrics
SELECT
    T.ts_date,
    T.tag,
    -- 计算指标 1:当日实时回访数(留存用户)
    -- 逻辑:今天出现的 ∩ 昨天出现的
    BITMAP_CARDINALITY(
        BITMAP_AND(
            T.today_bm,                 -- 今日实时位图
            BITMAP_FROM_BYTES(Y.daily_bm) -- 昨日历史位图(需反序列化)
        )
    ) AS retained_users,
    
    -- 计算指标 2:昨日访客基准数
    -- 逻辑:直接读取昨天的位图基数,作为留存率计算的分母
    BITMAP_CARDINALITY(
        BITMAP_FROM_BYTES(Y.daily_bm)
    ) AS total_users,
    
    CURRENT_TIMESTAMP AS update_time
FROM (
    -- 子查询 T:实时流处理
    -- 将当天的分钟级切片,实时聚合成“截至当前”的当日位图
    SELECT
        ts_date,
        tag,
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) AS today_bm
    FROM minute_bitmaps
    GROUP BY ts_date, tag
) T
-- 核心关联:流(今天)与 批(昨天)的结合
INNER JOIN daily_bitmaps Y
    ON T.tag = Y.tag
    -- 关联条件:匹配 T 的前一天数据
    AND CAST(T.ts_date AS DATE) = CAST(Y.ts_date AS DATE) + INTERVAL '1' DAY;