Use Hologres and Flink to calculate accurate, real-time unique visitor (UV) counts across multiple dimensions—country, province, and city—with sub-second query latency.
Prerequisites
Before you begin, make sure you have:
A Hologres instance (V0.10 or later) connected to a development tool. This guide uses HoloWeb. See Connect to HoloWeb and run queries.
A Flink cluster. Use fully managed Flink on Alibaba Cloud or open source Flink.
How it works
This solution combines Flink stream processing with Hologres's native RoaringBitmap support to deliver real-time UV deduplication.
Why RoaringBitmap instead of COUNT(DISTINCT)?
RoaringBitmap stores user IDs as compressed bitmaps and uses bitwise operations for deduplication. This approach offers better storage efficiency and query performance than COUNT(DISTINCT) at large scale.
A key constraint: RoaringBitmap only accepts 32-bit integer inputs. User IDs from business systems are typically strings or 64-bit integers, so you must map them to 32-bit integers first. This guide covers that mapping step as part of the setup.
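The deduplication idea itself is simple to see in isolation. The sketch below uses `java.util.BitSet` as a stdlib stand-in for RoaringBitmap (which adds compression on top of the same one-bit-per-ID model); the sample IDs are invented for illustration:

```java
import java.util.BitSet;

public class BitmapDedup {
    public static void main(String[] args) {
        // BitSet stands in for RoaringBitmap: both deduplicate by setting
        // one bit per mapped 32-bit integer user ID.
        BitSet uv = new BitSet();
        int[] mappedUids = {42, 7, 42, 1001, 7}; // duplicates from repeat visits
        for (int uid : mappedUids) {
            uv.set(uid); // setting an already-set bit is a no-op
        }
        // cardinality() is the deduplicated visitor count (UV).
        System.out.println(uv.cardinality()); // prints 3
    }
}
```

Merging two such bitmaps is a bitwise OR, which is what makes cross-dimension and cross-window rollups cheap.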
Architecture

Flink subscribes to real-time user event data from sources such as Kafka.
Flink joins the stream with a Hologres dimension table to map user IDs, then writes aggregated RoaringBitmap results to Hologres.
Hologres stores the pre-aggregated bitmap data per time window and dimension.
BI tools such as DataService Studio and Quick BI query the aggregated data directly.
Data flow

The end-to-end pipeline has five stages:
Source ingestion — Flink reads user event data from Kafka or Redis and converts it to a DataStream source table.
UID mapping — Flink joins the source table against the uid_mapping dimension table in Hologres. The insertIfNotExists parameter auto-inserts new user IDs and assigns them auto-incremented 32-bit integers using the SERIAL type.
Window aggregation — Flink groups events by dimension (country, province, city, date) using a 5-minute tumbling window with a 1-minute trigger, and aggregates user IDs into a RoaringBitmap per window.
Hologres write — Flink serializes the RoaringBitmap as a byte array and writes it to the dws_app sink table using insertOrReplace.
Query — Query the sink table using RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) to get deduplicated UV counts for any combination of dimensions and time ranges.
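The UID mapping stage can be sketched in isolation. This is a hypothetical in-memory stand-in: in the real pipeline the map lives in the Hologres uid_mapping table, and the SERIAL column (not application code) assigns the next dense 32-bit ID on first insert:

```java
import java.util.HashMap;
import java.util.Map;

public class UidMappingSketch {
    // In-memory stand-in for the uid_mapping table.
    private final Map<String, Integer> mapping = new HashMap<>();
    private int nextId = 1; // plays the role of the SERIAL sequence

    // Mirrors insertIfNotExists: return the existing mapping, or insert
    // the uid with a fresh auto-incremented 32-bit ID when it is new.
    public int lookupOrInsert(String uid) {
        return mapping.computeIfAbsent(uid, k -> nextId++);
    }

    public static void main(String[] args) {
        UidMappingSketch dim = new UidMappingSketch();
        System.out.println(dim.lookupOrInsert("user-abc")); // new uid: prints 1
        System.out.println(dim.lookupOrInsert("user-xyz")); // new uid: prints 2
        System.out.println(dim.lookupOrInsert("user-abc")); // repeat: prints 1
    }
}
```

Dense, consecutive IDs matter because RoaringBitmap compresses runs of nearby integers far better than sparse 64-bit hashes.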
Set up Hologres tables
Create the UID mapping table
The uid_mapping table maps your original user IDs to 32-bit integers. Skip this step if your user IDs are already 32-bit integers.
Two design decisions in this table:
Row-oriented storage (orientation = 'row'): optimized for the high-QPS point lookups that Flink dimension table joins require.
SERIAL type for uid_int32: an auto-incrementing 32-bit integer that produces dense, consecutive IDs—the format RoaringBitmap is optimized for.
Enable two Grand Unified Configuration (GUC) parameters so that Fixed Plan writes work correctly with SERIAL fields:
-- Enable Fixed Plan writes for SERIAL columns.
ALTER DATABASE <db_name> SET hg_experimental_enable_fixed_dispatcher_autofill_series = on;
ALTER DATABASE <db_name> SET hg_experimental_enable_fixed_dispatcher_for_multi_values = on;
Replace <db_name> with your Hologres database name.
For more information about Fixed Plan, see Accelerate SQL execution with Fixed Plan.
Then create the table:
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
-- clustering_key and distribution_key on uid for fast point lookups.
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
Create the aggregation sink table
The dws_app table stores pre-aggregated RoaringBitmap data per time window and dimension. Unlike a static offline table, it includes a timetz column (of type TIMESTAMPTZ) so you can filter by Flink window end timestamps.
First, enable the RoaringBitmap extension. Your Hologres instance must be V0.10 or later:
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
Then create the table:
BEGIN;
CREATE TABLE dws_app (
country text,
prov text,
city text,
ymd text NOT NULL, -- Date field (e.g., '20210329')
timetz TIMESTAMPTZ, -- Flink window epoch timestamp
uid32_bitmap roaringbitmap, -- RoaringBitmap for UV deduplication
PRIMARY KEY (country, prov, city, ymd, timetz)
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
-- ymd as clustering_key for efficient date-range filtering.
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column','ymd');
-- Dimension fields as distribution_key for parallel aggregation.
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;
The primary key (country, prov, city, ymd, timetz) prevents duplicate inserts when Flink retries within the same window.
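The upsert semantics behind this are worth making concrete. In the sketch below a `HashMap` stands in for the table and an invented string key stands in for the composite primary key; with insertOrReplace, a retried or re-fired write for the same window replaces the existing row rather than adding a second one:

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertSketch {
    public static void main(String[] args) {
        // HashMap stands in for dws_app; the map key plays the role of the
        // primary key (country, prov, city, ymd, timetz).
        Map<String, Integer> table = new HashMap<>();
        String pk = "US|CA|SF|20210329|18:05:00"; // hypothetical key

        table.put(pk, 120); // first write from the 1-minute trigger
        table.put(pk, 150); // later fire (or retry) for the same window: replaces
        System.out.println(table.size());  // prints 1 (no duplicate row)
        System.out.println(table.get(pk)); // prints 150 (latest value wins)
    }
}
```

This is why the 1-minute trigger in Step 3 can emit intermediate results repeatedly without inflating the stored data.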
Read and aggregate data with Flink
The complete example is on GitHub: FlinkRoaringBitmapAggJob.java.
All steps below use the Flink DataStream and Table APIs to build the pipeline.
Step 1: Read the source data
Read from a streaming source (CSV, Kafka, Redis, or another connector) and convert it to a Flink source table. A proctime field is required for the dimension table join in Step 2.
// Read from a CSV file here; replace with Kafka or another connector for production.
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// Convert the DataStream to a Table and add a processing-time attribute.
Table odsTable = tableEnv.fromDataStream(
odsStream,
$("uid"),
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("proctime").proctime() // Required for dimension table join
);
tableEnv.createTemporaryView("odsTable", odsTable);
Step 2: Join with the UID mapping dimension table
Create the Hologres dimension table with insertifnotexists='true'. This tells the connector to auto-insert any uid that doesn't exist yet, which triggers the SERIAL auto-increment and assigns a new uid_int32 value.
// Create the Hologres dimension table.
// insertifnotexists: auto-inserts new UIDs and assigns sequential 32-bit integers.
String createUidMappingTable = String.format(
"CREATE TABLE uid_mapping_dim ("
+ " uid STRING,"
+ " uid_int32 INT"
+ ") WITH ("
+ " 'connector' = 'hologres',"
+ " 'dbname' = '%s'," // Hologres database name
+ " 'tablename' = '%s'," // uid_mapping table name
+ " 'username' = '%s'," // AccessKey ID
+ " 'password' = '%s'," // AccessKey secret
+ " 'endpoint' = '%s'," // Hologres endpoint
+ " 'insertifnotexists'= 'true'" // Auto-insert + auto-increment uid_int32
+ ")",
database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// Join the source table with the dimension table to resolve uid -> uid_int32.
String odsJoinDim =
"SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
+ " FROM odsTable AS ods"
+ " JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
+ " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
Step 3: Aggregate into RoaringBitmap by time window
Group events by dimension using a 5-minute tumbling window. The ContinuousProcessingTimeTrigger fires every minute so results are available before the window closes—useful for near-real-time dashboards.
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
source
// Group by the four query dimensions.
.keyBy(0, 1, 2, 3)
// 5-minute tumbling window.
// For production, switch from ProcessingTime to EventTime.
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
// Emit intermediate results every minute (before the window closes).
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
.aggregate(
new AggregateFunction<
Tuple5<String, String, String, String, Integer>,
RoaringBitmap,
RoaringBitmap>() {
@Override
public RoaringBitmap createAccumulator() {
return new RoaringBitmap();
}
@Override
public RoaringBitmap add(
Tuple5<String, String, String, String, Integer> in,
RoaringBitmap acc) {
// Add the 32-bit uid to the bitmap; duplicates are ignored automatically.
acc.add(in.f4);
return acc;
}
@Override
public RoaringBitmap getResult(RoaringBitmap acc) {
return acc;
}
@Override
public RoaringBitmap merge(RoaringBitmap acc1, RoaringBitmap acc2) {
return RoaringBitmap.or(acc1, acc2);
}
},
new WindowFunction<
RoaringBitmap,
Tuple6<String, String, String, String, Timestamp, byte[]>,
Tuple,
TimeWindow>() {
@Override
public void apply(
Tuple keys,
TimeWindow timeWindow,
Iterable<RoaringBitmap> iterable,
Collector<Tuple6<String, String, String, String, Timestamp, byte[]>> out)
throws Exception {
RoaringBitmap result = iterable.iterator().next();
// Compress the bitmap before serialization.
result.runOptimize();
// Serialize to a byte array for Hologres storage.
byte[] byteArray = new byte[result.serializedSizeInBytes()];
result.serialize(ByteBuffer.wrap(byteArray));
// f4 (Timestamp): the window end time, truncated to seconds.
out.collect(new Tuple6<>(
keys.getField(0),
keys.getField(1),
keys.getField(2),
keys.getField(3),
new Timestamp(timeWindow.getEnd() / 1000 * 1000),
byteArray));
}
});
Step 4: Write to the Hologres sink table
The roaringbitmap type in Hologres maps to BYTES in Flink. Use mutatetype = 'insertOrReplace' so that each window's bitmap is upserted rather than duplicated.
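The byte-array mapping can be illustrated in isolation. This sketch again uses `java.util.BitSet` as a stdlib stand-in for RoaringBitmap's serialize/deserialize pair; the payload byte array is what lands in the BYTES sink column:

```java
import java.util.BitSet;

public class BitmapBytesRoundTrip {
    public static void main(String[] args) {
        // A per-window bitmap with two visitors.
        BitSet window = new BitSet();
        window.set(7);
        window.set(42);

        byte[] payload = window.toByteArray();     // what Flink writes to the sink
        BitSet restored = BitSet.valueOf(payload); // what query-side operators use

        System.out.println(restored.cardinality()); // prints 2
        System.out.println(restored.equals(window)); // prints true
    }
}
```

Because the serialization is lossless, Hologres can OR the stored bitmaps at query time exactly as if they had never left memory.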
// Convert the aggregation result back to a Table.
Table resTable = tableEnv.fromDataStream(
processedSource,
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("timest"),
$("uid32_bitmap"));
// Create the Hologres sink. roaringbitmap maps to BYTES in Flink.
String createHologresTable = String.format(
"CREATE TABLE sink ("
+ " country STRING,"
+ " prov STRING,"
+ " city STRING,"
+ " ymd STRING,"
+ " timetz TIMESTAMP,"
+ " uid32_bitmap BYTES" // roaringbitmap stored as a byte array
+ ") WITH ("
+ " 'connector' = 'hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s',"
+ " 'connectionSize' = '%s',"
+ " 'mutatetype' = 'insertOrReplace'" // Upsert per primary key
+ ")",
database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// Write results to the dws_app table.
tableEnv.executeSql("INSERT INTO sink SELECT * FROM " + resTable);
Query UV data
Query the dws_app table using RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)). This merges the per-window bitmaps with a bitwise OR and returns the cardinality (distinct user count).
To keep RB_OR_AGG queries fast, make sure the three-stage aggregation switch is disabled (it is off by default):
SET hg_experimental_enable_force_three_stage_agg = off;
Example 1: UV per city on a specific date
SELECT
country,
prov,
city,
RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE ymd = '20210329'
GROUP BY
country,
prov,
city;
Example 2: UV and page view (PV) per province within a time range
SELECT
country,
prov,
RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv,
SUM(pv) AS pv -- Assumes dws_app also stores a per-window pv column (not in the DDL above)
FROM dws_app
WHERE
timetz > '2021-04-19 18:00:00+08'
AND timetz < '2021-04-19 19:00:00+08'
GROUP BY
country,
prov;
Because bitmaps are pre-aggregated per window, the OR merge at query time operates on a small number of bitmap objects rather than raw event rows—keeping queries fast regardless of the underlying data volume.
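What the query-time merge does can be sketched directly. This mirrors RB_OR_AGG (fold with bitwise OR) followed by RB_CARDINALITY (count set bits), again with `java.util.BitSet` standing in for the stored roaringbitmap values:

```java
import java.util.BitSet;
import java.util.List;

public class QueryTimeMerge {
    public static void main(String[] args) {
        // Two per-window bitmaps with one overlapping visitor (42).
        BitSet window1 = new BitSet();
        window1.set(7);
        window1.set(42);
        BitSet window2 = new BitSet();
        window2.set(42);
        window2.set(1001);

        // RB_OR_AGG: fold the per-window bitmaps together with bitwise OR.
        BitSet merged = new BitSet();
        for (BitSet w : List.of(window1, window2)) {
            merged.or(w);
        }
        // RB_CARDINALITY: the merged cardinality is the deduplicated UV.
        System.out.println(merged.cardinality()); // prints 3, not 4
    }
}
```

The overlap is counted once, which is exactly the property a naive SUM of per-window counts would lose.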
For fine-grained time windows, add matching filter conditions or aggregation dimensions to avoid extra merge operations that can degrade performance.
Visualize UV data
BI tools that query dws_app must support custom aggregate expressions, because the UV metric requires the Hologres functions RB_CARDINALITY and RB_OR_AGG. Apache Superset and Tableau both support this.
Apache Superset
Connect Apache Superset to Hologres. See Apache Superset.
Add dws_app as a dataset.
In the dataset, create a metric named UV with the following expression:
RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))
(Optional) Create a dashboard. See Create Dashboard.
Tableau
Connect Tableau to Hologres. See Tableau. Tableau's passthrough functions let you call Hologres aggregate functions directly. See Passthrough Functions.
Create a calculated field with the following expression:
RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])
(Optional) Create a dashboard. See Create a Dashboard.
What's next
Explore the RoaringBitmap function reference for the full list of bitmap operations available in Hologres.
Switch the Flink time window from ProcessingTime to EventTime for production deployments where event ordering matters.
Adjust the tumbling window size (currently 5 minutes) based on your dashboard refresh requirements.