本文為您介紹Hologres基於Delta Lake實現湖倉一體的背景、架構、環境準備及使用說明等資訊。
背景資訊
-
Delta Lake是DataBricks公司推出的一種資料湖方案。Delta Lake以資料為中心,圍繞資料流走向(資料從流入資料湖、資料群組織管理和資料查詢到流出資料湖)推出了一系列功能特性,協助您搭配第三方上下遊工具,搭建快捷、易用和安全的資料湖。詳情請參見DeltaLake。
-
EMR是阿里雲提供的雲原生開源巨量資料平台,向您提供簡單易整合的Hadoop、Hive、Spark、Flink等開源巨量資料計算和儲存引擎,便於您使用Hadoop和Spark生態系統中的其他周邊系統分析和處理資料。詳情請參見什麼是E-MapReduce。
-
DLF是一款全託管的協助您構建雲上資料湖及Lakehouse的服務,為您提供了統一的中繼資料管理、統一的許可權與安全管理、便捷的資料入湖能力以及一鍵式資料探索能力。詳情請參見DLF產品簡介。
-
Hologres作為一站式即時數倉,與DLF、EMR無縫整合,打破資料湖與資料倉儲割裂的體系,構建完整的湖倉一體解決方案,將資料湖的靈活性、生態豐富性與即時數倉的高效能線上複雜分析、企業級能力相結合,為您提供一站式即時湖倉解決方案。詳情請參見基於DLF訪問OSS資料湖加速。
整體架構
本解決方案通過EMR Spark來進行資料加工與處理,中繼資料存放區在DLF中,資料存放區在OSS上,Hologres可以利用DLF對OSS中繼資料的管理能力,對OSS多種格式的湖資料(Hudi、Delta、CSV、Parquet、ORC、SequenceFile)進行加速查詢和湖倉融合分析,將資料提供給BI報表、可視化大屏和上層應用進一步消費使用。
環境準備
資料來源準備
此步驟主要針對初次使用EMR或者OSS服務的使用者。如您在實際業務中,已經有大量業務資料通過EMR服務寫入OSS Bucket,可直接使用DLF中繼資料抽取功能自動產生中繼資料資訊,供Hologres來查詢訪問。中繼資料抽取方式請參見中繼資料抽取。
-
開通EMR資料湖叢集,選擇需要的服務和儲存格式,選擇DLF來管理中繼資料。本文以Spark+Hive+Delta為例,開通方式請參見快速建立和使用資料湖分析叢集。

-
開通OSS服務,建立儲存空間用於儲存資料,詳情請參見開通OSS服務。
-
使用EMR Spark構建資料。
-
登入EMR叢集,可選擇SSH方式登入叢集主節點或者免密登入叢集Core節點,詳情請參見登入叢集。
-
構建TPC-H 100GB測試資料,命令如下。
說明本文的TPC-H的實現基於TPC-H的基準測試,並不能與發行的TPC-H基準測試結果相比較,本文中的測試並不符合TPC-H基準測試的所有要求。
# 執行yum update更新所有庫 yum update # 安裝 git 和 gcc yum install git yum install gcc #下載TPC-H資料產生代碼 git clone https://github.com/gregrahn/tpch-kit.git #進入資料產生工具代碼目錄 cd tpch-kit/dbgen # 編譯資料產生工具代碼 make # 運行如下代碼產生資料 ./dbgen -vf -s 100 -
進入Hive互動介面,建立資料庫和表,匯入上述產生的資料。
# 進入hive互動介面 hive # 建立資料庫 CREATE DATABASE IF NOT EXISTS testdb_textfile location 'oss://oss-bucket-dlftest/testdb_textfile'; # 切換至剛建立的資料庫 USE testdb_textfile; # 建立表 CREATE TABLE IF NOT EXISTS nation_textfile ( n_nationkey integer , n_name char(25) , n_regionkey integer , n_comment varchar(152) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS region_textfile ( r_regionkey integer , r_name char(25) , r_comment varchar(152) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS part_textfile ( p_partkey integer , p_name varchar(55) , p_mfgr char(25) , p_brand char(10) , p_type varchar(25) , p_size integer , p_container char(10) , p_retailprice decimal(15,2) , p_comment varchar(23) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS supplier_textfile ( s_suppkey integer , s_name char(25) , s_address varchar(40) , s_nationkey integer , s_phone char(15) , s_acctbal decimal(15,2) , s_comment varchar(101) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS partsupp_textfile ( ps_partkey integer , ps_suppkey integer , ps_availqty integer , ps_supplycost decimal(15,2) , ps_comment varchar(199) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS customer_textfile ( c_custkey integer , c_name varchar(25) , c_address varchar(40) , c_nationkey integer , c_phone char(15) , c_acctbal decimal(15,2) , c_mktsegment char(10) , c_comment varchar(117) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS orders_textfile ( o_orderkey integer , o_custkey integer , o_orderstatus char(1) , o_totalprice decimal(15,2) , o_orderdate date , o_orderpriority char(15) , o_clerk char(15) , o_shippriority integer , o_comment varchar(79) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS lineitem_textfile ( l_orderkey integer , l_partkey integer , l_suppkey integer , l_linenumber integer , l_quantity decimal(15,2) , l_extendedprice decimal(15,2) , l_discount decimal(15,2) , l_tax decimal(15,2) , l_returnflag char(1) , l_linestatus char(1) , l_shipdate date , l_commitdate date , l_receiptdate date , l_shipinstruct char(25) , l_shipmode char(10) , l_comment varchar(44) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; # 匯入資料 LOAD DATA LOCAL INPATH '${YOUR_PATH}/nation.tbl*' OVERWRITE INTO TABLE nation_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/region.tbl*' OVERWRITE INTO TABLE region_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/supplier.tbl*' OVERWRITE INTO TABLE supplier_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/customer.tbl*' OVERWRITE INTO TABLE customer_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/part.tbl*' OVERWRITE INTO TABLE part_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/partsupp.tbl*' OVERWRITE INTO TABLE partsupp_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/orders.tbl*' OVERWRITE INTO TABLE orders_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/lineitem.tbl*' OVERWRITE INTO TABLE lineitem_textfile; -
輸入
spark-sql命令進入互動介面,建立資料庫和delta格式的表。# 進入spark-sql互動介面 spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.delta.mergeSchema=true' --conf 'autoMerge.enable=true' --conf 'spark.sql.parquet.writeLegacyFormat=true' # 建立資料庫 CREATE DATABASE IF NOT EXISTS test_spark_delta LOCATION 'oss://oss-bucket-dlftest/test_spark_delta'; # 切換至剛建立的資料庫並建立表 USE test_spark_delta; CREATE TABLE nation_delta USING delta AS SELECT * FROM ${SOURCE}.nation_textfile; CREATE TABLE region_delta USING delta AS SELECT * FROM ${SOURCE}.region_textfile; CREATE TABLE supplier_delta USING delta AS SELECT * FROM ${SOURCE}.supplier_textfile; CREATE TABLE customer_delta USING delta partitioned BY (c_mktsegment) AS SELECT * FROM ${SOURCE}.customer_textfile; CREATE TABLE part_delta USING delta partitioned BY (p_brand) AS SELECT * FROM ${SOURCE}.part_textfile; CREATE TABLE partsupp_delta USING delta AS SELECT * FROM ${SOURCE}.partsupp_textfile; CREATE TABLE orders_delta USING delta partitioned BY (o_orderdate) AS SELECT * FROM ${SOURCE}.orders_textfile; CREATE TABLE lineitem_delta USING delta partitioned BY (l_shipdate) AS SELECT * FROM ${SOURCE}.lineitem_textfile;
-
Hologres開啟資料加速配置
前往Hologres管理主控台,在實例清單頁單擊對應執行個體操作列的數據湖加速即可開啟。
使用說明
Hologres的資料湖加速能力,可以滿足實際業務中以下兩種使用情境,您可以根據業務需要選擇合適的情境。
情境一:使用Hologres直接加速查詢OSS上的表資料
樣本:
-- 建立DLF外部表格外掛程式
CREATE EXTENSION IF NOT EXISTS dlf_fdw;
-- 建立外部伺服器
CREATE SERVER IF NOT EXISTS dlf_server FOREIGN data wrapper dlf_fdw options
(
dlf_region 'cn-beijing',
dlf_endpoint 'dlf-share.cn-beijing.aliyuncs.com',
oss_endpoint 'oss-cn-beijing-internal.aliyuncs.com'
);
-- 匯入外部表格定義
IMPORT FOREIGN SCHEMA "test_spark_delta" LIMIT TO
(
customer_delta,
lineitem_delta,
nation_delta,
orders_delta,
part_delta,
partsupp_delta,
region_delta,
supplier_delta
)
FROM SERVER dlf_server INTO oss_ext_tables options (if_table_exist 'update');
-- 查詢表資料,以Q22為例
SELECT
cntrycode,
count(*) AS numcust,
sum(c_acctbal) AS totacctbal
FROM
(
SELECT
substring(c_phone FROM 1 FOR 2) AS cntrycode,
c_acctbal
FROM
customer_delta
WHERE
substring(c_phone FROM 1 FOR 2) IN
('24', '32', '17', '18', '12', '14', '22')
AND c_acctbal > (
SELECT
avg(c_acctbal)
FROM
customer_delta
WHERE
c_acctbal > 0.00
AND substring(c_phone FROM 1 FOR 2) IN
('24', '32', '17', '18', '12', '14', '22')
)
AND NOT EXISTS (
SELECT
*
FROM
orders_delta
WHERE
o_custkey = c_custkey
)
) AS custsale
GROUP BY
cntrycode
ORDER BY
cntrycode;
返回結果:
+------------+-------------+---------------+
| cntrycode | numcust | totacctbal |
+------------+-------------+---------------+
| 12 | 90805 | 681136537.68 |
| 14 | 91459 | 685826271.21 |
| 17 | 91313 | 685025263.11 |
| 18 | 91292 | 684588251.63 |
| 22 | 90399 | 677402363.79 |
| 24 | 90635 | 680033065.67 |
| 32 | 90668 | 680459221.16 |
+------------+-------------+---------------+
情境二:匯入Hologres標準儲存以擷取更好的查詢效能
Hologres標準儲存採用SSD(NVME)硬碟,隨機讀寫效能更好。將OSS外表匯入Hologres內部標準儲存,可通過建立索引、設定適合的Shard數、選擇合適的分布列等手段最佳化查詢效能,以Q2為例,可獲得18倍以上的效能提升。詳情請參見最佳化內部表效能化。
-
Hologres中建立相同結構的內部表並匯入資料。
樣本如下,更多建表語句請參見Hologres查詢體驗快速入門。
--建立內部表 BEGIN; CREATE TABLE region ( R_REGIONKEY INT NOT NULL PRIMARY KEY, R_NAME TEXT NOT NULL, R_COMMENT TEXT ); CALL set_table_property('region', 'distribution_key', 'R_REGIONKEY'); CALL set_table_property('region', 'bitmap_columns', 'R_REGIONKEY,R_NAME,R_COMMENT'); CALL set_table_property('region', 'dictionary_encoding_columns', 'R_NAME,R_COMMENT'); CALL set_table_property('region', 'time_to_live_in_seconds', '31536000'); COMMIT; --匯入資料 INSERT INTO public.region SELECT * FROM region_delta ; -
內表查詢結果。
SELECT cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctbal FROM ( SELECT substring(c_phone FROM 1 FOR 2) AS cntrycode, c_acctbal FROM customer WHERE substring(c_phone FROM 1 FOR 2) IN ('24', '32', '17', '18', '12', '14', '22') AND c_acctbal > ( SELECT avg(c_acctbal) FROM customer WHERE c_acctbal > 0.00 AND substring(c_phone FROM 1 FOR 2) IN ('24', '32', '17', '18', '12', '14', '22') ) AND NOT EXISTS ( SELECT * FROM orders WHERE o_custkey = c_custkey ) ) AS custsale GROUP BY cntrycode ORDER BY cntrycode;
效能對比
以32 Core獨享執行個體為例,可以看到Hologres內部表查詢速度比OSS外部表格會高出約100倍:
-
OSS外部表格
-
查詢耗時:17.24s。
-
執行計畫:

-
-
Hologres內部表
-
查詢耗時:106.67ms。
-
執行計畫:

-