This topic describes how to use the BITMAP data type in Realtime Compute for Apache Flink for exact deduplication. It also provides SQL examples for typical application scenarios.
Background information
In Flink real-time computing, traditional deduplication solutions have the following limitations:
High resource overhead: The COUNT DISTINCT function must keep every deduplication key in state. As data volumes grow, the state grows with them and consumes significant memory and CPU resources.
Difficult multidimensional scaling: Pre-aggregation solutions lack flexibility. To support queries on any combination of N dimensions, you must enumerate 2^N groups, which makes the job logic complex.
Compromised accuracy: Approximate algorithms such as APPROX_COUNT_DISTINCT have statistical errors and cannot meet the strict accuracy requirements of some business scenarios.
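The 2^N figure comes from counting every subset of the dimension list, including the empty subset (the grand total). The following Python sketch enumerates those grouping sets to show how quickly they grow. The dimension names are hypothetical.

```python
from itertools import combinations


def grouping_sets(dimensions):
    """Return every combination of the dimensions, from the empty set (grand total) to all of them."""
    sets = []
    for k in range(len(dimensions) + 1):
        sets.extend(combinations(dimensions, k))
    return sets


dims = ["province", "channel", "device"]  # hypothetical dimension names
print(len(grouping_sets(dims)))  # 8, that is, 2 ** 3
```

Adding a single dimension doubles the number of groups a pre-aggregation job has to maintain.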
The BITMAP solution
To balance accuracy and performance, Realtime Compute for Apache Flink introduces the BITMAP data type.
How it works: The solution is based on the RoaringBitmap algorithm, which compresses detailed integer data, such as user IDs, into bitmap objects. Deduplication no longer requires maintaining large sets of raw keys. Instead, it relies on efficient bitmap logical operations such as union, intersection, and difference.
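The container idea behind RoaringBitmap can be sketched in Python. Each 32-bit value is split into a high 16-bit part, which selects a container, and a low 16-bit part, which is stored inside that container. Set operations then run container by container. This is a simplified model for illustration only. Real RoaringBitmap implementations choose between array, bitmap, and run containers and use their own in-memory and serialized layouts. The class name `SimpleRoaring` is hypothetical.

```python
class SimpleRoaring:
    """Simplified Roaring-style bitmap: high 16 bits pick a container, low 16 bits live in a set."""

    def __init__(self, values=()):
        self.containers = {}  # high16 -> set of low16 values
        for value in values:
            self.add(value)

    def add(self, value):
        if not 0 <= value < 2 ** 32:
            raise ValueError("value must be a 32-bit unsigned integer")
        high, low = value >> 16, value & 0xFFFF
        self.containers.setdefault(high, set()).add(low)

    def _combine(self, other, op):
        # Apply a set operation per container key and keep only non-empty results.
        result = SimpleRoaring()
        for key in self.containers.keys() | other.containers.keys():
            merged = op(self.containers.get(key, set()), other.containers.get(key, set()))
            if merged:
                result.containers[key] = merged
        return result

    def union(self, other):
        return self._combine(other, lambda a, b: a | b)

    def intersection(self, other):
        return self._combine(other, lambda a, b: a & b)

    def difference(self, other):
        return self._combine(other, lambda a, b: a - b)

    def cardinality(self):
        return sum(len(c) for c in self.containers.values())

    def to_sorted_list(self):
        return sorted((high << 16) | low for high, c in self.containers.items() for low in c)


a = SimpleRoaring([1, 2, 70000])  # 70000 lands in container 1 (70000 >> 16 == 1)
b = SimpleRoaring([2, 70000, 70001])
print(a.union(b).to_sorted_list())         # [1, 2, 70000, 70001]
print(a.intersection(b).to_sorted_list())  # [2, 70000]
print(a.difference(b).to_sorted_list())    # [1]
```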
Core advantages: BITMAP significantly reduces state storage requirements. It also lets you perform flexible roll-ups, drill-downs, and cross-analyses on any dimension without any loss of accuracy.
Limits
Only Ververica Runtime (VVR) 11.5 and later versions of Realtime Compute for Apache Flink support the BITMAP data type and its related functions.
Usage
The BITMAP type is a set of 32-bit unsigned integers based on the RoaringBitmap standard. It supports constant-time cardinality calculations and efficient set algebra operations. There are two main ways to use it:
Real-time exact deduplication
You can process raw, detailed data directly. This provides the same deduplication capability as COUNT DISTINCT but with better memory efficiency and computing performance for large-scale data. (See Example 1)
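The following Python sketch shows why a bitmap cardinality matches COUNT DISTINCT. It models a bitmap as an arbitrary-precision integer in which bit n is set when user ID n appears. Setting the same bit twice has no effect, so the number of set bits equals the number of distinct IDs. This conceptual model assumes non-negative IDs and is not how Flink stores BITMAP values.

```python
def bitmap_build(user_ids):
    """Set bit `uid` for each user ID; a duplicate ID sets a bit that is already set."""
    bitmap = 0
    for uid in user_ids:
        bitmap |= 1 << uid
    return bitmap


def bitmap_cardinality(bitmap):
    """Count the set bits, which is the number of distinct IDs."""
    return bin(bitmap).count("1")


events = [101, 205, 101, 307, 205, 101]
print(bitmap_cardinality(bitmap_build(events)))  # 3
print(len(set(events)))                          # 3, same as COUNT(DISTINCT user_id)
```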
Pre-aggregation for multidimensional analysis
You can generate bitmaps sliced by business dimension and persist them to storage. This lets you use set operation functions to flexibly perform roll-up and drill-down analysis on any dimension without accessing the detailed data. (See Examples 2, 3, and 4)
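The following Python sketch illustrates the pre-aggregation pattern. Python sets stand in for bitmaps, and one set is built per (minute, tag) slice. Roll-ups along either dimension then become unions over the selected slices, without touching the detailed events again. The event data and the helper name `rollup` are hypothetical.

```python
from collections import defaultdict

# Hypothetical detailed events: (minute, tag, user_id)
events = [
    ("10:00", "A", 1), ("10:00", "A", 2), ("10:00", "B", 2),
    ("10:01", "A", 2), ("10:01", "B", 3), ("10:01", "B", 1),
]

# Pre-aggregation: one bitmap (modeled as a set) per (minute, tag) slice.
slices = defaultdict(set)
for minute, tag, user_id in events:
    slices[(minute, tag)].add(user_id)


def rollup(slices, predicate):
    """Union every slice whose (minute, tag) key satisfies the predicate."""
    result = set()
    for key, bitmap in slices.items():
        if predicate(key):
            result |= bitmap
    return result


uv_tag_a = len(rollup(slices, lambda key: key[1] == "A"))         # roll up over minutes
uv_minute_0 = len(rollup(slices, lambda key: key[0] == "10:00"))  # roll up over tags
uv_total = len(rollup(slices, lambda key: True))                  # roll up over both
print(uv_tag_a, uv_minute_0, uv_total)  # 2 2 3
```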
For more information about BITMAP built-in functions and their usage, see BITMAP functions.
Examples
The following four examples show how BITMAP solves performance and flexibility problems at different business stages:
Basic scenario (Example 1): Focuses on real-time result output. This example shows how to directly replace COUNT DISTINCT with BITMAP to obtain the same exact deduplication results with lower resource consumption.
Advanced scenarios: Focus on data warehouse layering and flexible analysis. These three examples form a complete pre-aggregation and post-computation workflow:
Storage (Example 2): Transform detailed data into bitmap slices and persist them to build a reusable intermediate layer.
Analysis (Example 3): Based on the intermediate layer data, you can use bitmap operations to perform roll-ups and cross-analyses on any dimension without reprocessing historical data.
Integration (Example 4): Combine real-time streams with offline historical bitmaps. This allows for low-cost calculation of stream-batch integration metrics, such as month-over-month, year-over-year, and retention rates.
1. Real-time exact deduplication
Use the BITMAP_BUILD_CARDINALITY_AGG function to count minute-level unique visitors (UVs) from real-time logs.
DDL
-- Data source table
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- Event tag
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- Sink table for minute-level UV statistics
CREATE TEMPORARY TABLE minute_uv
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- Event tag
,minute_uv BIGINT -- UV count
)
WITH ('connector' = 'jdbc' ...)
;
DML
Core logic: Use the BITMAP_BUILD_CARDINALITY_AGG function to deduplicate and count in a single step when each window calculation completes.
Scenarios: Real-time dashboards and simple minute-level monitoring metrics.
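The following Python sketch approximates what the Example 1 query computes. Each event is assigned to a 1-minute tumbling window, grouped by window and tag, and each group's distinct user IDs are counted. The output keys mirror the DATE_FORMAT columns. The sketch ignores watermarks and late data, and the function name `tumble_minute_uv` is hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def tumble_minute_uv(events):
    """events: (user_id, tag, event_time) tuples. Returns {(ts_date, ts_hour, ts_minute, tag): uv}."""
    windows = defaultdict(set)
    for user_id, tag, event_time in events:
        window_start = event_time.replace(second=0, microsecond=0)  # 1-minute tumbling window
        key = (
            window_start.strftime("%Y-%m-%d"),  # DATE_FORMAT(window_start, 'yyyy-MM-dd')
            window_start.strftime("%H"),        # DATE_FORMAT(window_start, 'HH')
            window_start.strftime("%M"),        # DATE_FORMAT(window_start, 'mm')
            tag,
        )
        windows[key].add(user_id)
    return {key: len(ids) for key, ids in windows.items()}


sample_events = [
    (1, "A", datetime(2024, 5, 1, 10, 0, 5)),
    (1, "A", datetime(2024, 5, 1, 10, 0, 40)),  # duplicate user in the same window
    (2, "A", datetime(2024, 5, 1, 10, 0, 59)),
    (2, "A", datetime(2024, 5, 1, 10, 1, 3)),
]
minute_uv = tumble_minute_uv(sample_events)
```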
Note: The output is a scalar value and is not additive. Because the same user can appear in several windows, you cannot add the UV counts of different windows to obtain the total UV for a longer time span. For example, adding two 1-minute UV counts does not yield the deduplicated 2-minute UV count.
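A worked example with hypothetical user IDs shows the problem. Users 2 and 3 are active in both minutes, so summing the per-minute counts counts them twice. Only a union of the underlying sets yields the true 2-minute UV.

```python
minute_1 = {1, 2, 3}  # user IDs seen in the first minute
minute_2 = {2, 3, 4}  # user IDs seen in the second minute

sum_of_uvs = len(minute_1) + len(minute_2)  # 6: users 2 and 3 are counted twice
two_minute_uv = len(minute_1 | minute_2)    # 4: the true deduplicated count
print(sum_of_uvs, two_minute_uv)  # 6 4
```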
INSERT INTO minute_uv
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- Core function: Builds a bitmap and directly returns its cardinality (the deduplicated count).
-- This logic is equivalent to COUNT(DISTINCT user_id) but offers better performance.
BITMAP_BUILD_CARDINALITY_AGG(user_id) AS minute_uv
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;
2. Store minute-level bitmaps
Persist minute-level bitmap data from real-time logs to an external storage system.
DDL
-- Data source table
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- Event tag
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- Sink table for minute-level bitmaps
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- Event tag
,minute_bm BYTES -- Serialized minute-level bitmap
)
WITH ('connector' = 'jdbc' ...)
;
DML
Core logic: Perform a lightweight transformation from detailed data to a bitmap on the Flink side. Instead of directly outputting a count value, you can output the serialized binary data.
Technical details: Use BITMAP_BUILD_AGG to construct a bitmap object. Then, use BITMAP_TO_BYTES to serialize it into a byte array (VARBINARY/BYTES) that complies with the RoaringBitmap standard.
Scenarios: Building an intermediate data warehouse layer (DWD/DWS) or connecting to an Online Analytical Processing (OLAP) engine that supports bitmaps, such as StarRocks or Hologres.
Key advantage: It is additive. The output binary bitmap data retains the set information. Downstream jobs or query engines can load this data at any time. They can then use a bitmap union (OR) operation to merge multiple minute-level bitmaps into hour-level or day-level bitmaps. This allows for flexible roll-ups across time windows.
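The following Python sketch illustrates the store-then-merge pattern. Each minute-level set is serialized to bytes, and a downstream step later deserializes the rows and unions them into hour-level and day-level sets. The byte layout here is a simple length-prefixed array of little-endian uint32 values chosen for illustration. It is not the RoaringBitmap format that BITMAP_TO_BYTES produces, and the helpers `to_bytes` and `from_bytes` are hypothetical.

```python
import struct
from collections import defaultdict


def to_bytes(bitmap):
    """Serialize a set of uint32 IDs as a count followed by the sorted values (little-endian)."""
    values = sorted(bitmap)
    return struct.pack(f"<I{len(values)}I", len(values), *values)


def from_bytes(data):
    """Inverse of to_bytes: read the count, then that many uint32 values."""
    (count,) = struct.unpack_from("<I", data, 0)
    return set(struct.unpack_from(f"<{count}I", data, 4))


# Hypothetical persisted minute-level rows: (ts_hour, ts_minute, serialized bitmap)
minute_rows = [
    ("10", "00", to_bytes({1, 2})),
    ("10", "01", to_bytes({2, 3})),
    ("11", "00", to_bytes({5})),
]

# Roll up: deserialize, then union (OR) every minute-level bitmap within each hour.
hourly = defaultdict(set)
for hour, _minute, blob in minute_rows:
    hourly[hour] |= from_bytes(blob)

hour_uv = {hour: len(bm) for hour, bm in hourly.items()}
day_uv = len(set().union(*hourly.values()))
print(hour_uv, day_uv)  # {'10': 3, '11': 1} 4
```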
INSERT INTO minute_bitmaps
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- Core process: Construct and serialize the bitmap.
-- 1. BITMAP_BUILD_AGG: Aggregates user_ids within the window into an in-memory bitmap object.
-- 2. BITMAP_TO_BYTES: Encodes the bitmap object into a standard binary format for persistent storage.
BITMAP_TO_BYTES(
BITMAP_BUILD_AGG(user_id)
) AS minute_bm
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;
3. User profile and group filter
Based on minute-level bitmap data, count the number of users per hour who searched for a keyword (A) and added a product to their shopping cart (B), but did not complete the payment (C).
DDL
-- Pre-calculated minute-level bitmap table
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- Event tag
,minute_bm BYTES -- Pre-calculated serialized minute-level bitmap (from Example 2)
)
WITH ('connector' = 'jdbc' ...)
;
-- Hour-level UV analysis table
CREATE TEMPORARY TABLE hour_combined_uv
(
ts_date STRING
,ts_hour STRING
,tag STRING -- Event tag
,uv BIGINT
)
WITH ('connector' = 'jdbc' ...)
;
DML
Core logic:
Deserialize and roll up: Convert the binary data from the storage layer back to bitmap objects. Then, use a union operation to merge multiple minute-level bitmaps into an hour-level bitmap.
Set algebra operations: Perform intersection, difference, and other operations on bitmaps of different dimensions in memory.
Cardinality calculation: Count the number of elements in the final result set.
Business scenarios: User persona analysis, funnel analysis, or specific group filtering.
Key advantage: The computation logic is completely decoupled. Changes to business logic, such as adding a filter condition, only require modifying the SQL query. You do not need to change the upstream data generation pipeline. This provides a high degree of analytical flexibility.
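The three steps can be traced in Python on a few hypothetical minute-level slices for one hour, with sets standing in for bitmaps. Step 1 unions the slices per tag, as BITMAP_OR_AGG with FILTER does. Step 2 computes (A ∩ B) − C, as BITMAP_ANDNOT(BITMAP_AND(...), ...) does. Step 3 counts the result. In this sketch a tag with no events in the hour becomes an empty set; the SQL query's handling of such groups is not modeled here.

```python
from collections import defaultdict

# Hypothetical minute-level slices for one hour: (ts_minute, tag) -> set of user IDs
minute_bitmaps = {
    ("00", "A"): {1, 2, 3}, ("01", "A"): {4},
    ("00", "B"): {2, 3},    ("02", "B"): {4, 5},
    ("03", "C"): {3},
}

# Step 1: roll minute slices up to one hour-level set per tag.
hour_bm = defaultdict(set)
for (_minute, tag), bitmap in minute_bitmaps.items():
    if tag in ("A", "B", "C"):
        hour_bm[tag] |= bitmap

# Step 2: users who did A and B but not C.
result = (hour_bm["A"] & hour_bm["B"]) - hour_bm["C"]

# Step 3: cardinality of the final set.
uv = len(result)
print(sorted(result), uv)  # [2, 4] 2
```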
INSERT INTO hour_combined_uv
SELECT
ts_date,
ts_hour,
'A and B andnot C' AS metric_name,
-- Step 3: Calculate the cardinality.
-- Count the number of users in the final bitmap and output a BIGINT.
BITMAP_CARDINALITY(
-- Step 2: Perform set algebra operations.
-- Logical expression: (Tag A ∩ Tag B) - Tag C
BITMAP_ANDNOT(
BITMAP_AND(hour_bm_a, hour_bm_b),
hour_bm_c
)
) AS uv
FROM (
SELECT
ts_date,
ts_hour,
-- Step 1: Deserialize and perform a time roll-up.
-- Revert the stored binary data (BYTES) to bitmaps,
-- and use BITMAP_OR_AGG to merge (union) all minute-level bitmaps within the hour.
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'A') AS hour_bm_a,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'B') AS hour_bm_b,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'C') AS hour_bm_c
FROM minute_bitmaps
WHERE tag IN ('A', 'B', 'C')
GROUP BY
ts_date,
ts_hour
);
4. Real-time retention analysis
Based on minute-level and day-level bitmap data, this example calculates the real-time return visitor count for a specific page up to a specific minute of the current day.
DDL
-- Pre-calculated minute-level bitmap table
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- Page name
,minute_bm BYTES -- Pre-calculated serialized minute-level bitmap
)
WITH ('connector' = 'jdbc' ...)
;
-- Pre-calculated day-level bitmap table
CREATE TEMPORARY TABLE daily_bitmaps
(
ts_date STRING
,tag STRING -- Page name
,daily_bm BYTES -- Pre-calculated serialized day-level bitmap
)
WITH ('connector' = 'jdbc' ...)
;
-- Sink table for real-time retention metrics
CREATE TEMPORARY TABLE realtime_retention_metrics
(
ts_date STRING
,tag STRING -- Page name
,retained_users BIGINT -- Number of return visits on the current day
,total_users BIGINT -- Total number of visitors from the previous day
,update_time TIMESTAMP(3)
,PRIMARY KEY (ts_date, tag) NOT ENFORCED
)
WITH ('connector' = 'jdbc' ...)
;
DML
Core logic:
Real-time aggregation: Aggregate the minute-level bitmap stream from the current day in real time to create a cumulative bitmap for the day.
Stream-batch join: Use a JOIN operation to associate the real-time bitmap of the current day with the full bitmap of the previous day from the storage layer.
Intersection operation: Perform a BITMAP_AND (intersection) operation on the user bitmaps of the two days to obtain the set of overlapping users.
Scenarios: Real-time monitoring of user response to sales promotions, or analysis of daily active user stickiness in games.
Key advantages:
Extremely fast computation: Traditional solutions need to join two large tables with hundreds of millions of rows to calculate COUNT DISTINCT, which incurs enormous overhead and high latency. The BITMAP solution only needs to perform bitmap operations on a few binary objects, so the computation typically completes in milliseconds.
Storage decoupling: Historical data, such as daily_bitmaps, can be stored in external systems such as HDFS, OSS, or databases accessed through JDBC. This avoids long-term use of Flink state resources.
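The retention logic can be traced in Python with sets standing in for bitmaps. The sketch unions today's minute-level slices into a running bitmap, looks up yesterday's day-level bitmap by date, and intersects the two. Like the INNER JOIN in the SQL below, it produces no result when either side is missing. The data, page names, and function name `retention` are hypothetical.

```python
from datetime import date, timedelta

# Hypothetical day-level bitmaps (daily_bitmaps), keyed by (ts_date, tag)
daily_bitmaps = {("2024-05-01", "home"): {1, 2, 3, 4}}

# Hypothetical minute-level slices of the current day (minute_bitmaps)
minute_bitmaps = [
    ("2024-05-02", "home", {2, 9}),
    ("2024-05-02", "home", {3, 2}),
]


def retention(today, tag):
    """Return (retained_users, total_users), or None when the join would emit no row."""
    today_bm = set()
    for ts_date, page, bm in minute_bitmaps:
        if ts_date == today and page == tag:
            today_bm |= bm  # running union of today's slices
    yesterday = (date.fromisoformat(today) - timedelta(days=1)).isoformat()
    yesterday_bm = daily_bitmaps.get((yesterday, tag))
    if not today_bm or yesterday_bm is None:
        return None  # INNER JOIN: no matching row on one side
    return len(today_bm & yesterday_bm), len(yesterday_bm)


print(retention("2024-05-02", "home"))  # (2, 4), a retention rate of 50%
```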
INSERT INTO realtime_retention_metrics
SELECT
T.ts_date,
T.tag,
-- Metric 1: Real-time return visits on the current day (retained users).
-- Logic: Users who appeared today ∩ Users who appeared yesterday.
BITMAP_CARDINALITY(
BITMAP_AND(
T.today_bm, -- Real-time bitmap for today
BITMAP_FROM_BYTES(Y.daily_bm) -- Historical bitmap from yesterday (requires deserialization)
)
) AS retained_users,
-- Metric 2: Baseline number of visitors from yesterday.
-- Logic: Directly read the cardinality of yesterday's bitmap to use as the denominator for retention rate calculation.
BITMAP_CARDINALITY(
BITMAP_FROM_BYTES(Y.daily_bm)
) AS total_users,
CURRENT_TIMESTAMP AS update_time
FROM (
-- Subquery T: Real-time stream processing.
-- Aggregates the minute-level slices of the current day into a "current-to-date" bitmap for today in real time.
SELECT
ts_date,
tag,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) AS today_bm
FROM minute_bitmaps
GROUP BY ts_date, tag
) T
-- Core join: Combines the stream (today) with the batch (yesterday).
INNER JOIN daily_bitmaps Y
ON T.tag = Y.tag
-- Join condition: Match the data from the day before T.
AND CAST(T.ts_date AS DATE) = CAST(Y.ts_date AS DATE) + INTERVAL '1' DAY;