即時UV計算主要依賴Hologres與Flink結合完成,本文將為您介紹Hologres如何進行即時UV精確去重。
前提條件
開通Hologres並串連開發工具,樣本使用HoloWeb,詳情請參見串連HoloWeb並執行查詢。
準備並搭建好Flink叢集環境,您可以使用阿里雲Flink全託管或者開源Flink。
背景資訊
Hologres與Flink有著強大的融合最佳化,支援Flink資料高通量即時寫入,寫入即可見,支援Flink SQL維表關聯,以及作為CDC Source事件驅動開發。因此即時UV去重主要通過Flink和Hologres來實現,情境架構圖如下所示。
Flink即時訂閱即時採集的資料,資料來源可以來源於日誌資料,如Kafka等。
Flink對資料做進一步加工處理,將流式資料轉化為表與Hologres維表進行JOIN操作,即時寫入Hologres。
Hologres對Flink即時寫入的資料即時處理。
最終查詢的資料對接上層資料應用,如資料服務、Quick BI等。
即時UV計算方案流程
Flink與Hologres有著非常強的融合性,再結合Hologres天然支援的RoaringBitmap,完成即時UV計算,即時對使用者標籤去重,詳細方案流程如下圖所示。
Flink即時訂閱使用者資料,這些資料可以來源於Kafka、Redis等,並通過DataStream轉化為資料來源表。
在Hologres中建立使用者映射表,存放歷史使用者的uid以及對應的32位自增uid。
說明常見的業務系統或者埋點中的使用者ID很多是字串類型或Long類型,因此需要使用uid_mapping類型構建一張映射表。RoaringBitmap類型要求使用者ID必須是32位int類型且越稠密越好(即使用者ID最好連續)。映射表利用Hologres的SERIAL類型(自增的32位int)來實現使用者映射的自動管理和穩定映射。
在Flink中,將Hologres中的使用者映射表作為Flink維表,利用Hologres維表的insertIfNotExists特性結合自增欄位實現高效的uid映射。維表與資料來源表進行Join關聯,並將Join得到的結果轉化為流式資料DataStream。
Hologres中建立彙總結果表,Flink把維表關聯的結果資料按照時間視窗進行處理,根據查詢維度使用RoaringBitmap函數。
查詢時,與離線方式相似,直接按照查詢條件查詢彙總結果表,並對其中關鍵的RoaringBitmap欄位做
or運算後並統計基數,即可得出對應使用者數。
這樣的方式,可以較細粒度的即時得到使用者UV、PV資料,同時便於根據需求調整最小統計視窗(如最近5分鐘的UV),實作類別似即時監控的效果,更好的在大屏等BI展示。相較於以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,並且通過簡單的彙總,也可以得到較大時間單位的統計結果。如果加工彙總的粒度較細,但查詢時缺少相應的過濾條件或彙總維度,則也會在查詢時引起較多的二次彙總操作,對效能有不利影響。
該方案資料鏈路簡單,可以任意維度靈活計算,只需要一份Bitmap儲存,也沒有儲存爆炸問題,還能保證即時更新,從而實現更即時、開發更靈活、功能更完善的多維分析數倉。
操作步驟
在Hologres中建立相關基礎資料表
建立使用者映射表
在Hologres建立表uid_mapping為使用者映射表,命令語句如下所示。用於映射uid到32位int類型。如果原始uid已經是int32類型,此步驟可忽略。
常見的業務系統或者埋點中的使用者ID很多是字串類型或Long類型,因此需要使用uid_mapping類型構建一張映射表。RoaringBitmap類型要求使用者ID必須是32位int類型且越稠密越好(即使用者ID最好連續)。映射表利用Hologres的SERIAL類型(自增的32位int)來實現使用者映射的自動管理和穩定映射。
由於是即時資料,在Hologres中該表為行存表,以提高Flink維表即時JOIN的QPS。
需要開啟相應GUC使用最佳化的執行引擎對包含serial欄位的表進行寫入,詳情請參見Fixed Plan加速SQL執行。
--開啟GUC,支援含有Serial類型列的Fixed Plan寫入 alter database <dbname> set hg_experimental_enable_fixed_dispatcher_autofill_series=on; alter database <dbname> set hg_experimental_enable_fixed_dispatcher_for_multi_values=on; BEGIN; CREATE TABLE public.uid_mapping ( uid text NOT NULL, uid_int32 serial, PRIMARY KEY (uid) ); --將uid設為clustering_key和distribution_key便於快速尋找其對應的int32值 CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid'); CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid'); CALL set_table_property('public.uid_mapping', 'orientation', 'row'); COMMIT;建立彙總結果表
建立表dws_app為彙總結果表,用於存放在基礎維度上彙總後的結果。
使用RoaringBitmap函數前需要建立RoaringBitmap extension,同時也需要Hologres執行個體為 V0.10及以上版本。
CREATE EXTENSION IF NOT EXISTS roaringbitmap;相比離線結果表,此結果表增加了時間戳記欄位,用於實現以Flink視窗周期為單位的統計,結果表DDL如下。
BEGIN; CREATE TABLE dws_app( country text, prov text, city text, ymd text NOT NULL, --日期欄位 timetz TIMESTAMPTZ, --統計時間戳記,可以實現以Flink視窗周期為單位的統計 uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv PRIMARY KEY (country, prov, city, ymd, timetz)--查詢維度和時間作為主鍵,防止重複插入資料 ); CALL set_table_property('public.dws_app', 'orientation', 'column'); --日期欄位設為clustering_key和event_time_column,便於過濾 CALL set_table_property('public.dws_app', 'clustering_key', 'ymd'); CALL set_table_property('public.dws_app', 'event_time_column', 'ymd'); --group by欄位設為distribution_key CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city'); COMMIT;
Flink即時讀取資料並更新彙總結果表
在Flink中的完整樣本源碼請參見alibabacloud-hologres-connectors examples,下面是在Flink中的具體操作步驟。
Flink流式讀取資料來源(DataStream)並轉化為源表(Table)
在Flink中使用流式讀取資料來源,資料來源可以是CSV檔案,也可以來源於Kafka、Redis等,可以根據業務情境準備。在Flink中轉化為源表的程式碼範例如下。
// 此處使用csv檔案作為資料來源,也可以是kafka/redis等 DataStreamSource odsStream = env.createInput(csvInput, typeInfo); // 與維表join需要添加proctime欄位 Table odsTable = tableEnv.fromDataStream( odsStream, $("uid"), $("country"), $("prov"), $("city"), $("ymd"), $("proctime").proctime()); // 註冊到catalog環境 tableEnv.createTemporaryView("odsTable", odsTable);將源表與Hologres維表(uid_mapping)進行關聯
在Flink中建立Hologres維表,需要使用
insertIfNotExists參數,即查詢不到資料時自行插入,uid_int32欄位便可以利用Hologres的Serial類型自增建立。將Flink源表與Hologres維表進行關聯(JOIN),程式碼範例如下。-- 建立Hologres維表,其中insertIfNotExists表示查詢不到則自行插入 String createUidMappingTable = String.format( "create table uid_mapping_dim(" + " uid string," + " uid_int32 INT" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," //Hologres DB名 + " 'tablename' = '%s',"//Hologres 表名 + " 'username' = '%s'," //當前帳號access id + " 'password' = '%s'," //當前帳號access key + " 'endpoint' = '%s'," //Hologres endpoint + " 'insertifnotexists'='true'" + ")", database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable); -- 源表與維表join String odsJoinDim = "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32" + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim" + " ON ods.uid = dim.uid"; Table joinRes = tableEnv.sqlQuery(odsJoinDim);將關連接果轉化為DataStream
通過Flink時間視窗處理,結合RoaringBitmap進行對指標進行去重處理,程式碼範例如下所示。
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource = source -- 篩選需要統計的維度(country, prov, city, ymd) .keyBy(0, 1, 2, 3) -- 滾動時間視窗;此處由於使用讀取csv類比輸入資料流,採用ProcessingTime,實際使用中可使用EventTime .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) -- 觸發器,可以在視窗未結束時擷取彙總結果 .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))) .aggregate( -- 彙總函式,根據key By篩選的維度,進行彙總 new AggregateFunction< Tuple5<String, String, String, String, Integer>, RoaringBitmap, RoaringBitmap>() { @Override public RoaringBitmap createAccumulator() { return new RoaringBitmap(); } @Override public RoaringBitmap add( Tuple5<String, String, String, String, Integer> in, RoaringBitmap acc) { -- 將32位的uid添加到RoaringBitmap進行去重 acc.add(in.f4); return acc; } @Override public RoaringBitmap getResult(RoaringBitmap acc) { return acc; } @Override public RoaringBitmap merge( RoaringBitmap acc1, RoaringBitmap acc2) { return RoaringBitmap.or(acc1, acc2); } }, -- 視窗函數,輸出彙總結果 new WindowFunction< RoaringBitmap, Tuple6<String, String, String, String, Timestamp, byte[]>, Tuple, TimeWindow>() { @Override public void apply( Tuple keys, TimeWindow timeWindow, Iterable<RoaringBitmap> iterable, Collector< Tuple6<String, String, String, String, Timestamp, byte[]>> out) throws Exception { RoaringBitmap result = iterable.iterator().next(); // 最佳化RoaringBitmap result.runOptimize(); // 將RoaringBitmap轉化為位元組數組以存入Holo中 byte[] byteArray = new byte[result.serializedSizeInBytes()]; result.serialize(ByteBuffer.wrap(byteArray)); // 其中 Tuple6.f4(Timestamp) 欄位表示以視窗長度為周期進行統計,以秒為單位 out.collect( new Tuple6<>( keys.getField(0), keys.getField(1), keys.getField(2), keys.getField(3), new Timestamp( timeWindow.getEnd() / 1000 * 1000), byteArray)); } });寫入Hologres彙總結果表
經過Flink去重處理的資料寫入至Hologres結果表dws_app,但需要注意的是Hologres中RoaringBitmap類型在Flink中對應Byte數群組類型,Flink中代碼如下。
-- 計算結果轉換為表 Table resTable = tableEnv.fromDataStream( processedSource, $("country"), $("prov"), $("city"), $("ymd"), $("timest"), $("uid32_bitmap")); -- 建立Hologres結果表, 其中Hologres的RoaringBitmap類型通過Byte數組存入 String createHologresTable = String.format( "create table sink(" + " country string," + " prov string," + " city string," + " ymd string," + " timetz timestamp," + " uid32_bitmap BYTES" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," + " 'tablename' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + " 'endpoint' = '%s'," + " 'connectionSize' = '%s'," + " 'mutatetype' = 'insertOrReplace'" + ")", database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable); -- 寫入計算結果到dws_app表 tableEnv.executeSql("insert into sink select * from " + resTable);
資料查詢
在Hologres中對彙總結果表(dws_app)進行UV計算。按照查詢維度做彙總計算,查詢Bitmap基數,得出Group By條件下的使用者數。
樣本一:查詢某天內各個城市的uv
-- 運行下面RB_AGG運算查詢,可執行參數先關閉三階段彙總開關(預設關閉),效能更好,此步驟可選 set hg_experimental_enable_force_three_stage_agg=off; SELECT country ,prov ,city ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app WHERE ymd = '20210329' GROUP BY country ,prov ,city ;樣本二:查詢某段時間內各個省份的UV、PV
-- 運行下面RB_AGG運算查詢,可執行參數先關閉三階段彙總開關(預設關閉),效能更好,此步驟可選 set hg_experimental_enable_force_three_stage_agg=off; SELECT country ,prov ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv ,SUM(pv) AS pv FROM dws_app WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08' GROUP BY country ,prov ;
可視化展示
計算出UV、PV後,大多數情況需要使用BI工具以更直觀的方式可視化展示,由於需要使用RB_CARDINALITY和RB_OR_AGG進行彙總計算,需要使用BI的自訂彙總函式的能力,常見的具備該能力的BI包括Apache Superset和Tableau。
Apache Superset
Apache Superset串連Hologres,詳情請參見Apache Superset。
設定dws_app表作為資料集。

在資料集中建立一個名稱為UV的單獨Metrics,運算式如下。

RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))完成後您就可以開始探索資料了。
(可選)建立Dashboard。
建立儀表板請參見Create Dashboard。
Tableau
Tableau串連Hologres,詳情請參見Tableau。
可以使用Tableau的直通函數直接實現自訂函數的能力,詳細介紹請參見直通函數。
建立一個計算欄位,運算式如下。

RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])完成後您就可以開始探索資料了。
(可選)建立Dashboard。
建立儀表板請參見Create a Dashboard。