使用Flink+Hologres搭建即時數倉可以充分利用Flink強大的即時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴充的即時資料處理和分析,協助您更好地應對不斷增長的資料量和即時業務需求。本文介紹如何通過Realtime ComputeFlink版和即時數倉Hologres搭建即時數倉。
背景資訊
隨著社會數字化發展,企業對資料時效性的需求越來越強烈。除傳統的面向海量資料加工情境設計的離線情境外,大量業務需要解決面向即時加工、即時儲存、即時分析的即時情境問題。傳統離線數倉搭建的方法論比較明確,通過定時調度實現數倉分層(ODS->DWD->DWS->ADS);但對於即時數倉的搭建,目前缺乏明確的方法體系。基於Streaming Warehouse理念,實現數倉分層之間即時資料的高效流動,可以解決即時數倉分層問題。
實踐情境
本文以某個電商平台為例,通過將Flink與Hologres深度整合搭建一套即時數倉,實現資料的即時加工清洗和對接上層應用資料查詢,形成即時資料的分層和複用,支撐各個業務方的報表查詢(交易大屏、行為資料分析、使用者畫像標籤)以及個人化推薦等多個業務情境。
方案架構
構建ODS層:業務資料庫即時入倉
MySQL有orders(訂單表),orders_pay(訂單支付表),product_catalog(商品類別字典表)3張業務表,這3張表通過Flink即時同步到Hologres中作為ODS層。
構建DWD層:即時主題寬表
將訂單表、商品類別字典表、訂單支付表進行即時打寬,產生DWD層寬表。
構建DWS層:即時指標計算
即時消費寬表的Binlog,事件驅動地彙總出相應的DWS層的使用者維度和商戶維度指標表。
通過Hologres提供應用查詢。
對DWS層的彙總指標表進行查詢,支援百萬級RPS。
對DWD層寬表進行OLAP分析或基於寬表資料展示即時報表,支援秒級響應。
方案優勢及核心能力
該方案有如下優勢:
高效更新與即時查詢:Hologres支援每一層資料的高效更新、修正和寫入即可查,解決了傳統即時數倉中介層資料難以查詢、更新和修正的問題。
資料分層複用:Hologres的所有層級資料都可單獨對外提供服務,實現高效複用,真正達成數倉分層複用的目標。
架構簡化與效率提升:基於Flink SQL構建即時ETL鏈路,ODS層、DWD層和DWS層的資料統一儲存在Hologres,從而降低架構複雜度並提升資料處理效率。
該方案依賴於Hologres的3個核心能力,詳情如下表所示。
Hologres核心能力 | 詳情 |
Hologres提供Binlog能力,用於驅動Flink進行Realtime Compute,以此作為流式計算的上遊。 | |
Hologres支援行列共存的儲存格式。一張表同時儲存行存資料和列存資料,並且兩份資料強一致。該特性保證中介層表既能作為Flink的源表,也能作為Flink的維表進行主鍵點查與維表Join,還能供其他應用(OLAP、線上服務等)查詢。 | |
資源強隔離 | Hologres執行個體的負載較高時,可能影響中介層的點查效能。Hologres支援通過主從執行個體讀寫分離部署(共用儲存)或計算群組執行個體架構實現資源強隔離,從而保證Flink對Hologres Binlog的資料拉取不影響線上服務。 |
注意事項
僅獨享Hologre執行個體支援該即時數倉方案。
Realtime ComputeFlink版、RDS MySQL和Hologres需要在同一VPC。如果不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?和如何訪問公網?。
通過RAM使用者或RAM角色等身份訪問Realtime ComputeFlink、Hologres和RDS MySQL資源時,需要其具備對應資源的許可權。
步驟一:準備工作
建立RDS MySQL執行個體並準備資料來源
建立RDS MySQL執行個體,詳情請參見建立RDS MySQL執行個體。
RDS MySQL執行個體需要與Flink工作空間和Hologres執行個體在同一VPC。
建立資料庫和帳號。
為目標執行個體建立名稱為order_dw的資料庫和具有對應資料庫讀寫權限的普通帳號。具體操作請參見建立資料庫和建立帳號。
準備MySQL CDC資料來源。
在目標執行個體詳情頁面,單擊上方的登入資料庫。
在彈出的登入執行個體頁面中,填寫建立的資料庫帳號名和密碼,然後單擊登入。
登入成功後,在資料庫執行個體頁面雙擊order_dw資料庫,切換資料庫。
在SQL Console地區編寫三張業務表的建表DDL以及插入的資料語句。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- 準備資料 INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
單擊執行,單擊直接執行。
建立Hologres執行個體和計算群組
建立獨享Hologres執行個體,詳情請參見購買Hologres。
Hologres執行個體需要與RDS MySQL執行個體在同一VPC。為了體驗Hologres通過讀寫分離實現資源強隔離的核心能力,本文執行個體類型選擇計算群組型,計算群組預留計算資源設定為64,從而支援新增計算群組。
登入執行個體後,建立資料庫並授權。
登入步驟建立名為order_dw的資料庫(需要開啟簡單許可權模型),並授予使用者admin許可權。資料庫建立和授權操作,請參見DB管理。
說明如果在被授權帳號的下拉式清單找不到對應的帳號,則說明該帳號並未添加至當前執行個體,您需要前往使用者管理頁面添加使用者為SuperUser。
Hologres2.0之後版本預設開啟Binlog擴充,無需手動執行。
新增計算群組。
您可以通過不同的計算群組實現資源隔離,使用初始計算群組init_warehouse用於寫入資料,使用read_warehouse_1計算群組用於服務查詢。
預留計算資源會全部分配給初始計算群組init_warehouse,需先減少計算群組資源,再新增計算群組。詳情請參見建立新計算群組執行個體。
單擊,確認執行個體名為目標執行個體名稱。
單擊已有資源群組init_warehouse操作列下的調整配置,調小資源後單擊確認。
單擊新增計算群組,新增名稱為read_warehouse_1的計算群組,單擊確認。
建立Flink工作空間和Catalog
建立Flink工作空間,詳情請參見開通Realtime ComputeFlink版。
Flink工作空間需要與RDS MySQL執行個體、Hologres執行個體在同一VPC。
登入Realtime Compute控制台,單擊目標工作空間操作列下的控制台。
建立Session叢集,為後續建立Catalog和查詢指令碼提供執行環境,詳情請參見步驟一:建立Session叢集。
建立Hologres Catalog。
在頁面的查詢指令碼頁簽,將下面代碼拷貝到查詢指令碼,並修改目標參數取值,選中目標片段後單擊右側程式碼上的運行。在頁面右下角的執行環境使用的是建立的Session叢集。
CREATE CATALOG dw WITH ( 'type' = 'hologres', 'endpoint' = '<ENDPOINT>', 'username' = 'BASIC$flinktest', 'password' = '${secret_values.holosecrect}', 'dbname' = 'order_dw@init_warehouse', --資料庫名稱,並指定串連init_warehouse計算群組。 'binlog' = 'true', -- 建立catalog時可以設定源表、維表和結果表支援的with參數,之後在使用此catalog下的表時會預設添加這些預設參數。 'sdkMode' = 'jdbc', -- 推薦使用jdbc模式。 'cdcmode' = 'true', 'connectionpoolname' = 'the_conn_pool', 'ignoredelete' = 'true', -- 寬表merge需要開啟,防止回撤。 'partial-insert.enabled' = 'true', -- 寬表merge需要開啟此參數,實現部分列更新。 'mutateType' = 'insertOrUpdate', -- 寬表merge需要開啟此參數,實現部分列更新。 'table_property.binlog.level' = 'replica', --也可以在建立catalog時傳入持久化的hologres表屬性,之後建立表時,預設都開啟binlog。 'table_property.binlog.ttl' = '259200' );您需要修改以下參數取值為您實際Hologres服務資訊。
參數
說明
備忘
endpoint
Hologres的Endpoint地址。
在Hologres執行個體詳情頁面擷取網路類型為指定VPC的網域名稱資訊,網域名稱詳情請參見訪問網域名稱。
username
請任選其一:
自訂帳號的使用者名稱,格式為
BASIC$<user_name>。阿里雲帳號或RAM使用者的AccessKey ID。
當前配置的使用者需要能夠訪問對應的Hologres資料庫,Hologres資料庫許可權及使用者管理詳情請參見Hologres許可權模型和使用者管理。
本樣本使用名為
BASIC$flinktest的自訂帳號,並通過名為holosecrect的專案變數來設定其密碼值,從而避免明文儲存帶來的安全風險,詳情請參見專案變數。
password
自訂帳號的密碼。
阿里雲帳號或RAM使用者的AccessKey Secret。
說明建立Catalog時可以設定預設的源表、維表和結果表的WITH參數,也可以設定建立Hologres物理表的預設屬性,例如上方table_property開頭的參數。詳情請參見管理Hologres Catalog和即時數倉Hologres WITH參數。
建立MySQL Catalog。
將如下代碼拷貝到查詢指令碼,並修改目標參數取值,選中目標片段後單擊左側程式碼上的運行。在頁面右下角的執行環境使用的是建立的Session叢集。
CREATE CATALOG mysqlcatalog WITH( 'type' = 'mysql', 'hostname' = '<hostname>', 'port' = '<port>', 'username' = '<username>', 'password' = '${secret_values.mysql_pw}', 'default-database' = 'order_dw' );您需要修改以下參數取值為您實際的MySQL服務資訊。
參數
說明
hostname
MySQL資料庫的IP地址或者Hostname。您可以在資料庫基本資料頁面單擊網路類型地區的查看串連詳情,擷取內網地址。
port
MySQL資料庫服務的連接埠號碼,預設值為3306。
username
MySQL資料庫服務的使用者名稱。
password
MySQL資料庫服務的密碼。
本樣本通過使用名為mysql_pw變數的方式填寫password取值,避免明文等風險,詳情請參見變數管理。
步驟二:搭建即時數倉
構建ODS層:業務資料庫即時入倉
基於Catalog的CREATE DATABASE AS(CDAS)語句功能,可以一次性把ODS層建出來。ODS層一般不直接做OLAP或SERVING(KV點查),主要作為流式作業的事件驅動,開啟Binlog即可滿足需求。Binlog是Hologres的核心能力之一,Hologres連接器也支援先全量讀取再增量消費Binlog的全增量模式。
建立CDAS同步作業ODS。
在頁面,建立名為ODS的SQL流作業,並將如下代碼拷貝到SQL編輯器。
CREATE DATABASE IF NOT EXISTS dw.order_dw -- 建立catalog時設定了table_property.binlog.level參數,因此通過CDAS建立的所有表都開啟了binlog。 AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根據需要選擇上遊資料庫需要入倉的表。 /*+ OPTIONS('server-id'='8001-8004') */ ; -- 指定mysql-cdc執行個體server-id範圍。說明本樣本預設將資料同步到資料庫order_dw的Public Schema下。您也可以將資料同步到Hologres目標庫的指定Schema中,詳情請參見作為CDAS的目標端Catalog,指定後使用Catalog時的表名格式也會發生變化,詳情請參見使用Hologres Catalog。
如果源表的資料結構發生變化,則需要等待源表的資料出現變更(刪除、插入、更新),結果表的資料結構才會看到變化。
單擊右上方的部署,進行作業部署。
單擊左側導覽列的,單擊剛剛部署的ODS作業操作列的啟動,選擇無狀態啟動後單擊啟動。
向計算群組載入資料。
Table Group是Hologres中資料的載體。使用read_warehouse_1查詢order_dw資料庫中Table Group(本樣本為order_dw_tg_default,建立步驟請參見Table Group管理)的資料時,為計算群組read_warehouse_1載入order_dw_tg_default,從而實現使用
init_warehouse計算群組寫入資料,使用read_warehouse_1計算群組進行服務查詢。在HoloWeb開發頁單擊SQL編輯器,確認執行個體名和資料庫名稱後,執行如下命令。更多詳情請參見建立新計算群組執行個體。載入後,可以查看到read_warehouse_1已經載入了order_dw_tg_default Table Group的資料。
--查看當前資料庫有哪些Table Group SELECT tablegroup_name FROM hologres.hg_table_group_properties GROUP BY tablegroup_name; --為計算群組載入Table Group CALL hg_table_group_load_to_warehouse ('order_dw.order_dw_tg_default', 'read_warehouse_1', 1); --查看計算群組載入Table Group的情況 select * from hologres.hg_warehouse_table_groups;在右上方切換計算群組為read_warehouse_1,後續查詢分析使用read_warehouse_1計算群組。

在SQL編輯器頁面執行如下命令,查看MySQL同步到Hologres的3張表資料。
---查orders中的資料。 SELECT * FROM orders; ---查orders_pay中的資料。 SELECT * FROM orders_pay; ---查product_catalog中的資料。 SELECT * FROM product_catalog;
構建DWD層:即時主題寬表
構建DWD層用到了Hologres連接器特有的部分列更新能力,可以使用INSERT DML方便地表達部分列更新的語義。作業中需要對不同的維表進行查詢,是基於Hologres行存以及行列共存表提供的高效能的點查能力。同時,Hologres資源強隔離的架構,可以保證寫入、讀取、分析等作業之間互不干擾。
通過Flink Catalog功能在Hologres中建DWD層的寬表dwd_orders。
在頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行。
-- 寬表欄位要nullable,因為不同的流寫入到同一張結果表,每一列都可能出現null的情況。 CREATE TABLE dw.order_dw.dwd_orders ( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int comment 'platform 0: phone, 1: pc', pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ); -- 支援通過catalog修改Hologres物理表屬性。 ALTER TABLE dw.order_dw.dwd_orders SET ( 'table_property.binlog.ttl' = '604800' --修改binlog的逾時時間為一周。 );實現即時消費ODS層orders、orders_pay表的Binlog。
在頁面,建立名為DWD的SQL流作業,並將如下代碼拷貝到SQL編輯器後,部署並啟動作業。通過如下SQL作業,orders表會與product_catalog表進行維表關聯,將最終結果寫入dwd_orders表中,實現資料的即時打寬。
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dwd_orders ( order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, order_update_time, order_state, order_product_catalog_name ) SELECT o.*, dim.catalog_name FROM dw.order_dw.orders as o LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id; INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time) SELECT * FROM dw.order_dw.orders_pay; END;查看寬表dwd_orders資料。
在HoloWeb開發頁面串連Hologres執行個體並登入目標資料庫後,在SQL編輯器上執行如下命令。
SELECT * FROM dwd_orders;
構建DWS層:即時指標計算
通過Flink Catalog功能,在Hologres中建立dws層的彙總dws_users以及dws_shops。
在頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行。
-- 使用者維度彙總指標表。 CREATE TABLE dw.order_dw.dws_users ( user_id string not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment '當日完成支付的總金額', primary key(user_id,ds) NOT ENFORCED ); -- 商戶維度彙總指標表。 CREATE TABLE dw.order_dw.dws_shops ( shop_id bigint not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment '當日完成支付總金額', primary key(shop_id,ds) NOT ENFORCED );即時消費DWD層的寬表dw.order_dw.dwd_orders,在Flink中做彙總計算,最終寫入Hologres中的DWS表。
在頁面,建立名為DWS的SQL流作業,並將如下代碼拷貝到SQL編輯器後,部署並啟動作業。
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dws_users SELECT order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 訂單流和支付流資料都已寫入寬表。 GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); INSERT INTO dw.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 訂單流和支付流資料都已寫入寬表。 GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); END;查看DWS層的彙總結果,其結果會根據上遊資料的變更即時更新。
在Hologres控制台查看變更前資料。
dws_users表
SELECT * FROM dws_users;
dws_shops表
SELECT * FROM dws_shops;
在RDS控制台向order_dw資料庫orders和orders_pay表中分別插入1條新資料。
INSERT INTO orders VALUES (100008, 'user_003', 12345, 5, 6000.02, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, '2023-02-15 19:40:56');在Hologres控制台查看變更後的資料。
dwd_orders表
SELECT * FROM dwd_orders;
dws_users表
SELECT * FROM dws_users;
dws_shops表
SELECT * FROM dws_shops;
資料探查
因為開啟了Binlog,所以可直接探查到資料的變化情況。如果對中間結果需要即席(Ad-hoc)性質的業務資料探查,或者對最終計算結果進行資料正確性排查,此方案的每一層資料都實現了持久化,可以便捷地探查中間過程。
流模式探查
通過Print連接器可以輔助確認輸出到其他結果表中的訊息是否符合預期。
建立並啟動資料探查流作業。
在頁面,建立名為Data-exploration的SQL流作業,並將如下代碼拷貝到SQL編輯器後,部署並啟動作業。
-- 流模式探查,列印到print可以看到資料的變化情況。 CREATE TEMPORARY TABLE print_sink( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int, pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --這裡的startTime是binlog產生的時間 WHERE order_user_id = 'user_001';查看資料探查結果。
在詳情頁面,單擊目標作業名稱,在作業日誌頁簽下左側作業記錄頁簽,單擊運行Task Managers頁簽下的Path, ID。在Stdout頁面搜尋user_001相關的日誌資訊。

批模式探查
批模式探查不會將資料寫入結果表,而是擷取當前時刻的終態資料,直接通過調試查看結果。
在頁面,建立SQL流作業,並將如下代碼拷貝到SQL編輯器後,單擊調試。詳情請參見作業調試。
在Flink作業開發介面調試結果如下圖所示。
SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */
WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; --批量模式支援filter下推,提升批作業執行效率。
步驟三:使用即時數倉
步驟二展示了通過Flink Catalog在Flink側搭建一個基於Flink和Hologres的Streaming Warehouse即時分層數倉。接下來會介紹數倉搭建完成之後的一些簡單應用情境。
Key-Value服務
根據主鍵查詢DWS層的彙總指標表,支援百萬級RPS。
在HoloWeb開發頁面查詢指定使用者指定日期的消費額的程式碼範例如下。
-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';
明細查詢
對DWD層寬表進行OLAP分析。
在HoloWeb開發頁面查詢某個客戶23年2月特定支付平台支付的訂單明細的程式碼範例如下。
-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;
即時報表
基於DWD層寬表資料展示即時報表,Hologres的行列共存以及列存表有非常優秀的OLAP分析能力,支援秒級響應。
在HoloWeb開發頁面查詢23年2月內每個品類的訂單總量和訂單總金額的程式碼範例如下。
-- holo sql
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
相關文檔
相關業務情境實踐文檔:
Hologres的Binlog能力詳情,請參見訂閱Hologres Binlog。
Flink支援在一個作業中寫入多個INSERT INTO語句,文法請參見INSERT INTO語句。
Realtime ComputeFlink版支援豐富的連接器,詳情請參見支援的連接器。