Hologres是兼容PostgreSQL协议的一站式实时数仓引擎,支持实时离线一体化场景下的海量数据写入、更新与查询。本文针对Hologres的批量写入场景,结合Spark批量写入Hologres的性能测试与结果,为您介绍不同的写入场景下应如何选择合适的写入模式。
针对数据的实时写入与实时更新,性能测试方案与结果请参见数据写入、更新、点查场景压测最佳实践。
批量写入模式对比
针对数据批量写入场景,Hologres支持不同的写入模式,包括:传统的COPY模式、基于COPY协议开发的FIXED COPY流式导入模式,以及将批量导入转化成流式的INSERT INTO VALUES模式。
三种写入模式详细对比如下:
|
对比项 |
COPY |
FIXED COPY |
INSERT INTO VALUES |
|
|
简介 |
COPY批量导入无主键表 |
COPY批量导入有主键表 |
基于COPY协议开发的流式导入模式 |
基础流式导入模式 |
|
典型场景 |
|
|
|
|
|
锁粒度 |
行锁 |
表锁 |
行锁 |
行锁 |
|
数据可见性 |
COPY结束后可见 |
COPY结束后可见 |
实时可见 |
实时可见 |
|
性能 |
高 |
高 |
中 |
中 |
|
Hologres资源消耗 |
低 |
低 |
高 |
高 |
|
客户端资源消耗 |
低 |
高 |
低 |
中 |
|
主键冲突策略支持 |
不涉及 |
|
|
|
批量写入模式选择
针对数据批量写入场景,三种模式的特点如下:
-
COPY模式:传统的COPY模式,拥有最优的写入性能、最低的Hologres资源消耗。
-
FIXED COPY模式:基于COPY协议开发的流式导入模式。仅产生行锁、数据实时可见。
-
INSERT INTO VALUES模式:将批量导入转化为传统的流式写入模式,在批量写入场景下已不具备明显优势。
说明INSERT模式仅在批量写入场景下不具备明显优势,但仅INSERT模式具备数据回撤(即删除)能力,在Binlog数据写入等场景下仍需使用该模式。
若您对数据实时可见性、锁粒度(产生表锁时,该表无法支持多个任务同时写入),以及数据源端负载没有特殊要求,可以优先选择COPY模式进行批量写入。
-
以Spark批量写入为例:推荐将Hologres实例升级至V2.2.25或以上版本,并将Connector的写入参数
write.mode值设为auto(默认值),系统将会自动选择最优的写入模式。 -
以Flink批量写入为例:需要先根据下述决策树确定选用COPY模式或FIXED COPY模式,然后分别配置如下参数。
-
jdbccopywritemode:设为TRUE,即不使用INSERT模式。
-
bulkload:TRUE(COPY模式)或FALSE(FIXED COPY模式),请根据需求进行配置。
-
针对具体的批量写入场景,可根据如下决策树选择合适的写入模式。
批量写入性能测试
测试过程使用Hologres研发的开源组件Hologres-Spark-Connector。
准备工作
基础环境准备
您需准备以下环境:
Hologres实例和EMR-Spark集群需要位于同一地域,且使用相同的VPC。
-
开通V2.2.25或以上版本的Hologres实例,并创建数据库。
-
下载Spark-Connector包。
您可通过Maven中央仓库下载Spark读写Hologres时需要引用的连接器JAR包
hologres-connector-spark-3.x。
本文中使用的环境信息如下:
|
服务 |
版本 |
规格 |
|
Hologres |
V3.0.30 |
64 Core,256 GB(1CU=1 core 4GB) |
|
EMR-Spark |
EMR-5.18.1,Spark-3.5.3 |
8 Core,32 GB * 8(1个master节点,7个core节点) 重要
需开通OSS-HDFS服务。 |
|
Spark-Connector |
1.5.2 |
不涉及 |
测试数据准备
-
准备原始数据。
-
登录EMR-Spark集群的master节点,详情请参见连接ECS实例的方式。
-
单击TPC-H_Tools_v3.0.0.zip下载TPCH工具,将其复制到集群master节点的ECS机器中进行解压,然后进入
TPC-H_Tools_v3.0.0/TPC-H_Tools_v3.0.0/dbgen目录。 -
执行如下代码,在
dbgen目录下生成1 TB测试集文件customer.tbl,原tbl格式文件大小为23 GB。./dbgen -s 1000 -T cTPC-H 1 TB数据集中customer表的详细信息如下:
表信息
说明
字段数据量
8
字段类型
INT、BIGINT、TEXT、DECIMAL
数据行数
150,000,000
表Shard数
40
-
-
测试数据导入Spark。
执行如下命令,将customer.tbl文件上传到Spark集群。
hadoop fs -put customer.tbl <spark_resource>其中spark_resource为创建EMR-Spark集群时,配置的集群存储根路径。
-
在Hologres中创建不同存储格式的表。SQL命令如下:
行列共存
CREATE TABLE test_table_mixed ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column,row' );列存(有主键)
CREATE TABLE test_table_column ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column' );列存(无主键)
CREATE TABLE test_table_column_no_pk ( C_CUSTKEY BIGINT, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column' );行存(有主键)
CREATE TABLE test_table_row ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'row' );行存(无主键)
CREATE TABLE test_table_row_no_pk ( C_CUSTKEY BIGINT, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'row' );
性能测试
测试配置
本文主要测试不同模式下的导入性能。
-
登录EMR-Spark集群的master节点(登录方式请参见连接ECS实例的方式),上传已下载的Spark-Connector包,然后执行如下命令进入spark-sql交互界面。
说明通过调整
spark.sql.files.maxPartitionBytes参数值, 可以控制Spark读取HDFS文件的并发。此处并发控制为40。# 进入spark-sql交互界面 spark-sql --jars <path>/hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.executor.instances=40 \ --conf spark.executor.cores=1 \ --conf spark.executor.memory=4g \ --conf spark.sql.files.maxPartitionBytes=644245094其中path为hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar所在的根路径。
-
在spark-sql交互界面执行如下SQL,通过创建临时表的方式写入数据。
由于测试过程中,需要多次调整参数以测试不同模式的写入性能,此处选择创建临时表的方式。
说明实际使用中,您可直接使用Catalog加载Hologres表,更加方便。
-- 创建csv格式的临时表 CREATE TEMPORARY VIEW csvtable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "<spark_resources>/customer.tbl", sep "|" ); CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://<hologres_vpc_endpoint>/<database_name>", username "<accesskey_id>", password "<accesskey_secret>", table "<table_name>", direct_connect "false", write.mode "auto", write.insert.thread_size "3", write.insert.batch_size "2048" ); INSERT INTO hologresTable SELECT * FROM csvTable;参数说明如下:
参数名
描述
spark_resources
创建EMR-Spark集群时,配置的集群存储根路径。
您可登录EMR on ECS控制台,单击目标集群ID,在基础信息页面的集群信息区域获取集群存储根路径。
hologres_vpc_endpoint
Hologres实例对应的VPC网络类型的域名。
您可登录Hologres管理控制台,单击目标实例ID,在实例详情页的网络信息区域,获取指定VPC的域名。例如:杭州地域的VPC Endpoint格式为
<Instance ID>-cn-hangzhou-vpc-st.hologres.aliyuncs.com:80。database_name
Hologres实例的数据库名称。
accesskey_id
具有Hologres相应Database读取权限的AccessKey ID。
accesskey_secret
具有Hologres相应Database读取权限的AccessKey Secret。
table_name
待写入的Hologres目标表名称。
write.mode
写入模式,取值如下:
-
auto:默认值,Connector自行选择最佳的模式。 -
insert:使用INSERT INTO VALUES方式写入。 -
stream:使用FIXED COPY流式写入。 -
bulk_load:使用COPY模式批量导入无主键表。 -
bulk_load_on_conflict:使用COPY模式批量导入有主键表。
write.insert.thread_size
写入并发,仅使用INSERT写入时生效。
write.insert.batch_size
写入攒批,仅使用INSERT写入时生效。
write.on_conflict_action
INSERT_OR_REPLACE(默认):主键冲突更新。
INSERT_OR_IGNORE: 主键冲突忽略。
更多参数信息请参见参数说明。
-
测试场景
|
测试场景 |
可选场景 |
|
表存储格式 |
|
|
数据更新方式 |
|
|
写入方式 |
|
测试结果
测试结果包括以下字段:
|
字段 |
描述 |
|
作业总耗时 |
Spark作业总的运行时间。 即您在EMR-Spark集群节点的spark-sql交互界面,执行INSERT数据写入操作的耗时。获取方法如下图: |
|
数据写入平均耗时 |
剔除Spark集群调度以及读取数据或者数据重分布的时间,仅统计写入作业的平均耗时。 您可在Hologres管理控制台的HoloWeb页面,执行如下SQL命令,即可获取Shard并发数(count)和数据写入平均耗时(avg_duration_ms,单位为:毫秒)。
参数说明如下:
|
|
Hologres负载 |
Hologres实例的CPU使用率。 您可在Hologres管理控制台的实例监控信息页面获取CPU使用率。 |
|
Spark负载 |
EMR节点负载。 您可在EMR on ECS控制台单击目标集群ID,进入监控诊断 > 指标监控页签,选择如下参数:
然后查询CPU utilization指标,即为Spark负载。 |
详细测试结果如下:
|
存储格式 |
主键 |
写入模式 |
作业总耗时 |
数据写入平均耗时 |
Hologres负载 |
Spark负载 |
|
列存 |
无主键 |
insert |
241.61s |
232.70s |
92% |
15% |
|
stream |
228.11s |
222.34s |
100% |
36% |
||
|
bulk_load |
88.72s |
57.16s |
97% |
47% |
||
|
有主键 ignore |
insert |
190.96s |
172.60s |
90% |
14% |
|
|
stream |
149.60s |
142.16s |
100% |
14% |
||
|
bulk_load_on_conflict |
115.96s |
42.92s |
60% |
75% |
||
|
有主键 replace |
insert |
600.40s |
574.31s |
91% |
5% |
|
|
stream |
550.29s |
540.32s |
100% |
5% |
||
|
bulk_load_on_conflict |
188.05s |
109.77s |
93% |
78% |
||
|
行存 |
无主键 |
insert |
132.38s |
123.79s |
94% |
22% |
|
stream |
114.41s |
103.81s |
100% |
17% |
||
|
bulk_load |
68.20s |
41.22s |
98% |
32% |
||
|
有主键 ignore |
insert |
190.48s |
170.49s |
89% |
15% |
|
|
stream |
185.46s |
172.48s |
85% |
14% |
||
|
bulk_load_on_conflict |
117.81s |
47.69s |
58% |
75% |
||
|
有主键 replace |
insert |
177.97s |
170.78s |
93% |
15% |
|
|
stream |
142.44s |
130.16s |
100% |
20% |
||
|
bulk_load_on_conflict |
137.69s |
65.18s |
92% |
78% |
||
|
行列共存 |
有主键 ignore |
insert |
172.19s |
158.74s |
86% |
16% |
|
stream |
150.63s |
149.76s |
100% |
12% |
||
|
bulk_load_on_conflict |
128.83s |
42.09s |
59% |
79% |
||
|
有主键 replace |
insert |
690.37s |
662.00s |
92% |
5% |
|
|
stream |
625.84s |
623.08s |
100% |
4% |
||
|
bulk_load_on_conflict |
202.07s |
121.58s |
93% |
80% |
写入模式的对应关系如下:
-
insert:INSERT INTO VALUES
-
stream:FIXED COPY
-
bulk_load:COPY导入无主键表
-
bulk_load_on_conflict :COPY导入有主键表
