
Realtime Compute for Apache Flink:Use BITMAP type for exact deduplication

Last Updated: Mar 26, 2026

COUNT DISTINCT is exact but expensive: Flink must keep every distinct key in state, and state grows linearly with data volume. Approximate functions like APPROX_COUNT_DISTINCT reduce overhead but introduce statistical error. The BITMAP data type eliminates both problems — it delivers the same exact count as COUNT DISTINCT by compressing integer IDs into compact bitmap objects and replacing state maintenance with efficient bitwise operations, at a fraction of the memory and CPU cost.

BITMAP for deduplication is distinct from bitmap indexes used in columnar storage engines. The two techniques share the name but solve different problems: bitmap indexes filter low-cardinality columns, while BITMAP deduplication counts distinct values.

Limitations

  • Only Ververica Runtime (VVR) 11.5 and later support the BITMAP data type and its related functions.

  • The BITMAP type stores 32-bit unsigned integers.

Key concepts

How BITMAP works

BITMAP is based on the RoaringBitmap standard. Each bitmap represents a set of 32-bit unsigned integers. Instead of scanning rows and tracking individual keys in Flink state, BITMAP aggregates IDs into a compact binary object and computes cardinality (the distinct count) with fast bitwise operations.
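The principle can be sketched in a few lines of Python, using a plain integer as the bitset. Real RoaringBitmap additionally partitions the 32-bit space into compressed containers, but the core idea is the same; the function names below are illustrative, not part of any API.

```python
# Illustrative sketch: a bitmap as one big Python integer used as a bitset.
# RoaringBitmap compresses the 32-bit ID space into containers, but the
# core idea is identical: set bit i to record the presence of ID i.

def bitmap_add(bm: int, user_id: int) -> int:
    """Record user_id by setting bit number user_id."""
    return bm | (1 << user_id)

def bitmap_cardinality(bm: int) -> int:
    """Exact distinct count = number of set bits (popcount)."""
    return bin(bm).count("1")

bm = 0
for uid in [3, 7, 3, 42, 7]:      # duplicates land on the same bit
    bm = bitmap_add(bm, uid)

print(bitmap_cardinality(bm))     # 3 distinct IDs: {3, 7, 42}
```

Because duplicates simply re-set an already-set bit, deduplication happens implicitly at insert time, with no per-key state to maintain.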

Two construction paths

Every bitmap in these examples is created in one of two ways:

  • Aggregation from raw data: BITMAP_BUILD_AGG or BITMAP_BUILD_CARDINALITY_AGG aggregates integer IDs from a stream directly into a bitmap.

  • Deserialization from storage: BITMAP_FROM_BYTES converts a binary blob previously written by BITMAP_TO_BYTES back into a bitmap for further operations.

Understanding which path applies in each example helps you follow the SQL patterns below.
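The two paths can be sketched in Python, again with a plain integer standing in for a RoaringBitmap. The function names mirror the SQL functions they illustrate; the byte layout here is illustrative, not the actual RoaringBitmap serialization format.

```python
# Sketch of the two construction paths. A plain int bitset stands in for
# a RoaringBitmap; the little-endian layout below is illustrative only.

def bitmap_build_agg(ids):
    """Path 1: aggregate raw integer IDs directly into a bitmap."""
    bm = 0
    for i in ids:
        bm |= 1 << i
    return bm

def bitmap_to_bytes(bm: int) -> bytes:
    """Serialize a bitmap for persistent storage."""
    return bm.to_bytes((bm.bit_length() + 7) // 8 or 1, "little")

def bitmap_from_bytes(blob: bytes) -> int:
    """Path 2: deserialize a stored blob back into a bitmap."""
    return int.from_bytes(blob, "little")

bm = bitmap_build_agg([1, 5, 9])
blob = bitmap_to_bytes(bm)            # what a sink would persist
assert bitmap_from_bytes(blob) == bm  # round trip restores the same set
```

The round trip is lossless: a deserialized bitmap supports the same operations as one built directly from the stream.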

Additivity — a key architectural decision

How you output bitmap results determines what downstream consumers can do with the data:

Output type | Function | Additive? | Use when
--- | --- | --- | ---
Scalar count (BIGINT) | BITMAP_BUILD_CARDINALITY_AGG | No — counts from different windows cannot be summed | Real-time dashboards, simple monitoring
Serialized bitmap (BYTES) | BITMAP_BUILD_AGG + BITMAP_TO_BYTES | Yes — downstream can union bitmaps across any time grain | Data warehouse layers, Online Analytical Processing (OLAP) integration, stream-batch joins
A "non-additive" scalar count means you cannot add two 1-minute UV counts to get a 2-minute deduplicated UV. Deduplication across time boundaries requires the bitmap objects themselves.
Pre-aggregation across multiple dimensions has a scaling constraint: to support queries on any combination of N dimensions, you must define 2^N groups. Plan your dimension combinations accordingly.
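A tiny Python sketch, with sets standing in for bitmaps, shows why the scalar path is non-additive while the bitmap path is:

```python
# Why scalar counts are non-additive but bitmap objects are.
# Two 1-minute windows with an overlapping visitor (user 7 is in both);
# Python sets stand in for the bitmap objects.

minute1 = {3, 7, 42}
minute2 = {7, 99}

# Scalar path: per-window counts lose identity, so summing overcounts.
assert len(minute1) + len(minute2) == 5   # wrong 2-minute "UV"

# Bitmap path: union the sets first, then count. User 7 dedupes.
assert len(minute1 | minute2) == 4        # correct 2-minute UV
```

Once a window is reduced to a number, the identities needed to dedupe across windows are gone; the bitmap keeps them.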

Usage

The four examples below progress from simple to advanced:

Example | Pattern | Output
--- | --- | ---
1. Real-time exact deduplication | Replace COUNT DISTINCT directly | Scalar UV count per window
2. Store minute-level bitmaps | Build and persist serialized bitmaps | Binary bitmap objects for reuse
3. User segment filtering | Rollup + set operations on stored bitmaps | Filtered UV count per hour
4. Real-time retention analysis | Stream-batch join on bitmap objects | Retained user count vs. yesterday

For the complete BITMAP function reference, see BITMAP functions.

Example 1: Real-time exact deduplication

Count minute-level unique visitors (UV) directly from a real-time event stream. This is the simplest replacement for COUNT DISTINCT — same exact result, lower resource consumption.

DDL

-- Source: real-time event stream
CREATE TEMPORARY TABLE user_events
(
      user_id     INT
      ,tag        STRING          -- Event tag
      ,event_time TIMESTAMP(3)
      ,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;

-- Sink: minute-level UV statistics
CREATE TEMPORARY TABLE minute_uv
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING           -- Event tag
      ,minute_uv BIGINT           -- Deduplicated UV count
)
WITH ('connector' = 'jdbc' ...)
;

DML

INSERT INTO minute_uv
SELECT
    DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
    DATE_FORMAT(window_start, 'HH')         AS ts_hour,
    DATE_FORMAT(window_start, 'mm')         AS ts_minute,
    tag,
    -- Builds a bitmap from user_ids in the window, then immediately returns
    -- its cardinality (the exact deduplicated count).
    -- Equivalent to COUNT(DISTINCT user_id) but with lower memory overhead.
    BITMAP_BUILD_CARDINALITY_AGG(user_id)   AS uv
FROM TABLE(
    TUMBLE(
        TABLE user_events,
        DESCRIPTOR(event_time),
        INTERVAL '1' MINUTE
    )
)
GROUP BY
    window_start,
    window_end,
    tag;

When to use this pattern: Real-time dashboards and per-minute monitoring metrics.

Important

The output is a scalar count and is not additive. You cannot add two 1-minute UV counts to get a 2-minute deduplicated UV. For aggregations that span multiple windows, use Example 2 instead.

Example 2: Store minute-level bitmaps

Instead of outputting a scalar count, serialize the bitmap itself to storage. Downstream jobs can then union these binary blobs across any time grain — minutes to hours to days — without reprocessing the original event data.

DDL

-- Source: real-time event stream (same schema as Example 1)
CREATE TEMPORARY TABLE user_events
(
      user_id     INT
      ,tag        STRING          -- Event tag
      ,event_time TIMESTAMP(3)
      ,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;

-- Sink: minute-level serialized bitmaps
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING           -- Event tag
      ,minute_bm BYTES            -- Serialized bitmap (RoaringBitmap binary format)
)
WITH ('connector' = 'jdbc' ...)
;

DML

INSERT INTO minute_bitmaps
SELECT
    DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
    DATE_FORMAT(window_start, 'HH')         AS ts_hour,
    DATE_FORMAT(window_start, 'mm')         AS ts_minute,
    tag,
    -- Two-step process:
    -- 1. BITMAP_BUILD_AGG: aggregates user_ids within the tumbling window
    --    into an in-memory bitmap object.
    -- 2. BITMAP_TO_BYTES: encodes the bitmap into RoaringBitmap binary format
    --    for persistent storage.
    BITMAP_TO_BYTES(
        BITMAP_BUILD_AGG(user_id)
    ) AS user_id_bitmap
FROM TABLE(
    TUMBLE(
        TABLE user_events,
        DESCRIPTOR(event_time),
        INTERVAL '1' MINUTE
    )
)
GROUP BY
    window_start,
    window_end,
    tag;

When to use this pattern: Building a data warehouse detail (DWD) or data warehouse summary (DWS) intermediate layer, or feeding OLAP engines that support bitmaps natively — such as StarRocks or Hologres.

Key advantage: The serialized BYTES output retains the full set information. Any downstream job can call BITMAP_OR_AGG(BITMAP_FROM_BYTES(...)) to union minute-level bitmaps into hour-level or day-level bitmaps, enabling flexible time-grain rollups without touching the original raw data.
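The rollup a downstream job performs can be sketched in Python, with small integers as stand-in bitmaps and a hypothetical or_agg helper mirroring what BITMAP_OR_AGG(BITMAP_FROM_BYTES(...)) does in SQL:

```python
# Rollup sketch: union stored minute-level bitmaps into an hourly bitmap.
# Plain int bitsets stand in for RoaringBitmaps; from_bytes/or_agg are
# illustrative helpers, not library functions.

def from_bytes(blob: bytes) -> int:
    return int.from_bytes(blob, "little")

def or_agg(bitmaps):
    acc = 0
    for bm in bitmaps:
        acc |= bm                 # bitwise OR = set union
    return acc

def cardinality(bm: int) -> int:
    return bin(bm).count("1")

# Three stored minute slices; user 5 is active in two of them.
minute_blobs = [
    (0b100010).to_bytes(1, "little"),   # users {1, 5}
    (0b100000).to_bytes(1, "little"),   # users {5}
    (0b001000).to_bytes(1, "little"),   # users {3}
]

hour_bm = or_agg(from_bytes(b) for b in minute_blobs)
print(cardinality(hour_bm))   # 3 distinct users for the hour: {1, 3, 5}
```

The same union extends from minutes to hours to days without ever rereading the raw events.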

Example 3: User segment filtering

Using the minute-level bitmaps from Example 2, count hourly users who searched for a keyword (tag A) and added a product to their cart (tag B), but did not complete payment (tag C).

All set operations run on a small number of in-memory bitmap objects — no large table scans or shuffles.

DDL

-- Source: pre-computed minute-level bitmaps (produced by Example 2)
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING           -- Event tag
      ,minute_bm BYTES            -- Pre-computed serialized minute-level bitmap
)
WITH ('connector' = 'jdbc' ...)
;

-- Sink: hourly filtered UV counts
CREATE TEMPORARY TABLE hour_combined_uv
(
      ts_date  STRING
      ,ts_hour STRING
      ,tag     STRING             -- Metric label
      ,uv      BIGINT
)
WITH ('connector' = 'jdbc' ...)
;

DML

The query runs in three logical steps:

  1. Deserialize and rollup: Convert stored BYTES back to bitmap objects, then use BITMAP_OR_AGG to union all minute-level bitmaps within the hour — one unioned bitmap per tag.

  2. Set operations: Apply intersection and difference on the hourly bitmaps in memory.

  3. Count: Call BITMAP_CARDINALITY on the resulting bitmap.

INSERT INTO hour_combined_uv
SELECT
    ts_date,
    ts_hour,
    'A and B andnot C' AS tag,
    -- Step 3: Count the elements in the final bitmap.
    BITMAP_CARDINALITY(
        -- Step 2: Set operations -- (Tag A ∩ Tag B) − Tag C.
        BITMAP_ANDNOT(
            BITMAP_AND(hour_bm_a, hour_bm_b),
            hour_bm_c
        )
    ) AS uv
FROM (
    SELECT
        ts_date,
        ts_hour,
        -- Step 1: Deserialize stored BYTES and union all minute-level bitmaps
        -- within the hour into a single hourly bitmap per tag.
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
            FILTER (WHERE tag = 'A') AS hour_bm_a,

        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
            FILTER (WHERE tag = 'B') AS hour_bm_b,

        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
            FILTER (WHERE tag = 'C') AS hour_bm_c
    FROM minute_bitmaps
    WHERE tag IN ('A', 'B', 'C')
    GROUP BY
        ts_date,
        ts_hour
);

When to use this pattern: User segment analysis, funnel analysis, or any scenario requiring flexible cross-dimensional filtering.

Key advantage: Business logic is fully decoupled from data generation. To add or change a filter condition, modify only the SQL query — no changes to the upstream pipeline are needed.
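The set algebra at the heart of this query can be sketched with Python sets standing in for the hourly bitmaps (the IDs are made up for illustration):

```python
# Sketch of the Example 3 segment logic:
# (searched keyword A AND added to cart B) AND NOT completed payment C.
# Python sets stand in for the hourly bitmap objects.

hour_bm_a = {1, 2, 3, 4}      # searched for the keyword
hour_bm_b = {2, 3, 4, 5}      # added a product to the cart
hour_bm_c = {4}               # completed payment

# BITMAP_ANDNOT(BITMAP_AND(a, b), c) corresponds to (a & b) - c.
segment = (hour_bm_a & hour_bm_b) - hour_bm_c
print(len(segment))           # 2 users in the segment: {2, 3}
```

Changing the segment definition means changing only this expression, which is exactly why the filter logic can live entirely in the query layer.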

Example 4: Real-time retention analysis

Calculate the real-time count of return visitors for a given page: users who appeared today and also appeared yesterday.

This combines a real-time minute-level bitmap stream (today's data, continuously updated) with a pre-computed day-level bitmap from storage (yesterday's data, static). The intersection gives you the retained user count in milliseconds — no large table joins required.

DDL

-- Source 1: real-time minute-level bitmaps for today (produced by Example 2)
CREATE TEMPORARY TABLE minute_bitmaps
(
      ts_date    STRING
      ,ts_hour   STRING
      ,ts_minute STRING
      ,tag       STRING           -- Page name
      ,minute_bm BYTES            -- Pre-computed serialized minute-level bitmap
)
WITH ('connector' = 'jdbc' ...)
;

-- Source 2: pre-computed day-level bitmaps from storage (historical data)
CREATE TEMPORARY TABLE daily_bitmaps
(
      ts_date   STRING
      ,tag      STRING            -- Page name
      ,daily_bm BYTES             -- Pre-computed serialized day-level bitmap
)
WITH ('connector' = 'jdbc' ...)
;

-- Sink: real-time retention metrics
CREATE TEMPORARY TABLE realtime_retention_metrics
(
      ts_date         STRING
      ,tag            STRING      -- Page name
      ,retained_users BIGINT      -- Users present both today and yesterday
      ,total_users    BIGINT      -- Total visitors yesterday (retention denominator)
      ,update_time    TIMESTAMP(3)
      ,PRIMARY KEY (ts_date, tag) NOT ENFORCED
)
WITH ('connector' = 'jdbc' ...)
;

DML

The query runs in three logical steps:

  1. Real-time aggregation (subquery T): Continuously union today's minute-level bitmap slices into a cumulative "current-to-date" bitmap.

  2. Stream-batch join: Join today's real-time bitmap with yesterday's static daily bitmap from storage.

  3. Intersection: BITMAP_AND on the two bitmaps produces the set of users present on both days.

INSERT INTO realtime_retention_metrics
SELECT
    T.ts_date,
    T.tag,
    -- Metric 1: retained users -- appeared today AND yesterday.
    BITMAP_CARDINALITY(
        BITMAP_AND(
            T.today_bm,                           -- Real-time bitmap for today
            BITMAP_FROM_BYTES(Y.daily_bm)         -- Yesterday's historical bitmap
        )
    ) AS retained_users,

    -- Metric 2: yesterday's total visitors (retention denominator).
    BITMAP_CARDINALITY(
        BITMAP_FROM_BYTES(Y.daily_bm)
    ) AS total_users,

    CURRENT_TIMESTAMP AS update_time
FROM (
    -- Subquery T: aggregate today's minute-level slices in real time
    -- into a single cumulative bitmap for the current day.
    SELECT
        ts_date,
        tag,
        BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) AS today_bm
    FROM minute_bitmaps
    GROUP BY ts_date, tag
) T
-- Stream-batch join: match each real-time record with the previous day's bitmap.
INNER JOIN daily_bitmaps Y
    ON T.tag = Y.tag
    AND CAST(T.ts_date AS DATE) = CAST(Y.ts_date AS DATE) + INTERVAL '1' DAY;

When to use this pattern: Real-time monitoring of user response to promotions, or day-over-day active user stickiness analysis.

Key advantages:

  • Fast: Traditional solutions join two tables with hundreds of millions of rows to compute COUNT DISTINCT. BITMAP performs the same computation on a few binary objects in milliseconds.

  • Storage-efficient: Historical data such as daily_bitmaps lives in external storage — Hadoop Distributed File System (HDFS), OSS, or JDBC — keeping Flink state small.
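The intersection step can be sketched with Python sets standing in for the two bitmap objects (IDs invented for illustration):

```python
# Retention sketch: intersect today's cumulative bitmap with yesterday's
# stored daily bitmap. Python sets stand in for the bitmap objects.

today_bm = {10, 11, 12, 13}       # real-time union of today's minute slices
yesterday_bm = {11, 13, 14}       # deserialized from daily_bitmaps

retained_users = len(today_bm & yesterday_bm)   # BITMAP_AND + CARDINALITY
total_users = len(yesterday_bm)                 # retention denominator

print(retained_users, total_users)   # 2 retained of 3 yesterday's visitors
```

Because both operands are single in-memory objects, the cost of the intersection is independent of how many raw events produced them.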

What's next

  • BITMAP functions — full function reference including syntax, parameter types, and return values