You can adjust the memory and vCPU resources and the number of tasks to optimize Hive jobs. This topic describes how to optimize a Hive job.
Optimization solutions
Category | Optimization solution |
Optimize code | Cleanse data; optimize code that contains multiple DISTINCT operators; handle data skew in GROUP BY operations |
Modify parameters | Modify memory parameters; modify vCPU parameters; adjust the number of tasks; run tasks in different stages in parallel; configure fetch tasks; enable vectorized query execution; merge small files |
Optimize code
Cleanse data
When you read data from a partitioned table, filter data by partition. This way, full table scans are avoided.
Filter data before you perform a JOIN operation.
To avoid repeated calculations, create an intermediate table to store temporary computing results that are repeatedly used.
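The cleansing steps above can be sketched as follows. The table and column names (ods_log, dim_user, dt, and so on) are hypothetical and used only for illustration:

```sql
-- Filter by partition to avoid a full table scan.
-- ods_log is assumed to be partitioned by dt.
SELECT user_id, url
FROM ods_log
WHERE dt = '2024-01-01';

-- Filter data before the JOIN operation instead of after it.
SELECT a.user_id, b.city
FROM (SELECT user_id FROM ods_log WHERE dt = '2024-01-01') a
JOIN dim_user b ON a.user_id = b.user_id;

-- Store a repeatedly used temporary result in an intermediate table.
CREATE TABLE tmp_active_users AS
SELECT user_id FROM ods_log WHERE dt = '2024-01-01' GROUP BY user_id;
```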
Optimize code that contains multiple DISTINCT operators
Code before optimization
If you use multiple DISTINCT operators, data expansion may occur.
SELECT k,
       COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1,
       COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2,
       COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3,
       COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k;
Code after optimization
Use two GROUP BY clauses to replace the DISTINCT operators. Use one GROUP BY clause in the inner query to deduplicate data and reduce the amount of data. Use the other GROUP BY clause in the outer query to obtain the sum.
SELECT k,
       SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1,
       SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2,
       SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3,
       SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4
FROM (SELECT k, user_id,
             COUNT(CASE WHEN a > 1 THEN user_id END) user1,
             COUNT(CASE WHEN a > 2 THEN user_id END) user2,
             COUNT(CASE WHEN a > 3 THEN user_id END) user3,
             COUNT(CASE WHEN a > 4 THEN user_id END) user4
      FROM t
      GROUP BY k, user_id) tmp
GROUP BY k;
Handle data skew in GROUP BY operations
Use one of the following methods to process skewed keys:
If skewed keys appear in a GROUP BY operation, perform the following operations:
Enable aggregation in the map stage: Configure Hive parameters to enable partial aggregation in the map stage, which reduces the amount of data that is transferred to the reduce stage.
set hive.map.aggr=true;
set hive.groupby.mapaggr.checkinterval=100000; -- The number of data entries that are aggregated in the map stage.
Randomly distribute keys and aggregate them multiple times. Alternatively, directly specify reducers.
set hive.groupby.skewindata=true;
If you set hive.groupby.skewindata to true, the generated query plan contains two MapReduce jobs. In the first MapReduce job, the output results of map tasks are randomly distributed to reduce tasks. Then, the reduce tasks aggregate the data and generate an output. This way, data entries that have the same key in the GROUP BY operation may be distributed to different reduce tasks to achieve load balancing. In the second MapReduce job, the preprocessing results are distributed to reduce tasks based on keys. This way, data entries that have the same key in the GROUP BY operation are distributed to the same reduce tasks, and a final aggregation result is generated.
If skewed keys appear when you join two large tables, randomize the skewed keys.
For example, tables named log and bmw_users are joined. The table log contains a large number of null values in the user_id column, and the table bmw_users does not have null values in the user_id column. In this case, you can randomize null values before you associate the values. This way, the null values are distributed to different reduce tasks. Example:
SELECT *
FROM log a
LEFT OUTER JOIN bmw_users b
ON CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive', RAND()) ELSE a.user_id END = b.user_id;
If skewed keys appear when you join a small table and a large table, use MAP JOIN.
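A MAP JOIN loads the small table into memory so that the join completes in the map stage and no reduce-side skew can occur. A minimal sketch, assuming bmw_users is the small table (the threshold value is illustrative):

```sql
-- Let Hive convert qualifying joins to map joins automatically.
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=25000000; -- small-table threshold in bytes

-- Or request a map join explicitly with a hint.
SELECT /*+ MAPJOIN(b) */ a.*, b.*
FROM log a
JOIN bmw_users b ON a.user_id = b.user_id;
```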
Modify memory parameters
You can modify the parameters that are described in the following tables to optimize the memory resources in the map and reduce stages.
Map stage
Parameter | Description | Example |
mapreduce.map.java.opts | Required. The Java virtual machine (JVM) heap memory. | -Xmx2048m |
mapreduce.map.memory.mb | Required. The memory space that is occupied by the JVM process. Calculate the value as: Heap memory + Non-heap memory. Example: 2048 + 256. | 2304 |
Reduce stage
Parameter | Description | Example |
mapreduce.reduce.java.opts | Required. The JVM heap memory. | -Xmx2048m |
mapreduce.reduce.memory.mb | Required. The memory space that is occupied by the JVM process. Calculate the value as: Heap memory + Non-heap memory. Example: 2048 + 256. | 2304 |
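For example, to apply the values from the tables above in a Hive session (a sketch; tune the sizes to the workload and the cluster):

```sql
-- Heap memory for each map and reduce JVM.
set mapreduce.map.java.opts=-Xmx2048m;
set mapreduce.reduce.java.opts=-Xmx2048m;
-- Total container memory: heap (2048 MB) + non-heap (256 MB).
set mapreduce.map.memory.mb=2304;
set mapreduce.reduce.memory.mb=2304;
```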
Modify vCPU parameters
You can modify the parameters that are described in the following table to optimize the vCPU resources in the map and reduce stages.
Parameter | Description |
mapreduce.map.cpu.vcores | The maximum number of vCPUs that can be used in each map task. |
mapreduce.reduce.cpu.vcores | The maximum number of vCPUs that can be used in each reduce task. Note: This parameter does not take effect in fair queuing scenarios. In most cases, this parameter is used to limit the number of vCPUs that users or applications can use in a large cluster. |
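For example, to allow each map and reduce task to use two vCPUs (an illustrative value, not a recommendation):

```sql
set mapreduce.map.cpu.vcores=2;
set mapreduce.reduce.cpu.vcores=2;
```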
Adjust the number of tasks
Adjust the number of map tasks
In a distributed computing system, the number of data blocks in the raw data is one of the factors that determine the number of map tasks. In most cases, each map task reads one data block. However, manual adjustments are required in some scenarios. If a large number of small files exist, you can reduce the number of map tasks to improve resource utilization. If the number of files is small but each file is large, you can increase the number of map tasks to reduce the workload of each map task.
The parameters that determine the number of map tasks are mapred.map.tasks, mapred.min.split.size, and dfs.block.size.
Most Hive data is stored in HDFS, where files are stored as data blocks. Therefore, the number of data blocks into which a Hive file is split determines the default initial number of map tasks for the file. The default_mapper_num value specifies this default number of initial map tasks: the total data size divided by the default size of each data block in HDFS.
Formula:
default_mapper_num = total_size / dfs.block.size
The system calculates the default split size based on the following formula:
default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
In the preceding formula, mapred.min.split.size specifies the minimum split size in a Hive job, and mapred.max.split.size specifies the maximum split size in a Hive job.
The system splits data into data blocks based on the following formula:
split_num = total_size / default_split_size
The system calculates the number of map tasks based on the following formula:
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
This calculation combines several factors to ensure that the number of map tasks is neither too large nor too small. To increase the number of map tasks, reduce the value of mapred.min.split.size. This reduces the value of default_split_size, which in turn increases the value of split_num. You can also increase the value of mapred.map.tasks.
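Building on the formulas above, a sketch with illustrative numbers: suppose total_size is 10 GB and dfs.block.size is 256 MB, so default_mapper_num = 10240 MB / 256 MB = 40. To run more map tasks:

```sql
-- Request more map tasks; takes effect only if it exceeds default_mapper_num.
set mapred.map.tasks=80;
-- Reduce the minimum split size so that default_split_size can shrink.
set mapred.min.split.size=134217728; -- 128 MB
```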
Important: Hive on Tez jobs and Hive on MapReduce jobs have different computing mechanisms. For example, if you use a Hive on Tez job and a Hive on MapReduce job to perform the same query on the same data, the numbers of map tasks generated by the jobs are significantly different. The main reason is that Tez combines input splits into groups and generates one map task for each group instead of each input split.
Adjust the number of reduce tasks
Use the hive.exec.reducers.bytes.per.reducer parameter to determine the number of bytes that are processed by each reduce task.
The system calculates the number of reduce tasks based on the following formula:
reducer_num = min(total_size / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)
Use the mapred.reduce.tasks parameter to specify the number of reduce tasks.
Note: If you use the Tez engine, you can run the set hive.tez.auto.reducer.parallelism=true; command to enable auto reducer parallelism. This way, Tez dynamically adjusts the number of reduce tasks based on the output sizes of vertices.
The operations to start and initialize reduce tasks consume time and resources, like the operations to start and initialize map tasks. In addition, each reduce task generates one output file. If a large number of small files are generated and used as the input of other tasks, an excessive number of small files are further generated.
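For example, to control the number of reduce tasks with the parameters above (the values are illustrative):

```sql
-- Let Hive derive the count: one reduce task per 256 MB of input.
set hive.exec.reducers.bytes.per.reducer=256000000;
set hive.exec.reducers.max=1009;
-- Or fix the count explicitly; this overrides the formula above.
set mapred.reduce.tasks=20;
```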
Run tasks in different stages in parallel
You can run tasks in different stages of a job in parallel. Hive translates a query into one or more stages. A job may contain multiple stages, and the stages may be independent of each other. If the independent tasks in different stages run in parallel, the overall time that is used to run the job is reduced.
You can configure the parameters described in the following table to enable the parallel running of tasks and specify the maximum number of tasks that can run in parallel.
Parameter | Description |
hive.exec.parallel | Default value: false. If you set the hive.exec.parallel parameter to true, independent tasks in different stages can run in parallel. |
hive.exec.parallel.thread.number | Default value: 8. The maximum number of threads that can run in parallel. |
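For example, to enable parallel execution of independent stages with the default thread limit:

```sql
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
```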
Fetch task
You can configure the hive.fetch.task.conversion parameter to convert simple Hive queries into fetch tasks. This avoids the overhead of starting a MapReduce job and reduces the execution time of such statements.
Parameter | Description |
hive.fetch.task.conversion | Default value: none. Valid values: none (fetch task conversion is disabled), minimal (only queries such as SELECT on partition columns and queries with a LIMIT clause are converted), and more (SELECT, FILTER, and LIMIT queries, including those on virtual columns, are converted). |
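For example, the following query can run as a fetch task instead of a MapReduce job (t is a hypothetical table):

```sql
set hive.fetch.task.conversion=more;
-- A simple SELECT with LIMIT now runs directly as a fetch task.
SELECT * FROM t LIMIT 10;
```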
Enable vectorized query execution
You can configure the parameters described in the following table to enable vectorized query execution. This helps you improve the query performance.
Parameter | Description |
hive.vectorized.execution.enabled | Default value: true. The value true indicates that vectorized query execution is enabled. |
hive.vectorized.execution.reduce.enabled | Default value: true. The value true indicates that vectorized query execution is enabled for reduce tasks. |
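For example, to ensure that vectorized query execution is enabled in a session:

```sql
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
```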
Merge small files
If a large number of small files are generated, the file storage performance and data processing efficiency are affected. You can merge the output files of map and reduce tasks to reduce the number of small files.
You can configure the parameters described in the following table to merge small files.
Parameter | Description |
hive.merge.mapfiles | Default value: true. Specifies whether to merge the output files of map tasks. |
hive.merge.mapredfiles | Default value: false. Specifies whether to merge the output files of reduce tasks. |
hive.merge.size.per.task | Default value: 256000000. Unit: bytes. The size of a file into which small files are merged. |
Configuration example
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB