This topic describes how to integrate Hologres with Flink to count the number of unique visitors (UVs) in real time.
Prerequisites
A Hologres instance is purchased and connected by using a development tool. In this example, HoloWeb is used. For more information about how to connect to a Hologres instance by using HoloWeb, see Connect to HoloWeb and perform queries.
A Flink cluster is created. You can use fully managed Flink of Realtime Compute for Apache Flink or Apache Flink.
Background information
Hologres is highly compatible with Flink. Hologres supports high-throughput data writes from Flink in real time and real-time queries of the written data. Hologres allows you to join a source table with a dimension table by executing Flink SQL statements. Hologres also allows you to use the change data capture (CDC) feature for data analytics. You can integrate Hologres with Flink to count the number of UVs in real time. The following figure shows the workflow.
Flink subscribes to newly collected data in real time. The data can be collected from logs, such as Kafka logs.
Flink converts the subscribed data streams into a source table. Then, Flink joins the source table with a Hologres dimension table to write the data of the source table to Hologres in real time.
Hologres processes the written data in real time.
The processed data is used by upper-layer data services, such as DataService Studio and Quick BI.
How it works
The tight integration between Flink and Hologres, together with the roaring bitmaps that Hologres natively supports, allows you to count the number of UVs in real time based on tag-based deduplication. The following figure shows the flowchart.
In Flink, subscribe to user data from data sources such as Kafka and Redis, and use DataStream programs to convert data streams into a source table.
Create a unique ID (UID) mapping table in Hologres to store the UIDs of historical users and their auto-increment 32-bit UIDs.
Note: In many cases, UIDs collected in business or tracking point-related activities are of the STRING or LONG type. In these cases, you must create a UID mapping table. UIDs stored in roaring bitmaps must be 32-bit integers, and consecutive integers are preferred. The UID mapping table contains a column of the SERIAL type that consists of auto-increment 32-bit integers. This way, the UID mapping is automatically managed and remains stable.
In Flink, use the UID mapping table in Hologres as a dimension table, and use the insertIfNotExists feature of the Hologres dimension table to efficiently map UIDs based on auto-increment 32-bit integers. Join the source table with the Hologres dimension table and convert the joined results into data streams.
Create a table in Hologres to aggregate the processed results. Flink processes the joined results based on the time windows and runs roaring bitmap functions based on query dimensions.
Query the aggregation result table based on query dimensions. Perform the OR operation on the roaring bitmap fields and calculate the cardinality of the resulting bitmap to obtain the number of UVs.
This way, you can obtain fine-grained UV and page view (PV) data. You can adjust the minimum statistical window based on your business requirements, for example, to count UVs in the previous 5 minutes. This produces an effect similar to real-time monitoring and is well suited to the large-screen dashboards of business intelligence (BI) tools. Compared with deduplication by day, week, or month, this solution performs better for finer-grained deduplication of data on a specific business date. It can also provide deduplicated data for a relatively long period of time by aggregating the fine-grained deduplication results. However, if the results are stored in a fine-grained manner and a query specifies no filter conditions or aggregation dimensions, the results must be aggregated once more at query time, which degrades query performance.
This solution is easy to use. You can set dimensions for calculation. This solution stores data in bitmaps, which significantly reduces the storage space required. In addition, this solution returns deduplication results in real time. All of these benefits together help build a multi-dimensional data warehouse that provides abundant features and supports flexible data analytics in real time.
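The deduplication idea described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the Hologres or Flink implementation: `java.util.BitSet` stands in for a roaring bitmap, and the class name `BitmapUvSketch`, the window labels, and the sample UIDs are hypothetical. The sketch shows why applying OR to per-window bitmaps gives a correct UV count for a longer period, whereas adding up per-window UV counts would count returning visitors more than once.

```java
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch: java.util.BitSet stands in for a roaring bitmap.
final class BitmapUvSketch {

    // Build a bitmap from 32-bit integer UIDs; duplicate UIDs collapse into one bit.
    static BitSet bitmapOf(int... uids) {
        BitSet bitmap = new BitSet();
        for (int uid : uids) {
            bitmap.set(uid);
        }
        return bitmap;
    }

    // Similar in spirit to RB_CARDINALITY(RB_OR_AGG(...)): OR all bitmaps, then count the bits.
    static int unionCardinality(Iterable<BitSet> bitmaps) {
        BitSet union = new BitSet();
        for (BitSet bitmap : bitmaps) {
            union.or(bitmap);
        }
        return union.cardinality();
    }

    // Naive alternative: add up the UV count of each window.
    static int sumOfCardinalities(Iterable<BitSet> bitmaps) {
        int total = 0;
        for (BitSet bitmap : bitmaps) {
            total += bitmap.cardinality();
        }
        return total;
    }

    // Hypothetical sample: three 5-minute windows for a single city.
    static Map<String, BitSet> sampleWindows() {
        Map<String, BitSet> windows = new LinkedHashMap<>();
        windows.put("18:00", bitmapOf(1, 2, 3, 3)); // UID 3 appears twice in this window
        windows.put("18:05", bitmapOf(2, 3, 4));
        windows.put("18:10", bitmapOf(4, 5));
        return windows;
    }

    public static void main(String[] args) {
        Map<String, BitSet> windows = sampleWindows();
        System.out.println("UV by OR: " + unionCardinality(windows.values()));
        System.out.println("Sum of window UVs: " + sumOfCardinalities(windows.values()));
    }
}
```

With the sample data, the OR-based count covers the five distinct UIDs 1 to 5, while the naive sum is larger because UIDs 2, 3, and 4 each appear in two windows.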
Procedure
Create tables in Hologres.
Create a UID mapping table.
Execute the following statements to create a UID mapping table named uid_mapping in Hologres. The UID mapping table is used to establish mappings between UIDs and their 32-bit integers. If the original UIDs are 32-bit integers, skip this step.
In many cases, UIDs collected in business or tracking point-related activities are of the STRING or LONG type. In these cases, you must create a UID mapping table. UIDs stored in roaring bitmaps must be 32-bit integers, and consecutive integers are preferred. The UID mapping table contains a column of the SERIAL type that consists of auto-increment 32-bit integers. This way, the UID mapping is automatically managed and remains stable.
UIDs are collected and written in real time. Configure the UID mapping table as a row-oriented table to increase the number of queries per second (QPS) that can be served when you join the source table with the Hologres dimension table in Flink.
Grand Unified Configuration (GUC) parameters must be specified to use optimized execution engines to write data to the table that contains columns of the SERIAL type. For more information, see Accelerate the execution of SQL statements by using fixed plans.
-- Specify GUC parameters to allow the use of fixed plans to write data to a table that contains a column of the SERIAL data type.
alter database <dbname> set hg_experimental_enable_fixed_dispatcher_autofill_series=on;
alter database <dbname> set hg_experimental_enable_fixed_dispatcher_for_multi_values=on;

BEGIN;
CREATE TABLE public.uid_mapping (
    uid text NOT NULL,
    uid_int32 serial,
    PRIMARY KEY (uid)
);
-- Configure the UID column as the clustering key and distribution key to quickly find the 32-bit integers that correspond to the UIDs.
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
Create an aggregation result table.
Create an aggregation result table named dws_app to store the aggregated results.
Before you use roaring bitmap functions, make sure that you have installed an extension for roaring bitmaps and the version of your Hologres instance is V0.10 or later.
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
Compared with offline result tables, this aggregation result table adds a timestamp column to calculate data collected based on the lifecycle of Flink time windows. The following DDL statements provide an example:
BEGIN;
CREATE TABLE dws_app(
    country text,
    prov text,
    city text,
    ymd text NOT NULL,          -- The date column.
    timetz TIMESTAMPTZ,         -- The timestamp column used to calculate data collected based on the lifecycle of Flink time windows.
    uid32_bitmap roaringbitmap, -- The roaring bitmap data used to calculate UVs.
    PRIMARY KEY (country, prov, city, ymd, timetz) -- Configure columns about query dimensions and time as primary key columns to prevent data from being repeatedly inserted.
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
-- Set the date column as the clustering key and event time column to filter data.
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
-- Set columns about query dimensions as distribution key columns.
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;
Use Flink to read data streams in real time and update the aggregation result table.
For information about the complete sample code, see alibabacloud-hologres-connectors. The following steps are performed in Flink in this example:
Read data streams and convert the data into a source table.
Use Flink to read data from a data source in streaming mode. You can select a CSV file or a Kafka or Redis data source based on your business requirements. The following sample code provides an example on how to convert the data into a table:
// In this example, the data source is a CSV file. You can also select a Kafka or Redis data source.
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// Before you join the source table with a dimension table, add a column that describes the proctime property to the source table.
Table odsTable =
    tableEnv.fromDataStream(
        odsStream,
        $("uid"),
        $("country"),
        $("prov"),
        $("city"),
        $("ymd"),
        $("proctime").proctime());
// Create a catalog view.
tableEnv.createTemporaryView("odsTable", odsTable);
Join the source table with a Hologres dimension table named uid_mapping.
When you create a Hologres dimension table in Flink, set the insertIfNotExists parameter to true. This ensures that a row is automatically inserted into the dimension table if the queried UID is not found. The uid_int32 field is the column of the SERIAL type that contains auto-increment 32-bit integers. The following sample code provides an example of how to join the tables:
// Create a Hologres dimension table. The insertIfNotExists parameter specifies whether to automatically insert data into the dimension table if the data cannot be queried.
String createUidMappingTable =
    String.format(
        "create table uid_mapping_dim("
            + "  uid string,"
            + "  uid_int32 INT"
            + ") with ("
            + "  'connector'='hologres',"
            + "  'dbname' = '%s'," // The Hologres database in which the Hologres dimension table resides.
            + "  'tablename' = '%s'," // The name of the Hologres dimension table.
            + "  'username' = '%s'," // The AccessKey ID of your Alibaba Cloud account.
            + "  'password' = '%s'," // The AccessKey secret of your Alibaba Cloud account.
            + "  'endpoint' = '%s'," // The endpoint of the Hologres instance.
            + "  'insertifnotexists'='true'"
            + ")",
        database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// Join the source table with the Hologres dimension table.
String odsJoinDim =
    "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
        + "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
        + "  ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
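The lookup-or-insert behavior that the insertIfNotExists parameter enables can be sketched as follows. This is a minimal in-memory illustration, not the Hologres connector: a `ConcurrentHashMap` plays the role of the uid_mapping table, an `AtomicInteger` plays the role of the SERIAL column, and the class name `UidMappingSketch`, the method names, and the starting value of 1 are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal in-memory sketch of lookup-or-insert; not the Hologres connector itself.
final class UidMappingSketch {
    // Plays the role of the uid_mapping table: uid -> uid_int32.
    private final Map<String, Integer> mapping = new ConcurrentHashMap<>();
    // Plays the role of the SERIAL column: hands out consecutive 32-bit integers (assumed to start at 1).
    private final AtomicInteger nextId = new AtomicInteger(1);

    // Return the integer already assigned to a UID, or assign the next integer if the UID is new.
    int lookupOrInsert(String uid) {
        return mapping.computeIfAbsent(uid, key -> nextId.getAndIncrement());
    }

    // Number of distinct UIDs stored so far.
    int size() {
        return mapping.size();
    }

    public static void main(String[] args) {
        UidMappingSketch dim = new UidMappingSketch();
        System.out.println(dim.lookupOrInsert("user-a")); // new UID, gets a new integer
        System.out.println(dim.lookupOrInsert("user-b")); // new UID, gets the next integer
        System.out.println(dim.lookupOrInsert("user-a")); // known UID, same integer as before
    }
}
```

Because a known UID always maps to the integer it received first, the mapping stays stable across windows, which is what allows bitmaps from different windows to be combined later.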
Convert the joined results into data streams.
Use Flink time windows to process data streams and run roaring bitmap functions to deduplicate data. The following sample code provides an example:
// `source` is assumed to be the data stream of (country, prov, city, ymd, uid_int32) tuples converted from joinRes.
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
    source
        // The dimensions by which the data is queried. In this example, the dimensions are the country, prov, city, and ymd columns.
        .keyBy(0, 1, 2, 3)
        // The Flink tumbling window. In this example, the data source is a CSV file, so data streams are assigned to the windows based on processing time. In actual scenarios, you can assign data streams based on either processing time or event time based on your business requirements.
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
        // The trigger. You can obtain the aggregated results before the windows are removed.
        .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
        .aggregate(
            // The aggregate function, which is used to aggregate the results based on the specified query dimensions.
            new AggregateFunction<
                Tuple5<String, String, String, String, Integer>,
                RoaringBitmap,
                RoaringBitmap>() {
                @Override
                public RoaringBitmap createAccumulator() {
                    return new RoaringBitmap();
                }

                @Override
                public RoaringBitmap add(
                    Tuple5<String, String, String, String, Integer> in,
                    RoaringBitmap acc) {
                    // Add the 32-bit UID to the roaring bitmap to remove duplicate UIDs.
                    acc.add(in.f4);
                    return acc;
                }

                @Override
                public RoaringBitmap getResult(RoaringBitmap acc) {
                    return acc;
                }

                @Override
                public RoaringBitmap merge(
                    RoaringBitmap acc1, RoaringBitmap acc2) {
                    return RoaringBitmap.or(acc1, acc2);
                }
            },
            // The window function, which is used to generate the aggregated results.
            new WindowFunction<
                RoaringBitmap,
                Tuple6<String, String, String, String, Timestamp, byte[]>,
                Tuple,
                TimeWindow>() {
                @Override
                public void apply(
                    Tuple keys,
                    TimeWindow timeWindow,
                    Iterable<RoaringBitmap> iterable,
                    Collector<
                        Tuple6<String, String, String, String, Timestamp, byte[]>> out)
                    throws Exception {
                    RoaringBitmap result = iterable.iterator().next();
                    // Optimize the roaring bitmap.
                    result.runOptimize();
                    // Convert the roaring bitmap into a byte array so that it can be stored in Hologres.
                    byte[] byteArray = new byte[result.serializedSizeInBytes()];
                    result.serialize(ByteBuffer.wrap(byteArray));
                    // The fifth field of the Tuple6 is the end time of the window, truncated to seconds. Results are stored per window period.
                    out.collect(
                        new Tuple6<>(
                            keys.getField(0),
                            keys.getField(1),
                            keys.getField(2),
                            keys.getField(3),
                            new Timestamp(
                                timeWindow.getEnd() / 1000 * 1000),
                            byteArray));
                }
            });
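The window arithmetic behind the sample above can be illustrated with a short standalone sketch. It assumes 5-minute tumbling windows with no offset and non-negative epoch timestamps in milliseconds. The class name `TumblingWindowSketch`, its method names, and the sample timestamp are hypothetical and are not part of Flink.

```java
// Minimal sketch of tumbling-window arithmetic; these are not Flink classes.
final class TumblingWindowSketch {
    // 5-minute windows, as in the sample code.
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;

    // Start of the window that contains the timestamp (zero offset, non-negative timestamps).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Exclusive end of the window that contains the timestamp.
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    // Same integer arithmetic as `getEnd() / 1000 * 1000` in the sample: drops the millisecond part.
    static long truncateToSeconds(long timestampMs) {
        return timestampMs / 1000 * 1000;
    }

    public static void main(String[] args) {
        long ts = 1_618_826_523_456L; // hypothetical timestamp in epoch milliseconds
        System.out.println("window start: " + windowStart(ts));
        System.out.println("window end:   " + windowEnd(ts));
        System.out.println("truncated:    " + truncateToSeconds(ts));
    }
}
```

Every record that falls into the same 5-minute window produces the same window end, so the end time can serve as the time component of the primary key in the aggregation result table. For windows aligned to whole seconds, the truncation leaves the end time unchanged.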
Write the distinct data to the Hologres aggregation result table.
Write the distinct data to the Hologres aggregation result table named dws_app. The results of roaring bitmap functions are stored as byte arrays in Flink. The following sample code provides an example:
// Convert the processed results into a table.
Table resTable =
    tableEnv.fromDataStream(
        processedSource,
        $("country"),
        $("prov"),
        $("city"),
        $("ymd"),
        $("timest"),
        $("uid32_bitmap"));
// Create a Flink result table that maps to the Hologres aggregation result table. The roaring bitmaps are written to the table as byte arrays.
String createHologresTable =
    String.format(
        "create table sink("
            + "  country string,"
            + "  prov string,"
            + "  city string,"
            + "  ymd string,"
            + "  timetz timestamp,"
            + "  uid32_bitmap BYTES"
            + ") with ("
            + "  'connector'='hologres',"
            + "  'dbname' = '%s',"
            + "  'tablename' = '%s',"
            + "  'username' = '%s',"
            + "  'password' = '%s',"
            + "  'endpoint' = '%s',"
            + "  'connectionSize' = '%s',"
            + "  'mutatetype' = 'insertOrReplace'"
            + ")",
        database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// Write the results to the table named dws_app.
tableEnv.executeSql("insert into sink select * from " + resTable);
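Because the window in this example uses a trigger that fires every minute, the same window can emit its accumulated bitmap several times before it closes. The following sketch illustrates, under simplifying assumptions, why the primary key of dws_app combined with the insertOrReplace mutate type keeps one row per dimension combination and window. A `HashMap` stands in for the Hologres table, and the class name `UpsertSinkSketch`, the method names, and the sample key values are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal in-memory sketch of primary-key replace semantics; not the Hologres connector.
final class UpsertSinkSketch {
    // Primary key (country, prov, city, ymd, timetz) -> serialized bitmap.
    private final Map<List<String>, byte[]> rows = new HashMap<>();

    // A row whose primary key already exists replaces the stored row instead of adding a new one.
    void insertOrReplace(List<String> primaryKey, byte[] bitmapBytes) {
        rows.put(primaryKey, bitmapBytes);
    }

    // Number of rows currently stored.
    int rowCount() {
        return rows.size();
    }

    // Serialized bitmap stored for the given primary key, or null if no such row exists.
    byte[] get(List<String> primaryKey) {
        return rows.get(primaryKey);
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink = new UpsertSinkSketch();
        List<String> key =
            Arrays.asList("CN", "Zhejiang", "Hangzhou", "20210329", "2021-03-29 18:05:00");
        sink.insertOrReplace(key, new byte[] {1});    // first firing of the window
        sink.insertOrReplace(key, new byte[] {1, 2}); // later firing with a larger bitmap
        System.out.println("rows stored: " + sink.rowCount());
    }
}
```

In this sketch, the second write overwrites the first, so a query always reads the most recent bitmap for each window rather than several partial rows.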
Query data.
Calculate UVs based on data in the dws_app table. Perform an aggregation operation based on query dimensions and query the number of bits in a bitmap. This way, you can calculate the UVs under the conditions specified by the GROUP BY clause.
Example 1: Query the number of UVs of each city on a specific day
-- Perform the following RB_OR_AGG operation to query data. You can disable the three-stage aggregation feature for better performance. You can enable or disable this feature based on your requirements. By default, the feature is disabled.
set hg_experimental_enable_force_three_stage_agg=off;
SELECT  country
        ,prov
        ,city
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   ymd = '20210329'
GROUP BY country
         ,prov
         ,city
;
Example 2: Query the UVs and PVs of each province within a specific period of time
-- Perform the following RB_OR_AGG operation to query data. You can disable the three-stage aggregation feature for better performance. You can enable or disable this feature based on your requirements. By default, the feature is disabled.
set hg_experimental_enable_force_three_stage_agg=off;
SELECT  country
        ,prov
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
        ,SUM(pv) AS pv -- Assumes that dws_app also contains a pv column, which the DDL in this topic does not define.
FROM    dws_app
WHERE   timetz > '2021-04-19 18:00:00+08' AND timetz < '2021-04-19 19:00:00+08'
GROUP BY country
         ,prov
;
Visually display data.
In most cases, you need to use Business Intelligence (BI) tools to visually display the calculated UVs and PVs. In the calculation process, RB_CARDINALITY and RB_OR_AGG functions are used to aggregate data. Therefore, BI tools must support custom aggregation functions. You can use common BI tools such as Apache Superset and Tableau.
Apache Superset
Connect Apache Superset to Hologres. For more information, see Apache Superset.
Set the dws_app table as a dataset.
Create a metric named UV in the dataset by using the expression that is shown in the following figure.
RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))
Then, you can start to explore data.
Optional. Create a dashboard.
For more information about how to create a dashboard, see Creating Your First Dashboard.
Tableau
Connect Tableau to Hologres. For more information, see Tableau.
You can use pass-through functions in Tableau to customize functions. For more information, see Pass-Through Functions (RAWSQL).
Create a calculation field by using the expression that is shown in the following figure.
RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])
Then, you can start to explore data.
Optional. Create a dashboard.
For more information about how to create a dashboard, see Create a Dashboard.