本文将为您介绍实时计算Flink版通过BITMAP位图类型进行精确去重的使用方法,并结合多个SQL示例介绍典型的应用场景。
背景信息
在 Flink 实时计算中,传统的去重方案存在以下局限:
资源开销高昂:
COUNT DISTINCT需在状态(State)中维护全量的去重键。海量数据会导致状态体积膨胀,消耗大量内存与 CPU 资源。多维扩展困难: 预聚合方案缺乏灵活性。支持 N 个维度的任意组合查询需要穷举 2N 个组,导致作业逻辑臃肿。
准确性妥协:
APPROX_COUNT_DISTINCT等近似算法存在统计误差,无法满足对数据准确性要求严苛的业务场景。
BITMAP 解决方案
为了兼顾准确性与性能,实时计算 Flink 版引入了 BITMAP(位图) 数据类型。
技术原理: 该方案通常基于 RoaringBitmap 算法,将整型明细数据压缩为位图对象。计算逻辑从昂贵的集合维护转化为高效的位图逻辑运算(如并集、交集、差集)。
核心优势: BITMAP 极大降低了状态存储压力。同时,它允许业务方在不损失 100% 准确性的前提下,对任意维度进行灵活的上卷(Roll-up)、下钻(Drill-down)及交叉分析。
使用限制
仅实时计算Flink版VVR 11.5及以上版本支持BITMAP位图类型及相关函数。
使用方法
BITMAP类型是基于RoaringBitmap标准的 32 位无符号整数集合,支持常数级的基数计算以及高效的集合代数运算。其主要有以下两种使用方法:
实时精确去重
直接处理原始明细数据,提供与 COUNT DISTINCT 语义完全等效的去重能力,但在大规模数据下拥有更优的内存效率与计算性能。(参见示例 1)
多维分析预聚合
按业务维度生成分片位图(Slicing)并持久化存储。通过集合运算函数,业务方可在不访问明细数据的情况下,灵活实现任意维度的上卷(Roll-up)与下钻(Drill-down)分析。(参见示例 2、3、4)
BITMAP目前支持的内置函数及使用建议请参考BITMAP 位图函数。
使用示例
为了展示 BITMAP 如何在不同业务阶段解决性能与灵活性问题,我们将通过以下四个循序渐进的示例进行说明:
基础场景(示例 1): 关注实时结果输出。展示如何用 BITMAP 直接替换
COUNT DISTINCT,以更低的资源消耗获取等效的精确去重结果。进阶场景: 关注数仓分层与灵活分析。这三个示例构成了一套完整的“预聚合—后计算”工作流:
存储(示例 2): 将明细数据转化为位图切片并持久化,构建可复用的中间层。
分析(示例 3): 基于中间层数据,通过位图运算实现任意维度的上卷与交叉分析,无需重跑历史数据。
融合(示例 4): 结合实时流与离线历史位图,低成本实现同环比、留存率等流批融合指标计算。
1. 实时精确去重
根据实时日志,通过BITMAP_BUILD_CARDINALITY_AGG函数统计分钟级UV。
DDL
-- 数据源表
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- 事件类别
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- 分钟级 UV 统计结果表
CREATE TEMPORARY TABLE minute_uv
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- 事件类别
,minute_uv BIGINT -- UV计数
)
WITH ('connector' = 'jdbc' ...)
;DML
核心逻辑:利用
BITMAP_BUILD_CARDINALITY_AGG函数,在窗口计算结束时同时执行“去重 + 计数”。适用场景:实时大屏、简单的分钟级监控指标。
注意事项:输出结果为标量数值,不具备可加性。受限于集合去重原理,无法直接累加不同时间窗口的统计值来获取更长时间跨度的总 UV(例如:不能简单将两个“1分钟 UV”相加得出“2分钟去重 UV”)。
INSERT INTO minute_uv
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- 核心函数:构建位图并直接返回基数(去重后的 count 值)
-- 此处逻辑等效于 COUNT(DISTINCT user_id),但性能更优
BITMAP_BUILD_CARDINALITY_AGG(user_id) AS uv
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;2. 分钟级位图存储
根据实时日志,统计每分钟的位图数据持久化到外部存储。
DDL
-- 数据源表
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- 事件类别
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- 分钟级位图结果表
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- 事件类别
,minute_bm BYTES -- 序列化后的分钟级位图
)
WITH ('connector' = 'jdbc' ...)
;DML
核心逻辑:在 Flink 侧完成“明细到位图”的轻量级转换,并不直接输出计数值,而是输出序列化后的二进制数据。
技术特点:利用
BITMAP_BUILD_AGG构造位图对象,并通过BITMAP_TO_BYTES将其序列化为符合 RoaringBitmap 标准的字节数组(VARBINARY/BYTES)。适用场景:构建中间层数据仓库(DWD/DWS),或对接支持位图的 OLAP 引擎(如 StarRocks、Hologres)。
关键优势:具备可加性。输出的二进制位图数据保留了集合信息,下游作业或查询引擎可以随时加载这些数据,通过“位图并集(OR)”运算,将多个分钟级的位图合并为小时级或天级位图,从而实现灵活的跨时间窗口上卷。
INSERT INTO minute_bitmaps
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- 核心处理:构造位图并序列化
-- 1. BITMAP_BUILD_AGG:将窗口内的 user_id 聚合为内存中的位图对象
-- 2. BITMAP_TO_BYTES:将位图对象转码为标准二进制格式,以便持久化存储
BITMAP_TO_BYTES(
BITMAP_BUILD_AGG(user_id)
) AS user_id_bitmap
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;3. 用户画像与群体筛选
基于分钟级的位图数据,统计在每个小时中,搜索过关键词(A)且将商品加入购物车(B),但未完成支付(C)的用户数。
DDL
-- 预计算分钟级位图表
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- 事件类别
,minute_bm BYTES -- 预计算的序列化分钟级位图(示例2中的分钟级位图)
)
WITH ('connector' = 'jdbc' ...)
;
-- 小时级 UV 分析表
CREATE TEMPORARY TABLE hour_combined_uv
(
ts_date STRING
,ts_hour STRING
,tag STRING -- 事件类别
,uv BIGINT
)
WITH ('connector' = 'jdbc' ...)
;DML
核心逻辑:
反序列化与上卷:将存储层的二进制数据还原为位图对象,并通过并集运算将多个分钟级位图合并为小时级位图。
集合代数运算:在内存中对不同维度的位图执行交集、差集等操作。
基数计算:统计最终结果集中的元素数量。
业务场景:用户画像分析、漏斗分析或特定群体排除。
关键优势:计算逻辑完全解耦。业务逻辑的变更(如增加过滤条件)仅需修改 SQL 查询,无需修改上游数据产出链路,实现了极高的分析灵活性。
INSERT INTO hour_combined_uv
SELECT
ts_date,
ts_hour,
'A and B andnot C' AS metric_name,
-- 第三步:计算基数
-- 统计最终位图中的用户数量,输出 BIGINT
BITMAP_CARDINALITY(
-- 第二步:执行集合代数运算
-- 逻辑表达:(Tag A ∩ Tag B) - Tag C
BITMAP_ANDNOT(
BITMAP_AND(hour_bm_a, hour_bm_b),
hour_bm_c
)
) AS uv
FROM (
SELECT
ts_date,
ts_hour,
-- 第一步:反序列化与时间上卷
-- 将存储的二进制数据(BYTES)还原为位图,
-- 并使用 BITMAP_OR_AGG 将该小时内所有分钟级位图合并(Union)
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'A') AS hour_bm_a,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'B') AS hour_bm_b,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'C') AS hour_bm_c
FROM minute_bitmaps
WHERE tag IN ('A', 'B', 'C')
GROUP BY
ts_date,
ts_hour
);4. 实时留存分析
基于分钟级和天级的位图数据,实时计算某个页面截止到当天某分钟的回访数据。
DDL
-- 预计算分钟级位图表
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- 页面名
,minute_bm BYTES -- 预计算的序列化分钟级位图
)
WITH ('connector' = 'jdbc' ...)
;
-- 预计算天级位图表
CREATE TEMPORARY TABLE daily_bitmaps
(
ts_date STRING
,tag STRING -- 页面名
,daily_bm BYTES -- 预计算的序列化天级位图
)
WITH ('connector' = 'jdbc' ...)
;
-- 实时留存结果表
CREATE TEMPORARY TABLE realtime_retention_metrics
(
ts_date STRING
,tag STRING -- 页面名
,retained_users BIGINT -- 当日回访数
,total_users BIGINT -- 昨日访客总人数
,update_time TIMESTAMP(3)
,PRIMARY KEY (ts_date, tag) NOT ENFORCED
)
WITH ('connector' = 'jdbc' ...)
;DML
核心逻辑:
实时聚合:将当日生成的分钟级位图流,实时聚合为“截至当前的当日总位图”。
流批关联:通过 JOIN 操作,将当日实时位图与存储层中昨天的全量位图进行关联。
交集运算:对两天的人群位图执行
BITMAP_AND(交集)操作,得出重叠用户集合。
适用场景:实时监控大促活动的用户召回效果、游戏日活用户的黏性分析。
关键优势:
极速计算:传统方案需关联两张亿级大表计算
COUNT DISTINCT,开销巨大且延迟高。BITMAP 方案仅需对少量二进制对象进行位图运算,计算仅在毫秒级完成。存储解耦:历史数据(如 daily_bitmaps)可存储在外部系统(如 HDFS/OSS/JDBC),无需长期占用 Flink 状态资源。
INSERT INTO realtime_retention_metrics
SELECT
T.ts_date,
T.tag,
-- 计算指标 1:当日实时回访数(留存用户)
-- 逻辑:今天出现的 ∩ 昨天出现的
BITMAP_CARDINALITY(
BITMAP_AND(
T.today_bm, -- 今日实时位图
BITMAP_FROM_BYTES(Y.daily_bm) -- 昨日历史位图(需反序列化)
)
) AS retained_users,
-- 计算指标 2:昨日访客基准数
-- 逻辑:直接读取昨天的位图基数,作为留存率计算的分母
BITMAP_CARDINALITY(
BITMAP_FROM_BYTES(Y.daily_bm)
) AS total_users,
CURRENT_TIMESTAMP AS update_time
FROM (
-- 子查询 T:实时流处理
-- 将当天的分钟级切片,实时聚合成“截至当前”的当日位图
SELECT
ts_date,
tag,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) AS today_bm
FROM minute_bitmaps
GROUP BY ts_date, tag
) T
-- 核心关联:流(今天)与 批(昨天)的结合
INNER JOIN daily_bitmaps Y
ON T.tag = Y.tag
-- 关联条件:匹配 T 的前一天数据
AND CAST(T.ts_date AS DATE) = CAST(Y.ts_date AS DATE) + INTERVAL '1' DAY;