This topic describes how to use Hologres and Flink to perform accurate, real-time unique visitor (UV) deduplication.
Prerequisites
You have activated Hologres and connected it to a developer tool. This topic uses HoloWeb as an example. For more information, see Connect to HoloWeb and run queries.
You have set up a Flink cluster. You can use fully managed Flink from Alibaba Cloud or open source Flink.
Background information
Hologres is tightly integrated with Flink. It supports high-throughput, real-time data writes from Flink with immediate visibility. It also supports Flink SQL dimension table joins and can act as a Change Data Capture (CDC) source for event-driven development. Real-time UV deduplication primarily uses Flink and Hologres. The following diagram shows the architecture.
Flink subscribes to real-time data from sources such as Kafka logs.
Flink processes the data, converts the stream into a table, joins it with a Hologres dimension table, and writes the result to Hologres in real time.
Hologres processes the data from Flink in real time.
The query results feed into higher-level data applications, such as DataService Studio and Quick BI.
Real-time UV calculation flow
Flink and Hologres are tightly integrated. You can use the native RoaringBitmap support in Hologres to calculate real-time UVs and deduplicate user tags. The following diagram shows the process.
Flink subscribes to real-time user data from sources such as Kafka or Redis and converts it into a source table using DataStream.
Create a user mapping table in Hologres. This table stores historical user IDs (UIDs) and their corresponding 32-bit auto-incrementing UIDs.
NoteUser IDs from business systems or instrumentation are often strings or long integers. RoaringBitmap requires user IDs to be 32-bit integers that are as dense as possible, which means the IDs should be consecutive. The mapping table uses the `SERIAL` type in Hologres, which is an auto-incrementing 32-bit integer, to manage user mapping automatically and consistently.
In Flink, use the Hologres user mapping table as a Flink dimension table. You can use the `insertIfNotExists` attribute with an auto-increment field to efficiently map UIDs. Then, join the dimension table with the data source table and convert the results into a DataStream.
Flink processes the result data from the dimension table join by time window, applies the RoaringBitmap function based on the query dimension, and stores the results in an aggregation sink table in Hologres.
Querying is similar to the offline method. You can retrieve the number of users by directly querying the aggregation sink table based on your query conditions, performing an
oroperation on the key RoaringBitmap field, and then calculating the cardinality.
This method provides fine-grained, real-time user UV and page view (PV) data. You can easily adjust the minimum time window, such as the UVs in the last five minutes, to create a real-time monitoring dashboard for business intelligence (BI) displays. This approach is better for fine-grained statistics during events than daily, weekly, or monthly deduplication. You can also use simple aggregation to get statistics over larger time units. If the aggregation granularity is too fine and queries lack corresponding filter conditions or aggregation dimensions, it can lead to extra aggregation operations that degrade performance.
This solution provides a simple data link and flexible calculations across any dimension. It requires only a single Bitmap for storage, which avoids storage bloat and ensures real-time updates. This creates a more responsive, flexible, and feature-rich data warehouse for multidimensional analysis.
Procedure
Create the required base tables in Hologres
Create a user mapping table
In Hologres, create a table named `uid_mapping` to serve as the user mapping table. This table maps UIDs to a 32-bit integer type. If your original UIDs are already 32-bit integers, you can skip this step.
User IDs from business systems or instrumentation are often strings or long integers. RoaringBitmap requires user IDs to be 32-bit integers that are as dense as possible, which means the IDs should be consecutive. The mapping table uses the `SERIAL` type in Hologres, which is an auto-incrementing 32-bit integer, for automatic and consistent user mapping.
Because this is real-time data, the table is created as a row-oriented table in Hologres to improve the queries per second (QPS) of real-time joins with the Flink dimension table.
Enable the corresponding Grand Unified Configuration (GUC) parameter to use the optimized execution engine for writes to tables that contain a serial field. For more information, see Accelerate SQL execution with Fixed Plan.
-- Enable GUC to support Fixed Plan writes for columns of the Serial type. alter database <db_name> set hg_experimental_enable_fixed_dispatcher_autofill_series=on; alter database <db_name> set hg_experimental_enable_fixed_dispatcher_for_multi_values=on; BEGIN; CREATE TABLE public.uid_mapping ( uid text NOT NULL, uid_int32 serial, PRIMARY KEY (uid) ); -- Set uid as the clustering_key and distribution_key to quickly find its corresponding int32 value. CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid'); CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid'); CALL set_table_property('public.uid_mapping', 'orientation', 'row'); COMMIT;Create a sink table for aggregation
Create a table named `dws_app` to store the aggregated results.
Before you use the RoaringBitmap function, you must create the RoaringBitmap extension. Your Hologres instance must be V0.10 or later.
CREATE EXTENSION IF NOT EXISTS roaringbitmap;Unlike an offline sink table, this table includes a timestamp field for statistics based on Flink window epochs. The following is the Data Definition Language (DDL) for the sink table.
BEGIN; CREATE TABLE dws_app( country text, prov text, city text, ymd text NOT NULL, -- Date field timetz TIMESTAMPTZ, -- Statistical timestamp, which allows for statistics based on Flink window epochs uid32_bitmap roaringbitmap, -- Use roaringbitmap to record UVs PRIMARY KEY (country, prov, city, ymd, timetz)-- Use query dimensions and time as the primary key to prevent duplicate data insertion. ); CALL set_table_property('public.dws_app', 'orientation', 'column'); -- Set the date field as the clustering_key and event_time_column for easier filtering. CALL set_table_property('public.dws_app', 'clustering_key', 'ymd'); CALL set_table_property('public.dws_app', 'event_time_column', 'ymd'); -- Set the group by fields as the distribution_key. CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city'); COMMIT;
Read data in real time with Flink and update the sink table
For the complete example source code in Flink, see alibabacloud-hologres-connectors examples. The following steps describe the operations in Flink.
Read a data source as a DataStream in Flink and convert it to a source table
In Flink, read data from a streaming source. The data source can be a CSV file, Kafka, Redis, or another source, depending on your scenario. The following code example shows how to convert the stream to a source table in Flink.
// A CSV file is used as the data source here. You can also use Kafka, Redis, or other sources. DataStreamSource odsStream = env.createInput(csvInput, typeInfo); // A proctime field must be added to join with the dimension table. Table odsTable = tableEnv.fromDataStream( odsStream, $("uid"), $("country"), $("prov"), $("city"), $("ymd"), $("proctime").proctime()); // Register the table in the catalog environment. tableEnv.createTemporaryView("odsTable", odsTable);Join the source table with the Hologres dimension table (uid_mapping)
When you create a Hologres dimension table in Flink, you must use the
insertIfNotExistsparameter to insert data if it does not exist. This allows the `uid_int32` field to auto-increment using the `Serial` type in Hologres. The following sample code shows how to join the tables:-- Create the Hologres dimension table. insertIfNotExists means that if a record is not found, it is automatically inserted. String createUidMappingTable = String.format( "create table uid_mapping_dim(" + " uid string," + " uid_int32 INT" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," // Hologres DB name + " 'tablename' = '%s',"// Hologres table name + " 'username' = '%s'," // AccessKey ID of the current account + " 'password' = '%s'," // AccessKey secret of the current account + " 'endpoint' = '%s'," // Hologres endpoint + " 'insertifnotexists'='true'" + ")", database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable); -- Join the source table and the dimension table. String odsJoinDim = "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32" + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim" + " ON ods.uid = dim.uid"; Table joinRes = tableEnv.sqlQuery(odsJoinDim);Convert the join result to a DataStream
Process the data using a Flink time window and use RoaringBitmap to deduplicate the metrics. The following code provides an example.
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource = source -- Filter by the dimensions for statistics (country, prov, city, ymd). .keyBy(0, 1, 2, 3) -- Tumbling time window. Because a CSV file is used to simulate the input stream, ProcessingTime is used here. In a real-world scenario, you can use EventTime. .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) -- Trigger, which can get aggregation results before the window closes. .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))) .aggregate( -- Aggregate function, which performs aggregation based on the dimensions filtered by keyBy. new AggregateFunction< Tuple5<String, String, String, String, Integer>, RoaringBitmap, RoaringBitmap>() { @Override public RoaringBitmap createAccumulator() { return new RoaringBitmap(); } @Override public RoaringBitmap add( Tuple5<String, String, String, String, Integer> in, RoaringBitmap acc) { -- Add the 32-bit UID to the RoaringBitmap for deduplication. acc.add(in.f4); return acc; } @Override public RoaringBitmap getResult(RoaringBitmap acc) { return acc; } @Override public RoaringBitmap merge( RoaringBitmap acc1, RoaringBitmap acc2) { return RoaringBitmap.or(acc1, acc2); } }, -- Window function, which outputs the aggregation result. new WindowFunction< RoaringBitmap, Tuple6<String, String, String, String, Timestamp, byte[]>, Tuple, TimeWindow>() { @Override public void apply( Tuple keys, TimeWindow timeWindow, Iterable<RoaringBitmap> iterable, Collector< Tuple6<String, String, String, String, Timestamp, byte[]>> out) throws Exception { RoaringBitmap result = iterable.iterator().next(); // Optimize the RoaringBitmap. result.runOptimize(); // Convert the RoaringBitmap to a byte array to be stored in Hologres. byte[] byteArray = new byte[result.serializedSizeInBytes()]; result.serialize(ByteBuffer.wrap(byteArray)); // The Tuple6.f4(Timestamp) field indicates that statistics are performed based on the window length epoch, in seconds. out.collect( new Tuple6<>( keys.getField(0), keys.getField(1), keys.getField(2), keys.getField(3), new Timestamp( timeWindow.getEnd() / 1000 * 1000), byteArray)); } });Write data to the Hologres sink table
Write the deduplicated data from Flink to the dws_app sink table in Hologres. Note that the `RoaringBitmap` type in Hologres corresponds to a byte array type in Flink. The following is the Flink code.
-- Convert the calculation result to a table. Table resTable = tableEnv.fromDataStream( processedSource, $("country"), $("prov"), $("city"), $("ymd"), $("timest"), $("uid32_bitmap")); -- Create the Hologres sink table. The RoaringBitmap type in Hologres is stored as a byte array. String createHologresTable = String.format( "create table sink(" + " country string," + " prov string," + " city string," + " ymd string," + " timetz timestamp," + " uid32_bitmap BYTES" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," + " 'tablename' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + " 'endpoint' = '%s'," + " 'connectionSize' = '%s'," + " 'mutatetype' = 'insertOrReplace'" + ")", database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable); -- Write the calculation result to the dws_app table. tableEnv.executeSql("insert into sink select * from " + resTable);
Query data
In Hologres, calculate the UVs from the `dws_app` sink table. Perform an aggregation based on your query dimensions and query the bitmap cardinality to retrieve the number of users for the `GROUP BY` condition.
Example 1: Query the UVs of each city on a specific day
-- To run the following RB_AGG query, you can first disable the three-stage aggregation switch (disabled by default) for better performance. This step is optional. set hg_experimental_enable_force_three_stage_agg=off; SELECT country ,prov ,city ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app WHERE ymd = '20210329' GROUP BY country ,prov ,city ;Example 2: Query the UVs and PVs of each province within a specific time range
-- To run the following RB_AGG query, you can first disable the three-stage aggregation switch (disabled by default) for better performance. This step is optional. set hg_experimental_enable_force_three_stage_agg=off; SELECT country ,prov ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv ,SUM(pv) AS pv FROM dws_app WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08' GROUP BY country ,prov ;
Visualize the data
After you calculate the UVs and PVs, you can use a BI tool for visualization. Because the aggregation requires RB_CARDINALITY and RB_OR_AGG, the BI tool must support user-defined aggregate functions. BI tools that support this feature include Apache Superset and Tableau.
Apache Superset
Connect Apache Superset to Hologres. For more information, see Apache Superset.
Set the dws_app table as a dataset.

In the dataset, create a metric named UV with the following expression.

RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))You can now start exploring your data.
(Optional) Create a dashboard.
To create a dashboard, see Create Dashboard.
Tableau
Connect Tableau to Hologres. For more information, see Tableau.
You can use Tableau's passthrough functions to directly implement user-defined functions. For more information, see Passthrough Functions.
Create a calculated field with the following expression.

RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])You can now start exploring your data.
(Optional) Create a dashboard.
To create a dashboard, see Create a Dashboard.