You can adjust the memory and vCPU resources and the number of tasks to optimize Hive jobs. This topic describes how to optimize a Hive job.

Optimization solutions

Category          Optimization solution
Modify parameters Modify memory parameters, Modify vCPU parameters, Adjust the number of tasks, Run tasks in different stages in parallel, Use fetch tasks, Enable vectorized query execution, Merge small files
Optimize code     Optimize code

Optimize code

  • Cleanse data
    1. When you read data from a partitioned table, filter data by partition. This way, full table scans are avoided.
    2. Filter data before you perform a JOIN operation.
    3. To avoid repeated calculations, create an intermediate table to store temporary computing results that are repeatedly used.
  • Optimize code that contains multiple DISTINCT operators
    • Code before optimization
      If you use multiple DISTINCT operators, data expansion may occur.
      select k,count(distinct case when a > 1 then user_id end) user1,
             count(distinct case when a > 2 then user_id end) user2,
             count(distinct case when a > 3 then user_id end) user3,
             count(distinct case when a > 4 then user_id end) user4
      from t
      group by k
    • Code after optimization
      Use two GROUP BY clauses to replace the DISTINCT operators. Use one GROUP BY clause in the inner query to deduplicate data and reduce the amount of data. Use the other GROUP BY clause in the outer query to obtain the sum.
      select k,sum(case when user1 > 0 then 1 end) as user1,
             sum(case when user2 > 0 then 1 end) as user2,
             sum(case when user3 > 0 then 1 end) as user3,
             sum(case when user4 > 0 then 1 end) as user4
      from 
              (select k,user_id,count(case when a > 1 then user_id end) user1,
                      count(case when a > 2 then user_id end) user2,
                      count(case when a > 3 then user_id end) user3,
                      count(case when a > 4 then user_id end) user4
              from t
              group by k,user_id  
              ) tmp 
      group by k
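The rewrite can be sanity-checked outside Hive. The following sketch (plain Python, with made-up sample rows and the same a > 1 … a > 4 thresholds) computes the conditional distinct counts both ways and confirms the results match:

```python
# Sanity check for the multi-DISTINCT rewrite, using made-up sample rows.
# Each row is (k, a, user_id).
rows = [
    ("k1", 2, "u1"), ("k1", 3, "u1"), ("k1", 5, "u2"),
    ("k2", 4, "u3"), ("k2", 1, "u3"), ("k2", 5, "u4"),
]
thresholds = [1, 2, 3, 4]

# Direct form: count(distinct case when a > t then user_id end) per key.
direct = {}
for k, a, uid in rows:
    sets = direct.setdefault(k, [set() for _ in thresholds])
    for i, t in enumerate(thresholds):
        if a > t:
            sets[i].add(uid)
direct = {k: [len(s) for s in sets] for k, sets in direct.items()}

# Rewritten form: inner GROUP BY (k, user_id) deduplicates,
# then the outer query sums a 0/1 flag per key.
inner = {}
for k, a, uid in rows:
    cnts = inner.setdefault((k, uid), [0] * len(thresholds))
    for i, t in enumerate(thresholds):
        if a > t:
            cnts[i] += 1
rewritten = {}
for (k, uid), cnts in inner.items():
    acc = rewritten.setdefault(k, [0] * len(thresholds))
    for i, c in enumerate(cnts):
        if c > 0:
            acc[i] += 1

assert direct == rewritten  # same counts, no DISTINCT needed
```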
  • Handle data skew

    Use one of the following methods to process skewed keys:

    • If skewed keys appear in a GROUP BY operation, perform the following operations:
      1. Enable aggregation in the map stage.
        set hive.map.aggr=true;
        set hive.groupby.mapaggr.checkinterval=100000; (the number of data entries that are aggregated in the map stage)
      2. Enable load balancing to randomly distribute keys and aggregate them in multiple rounds. Alternatively, you can directly specify the number of reducers.
        set hive.groupby.skewindata=true;

        If you set hive.groupby.skewindata to true, the generated query plan contains two MapReduce jobs. In the first MapReduce job, the output results of map tasks are randomly distributed to reduce tasks. Then, the reduce tasks aggregate the data and generate an output. This way, data entries that have the same key in the GROUP BY operation may be distributed to different reduce tasks to achieve load balancing. In the second MapReduce job, the preprocessing results are distributed to reduce tasks based on keys. This way, data entries that have the same key in the GROUP BY operation are distributed to the same reduce tasks, and then a final aggregation result is generated.

    • If skewed keys appear when you join two large tables, randomize the skewed keys.
      For example, tables named log and bmw_users are joined. The table log contains a large number of null values in the user_id column, and the table bmw_users does not have null values in the user_id column. In this case, you can randomize null values before you associate the values. This way, the null values are distributed to different reduce tasks. Example:
      SELECT * FROM log a LEFT OUTER
      JOIN bmw_users b ON
      CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id END = b.user_id;
    • If skewed keys appear when you join a small table and a large table, use MAP JOIN to cache the small table in memory and complete the join in the map stage.
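Both hive.groupby.skewindata and the null-key randomization rest on the same idea: break one hot key into many synthetic keys so that no single reduce task receives all of its rows, then merge the partial results. A minimal Python sketch of this two-phase pattern (the sample data and salt count are made up for illustration):

```python
import random

random.seed(42)

# Heavily skewed input: the "hot" key dominates.
rows = [("hot", 1)] * 9000 + [("cold", 1)] * 1000

# Phase 1 (first MapReduce job): salt each key so the hot key's rows
# spread across several partial-aggregation buckets (reduce tasks).
SALTS = 8
partial = {}
for key, val in rows:
    salted = (key, random.randrange(SALTS))
    partial[salted] = partial.get(salted, 0) + val

# Phase 2 (second MapReduce job): strip the salt and merge the partial
# results per original key to produce the final aggregation.
final = {}
for (key, _salt), subtotal in partial.items():
    final[key] = final.get(key, 0) + subtotal

assert final == {"hot": 9000, "cold": 1000}
# In phase 1, "hot" is split across up to SALTS buckets instead of all
# 9,000 rows landing on a single reduce task.
```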

Modify memory parameters

You can modify the parameters that are described in the following tables to optimize the memory resources in the map and reduce stages.
  • Map stage
    Parameter Description Example
    mapreduce.map.java.opts Required. The Java virtual machine (JVM) heap memory. -Xmx2048m
    mapreduce.map.memory.mb Required. The memory space that is occupied by the JVM process. You can use the following formula to calculate the value: Heap memory + Non-heap memory. Example: 2048 + 256. 2304
  • Reduce stage
    Parameter Description Example
    mapreduce.reduce.java.opts Required. The Java virtual machine (JVM) heap memory. -Xmx2048m
    mapreduce.reduce.memory.mb Required. The memory space that is occupied by the JVM process. You can use the following formula to calculate the value: Heap memory + Non-heap memory. Example: 2048 + 256. 2304
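The relationship between the two parameters in each table can be expressed as a small helper. The 256 MB non-heap allowance below is the example value from the tables, not a fixed rule; size it for your workload:

```python
def container_memory_mb(heap_mb: int, non_heap_mb: int = 256) -> int:
    """mapreduce.{map,reduce}.memory.mb should cover the whole JVM process:
    the -Xmx heap plus non-heap memory (metaspace, thread stacks, buffers).
    The 256 MB default here mirrors the example in the tables above."""
    return heap_mb + non_heap_mb

# -Xmx2048m  ->  mapreduce.map.memory.mb = 2304, as in the table above.
assert container_memory_mb(2048) == 2304
```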

Modify vCPU parameters

You can modify the parameters that are described in the following table to optimize the vCPU resources in the map and reduce stages.
Parameter Description
mapreduce.map.cpu.vcores The maximum number of vCPUs that can be used in each map task.
mapreduce.reduce.cpu.vcores The maximum number of vCPUs that can be used in each reduce task.
Note These parameters do not take effect in fair queuing scenarios. In most cases, they are used to limit the number of vCPUs that users or applications can use in a large cluster.

Adjust the number of tasks

  • Adjust the number of map tasks

    In a distributed computing system, the number of data blocks in the raw data is one of the factors that determine the number of map tasks. If you do not manually modify the related parameters, the number of map tasks corresponds to the number of data blocks, so each map task reads one data block in most cases. However, manual adjustment is required in some scenarios. If a large number of small files exist, you can reduce the number of map tasks to improve resource utilization. If the number of files is small but each file is large, you can increase the number of map tasks to reduce the workload of each map task.

    The parameters that determine the number of map tasks are mapred.map.tasks, mapred.min.split.size, and dfs.block.size.

    1. Most Hive data is stored in HDFS, and HDFS stores files as data blocks. Therefore, the number of data blocks into which a Hive file is split determines the default initial number of map tasks for the file, denoted by default_mapper_num: the total data size divided by the default HDFS block size.
      Formula:
      default_mapper_num = total_size/dfs.block.size
    2. The system calculates the default split size based on the following formula:
      default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))

      In the preceding formula, mapred.min.split.size specifies the minimum split size in a Hive job, and mapred.max.split.size specifies the maximum split size in a Hive job.

    3. The system splits data into data blocks based on the following formula:
      split_num = total_size/default_split_size
    4. The system calculates the number of map tasks based on the following formula:
      map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
      In this calculation, multiple factors work together to ensure that the number of map tasks is neither too large nor too small. To increase the number of map tasks, reduce the value of mapred.min.split.size: this reduces default_split_size, which in turn increases split_num. You can also increase the value of mapred.map.tasks.
      Notice Hive on Tez jobs and Hive on MapReduce jobs have different computing mechanisms. For example, if you use a Hive on Tez job and a Hive on MapReduce job to perform the same query on the same data, the numbers of map tasks generated by the jobs are significantly different. The main reason is that Tez combines input splits into groups and generates one map task for each group instead of each input split.
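The four formulas above can be combined into a short sketch. This is pure Python for illustration; the parameter values in the example are common defaults, not recommendations:

```python
def map_task_num(total_size, dfs_block_size,
                 min_split_size, max_split_size, mapred_map_tasks):
    """Approximate the Hive-on-MapReduce map-task count from the formulas
    above. All sizes are in bytes; integer division stands in for the
    exact rounding that Hadoop applies."""
    default_mapper_num = total_size // dfs_block_size
    default_split_size = max(min_split_size,
                             min(max_split_size, dfs_block_size))
    split_num = total_size // default_split_size
    return min(split_num, max(mapred_map_tasks, default_mapper_num))

GB, MB = 1 << 30, 1 << 20

# 10 GB of input, 128 MB blocks, default split bounds, mapred.map.tasks=2:
print(map_task_num(10 * GB, 128 * MB, 1, 256 * MB, 2))        # 80

# Raising mapred.min.split.size to 256 MB enlarges each split and
# therefore halves the number of map tasks:
print(map_task_num(10 * GB, 128 * MB, 256 * MB, 256 * MB, 2)) # 40
```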
  • Adjust the number of reduce tasks
    • Use the hive.exec.reducers.bytes.per.reducer parameter to determine the number of bytes that are processed by each reduce task.
      The system calculates the number of reduce tasks based on the following formula:
      reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)
    • Use the mapred.reduce.tasks parameter to specify the number of reduce tasks.
      Note If you use the Tez engine, you can run the set hive.tez.auto.reducer.parallelism = true; command to enable auto reducer parallelism. This way, Tez dynamically adjusts the number of reduce tasks based on the output sizes of vertices.

      The operations to start and initialize reduce tasks consume time and resources. In addition, each reduce task generates one output file. If a large number of small output files are used as the input of other tasks, even more small files are generated downstream.
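The first method can be sketched the same way. The 256 MB-per-reducer value and the cap of 1,009 reducers used in the example are commonly cited Hive defaults; check your cluster's configuration before relying on them:

```python
def reducer_num(total_size, bytes_per_reducer, reducers_max):
    """Hive's reduce-task estimate when mapred.reduce.tasks is not set:
    min(total_size / hive.exec.reducers.bytes.per.reducer,
        hive.exec.reducers.max). Sizes are in bytes."""
    return min(total_size // bytes_per_reducer, reducers_max)

GB, MB = 1 << 30, 1 << 20

# 100 GB of input at 256 MB per reducer, capped at 1,009 reducers:
print(reducer_num(100 * GB, 256 * MB, 1009))  # 400

# With 1 TB of input the uncapped estimate (4,096) exceeds the cap:
print(reducer_num(1024 * GB, 256 * MB, 1009))  # 1009
```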

Run tasks in different stages in parallel

You can run tasks in different stages of a job in parallel. Hive translates a query into one or more stages. A job may contain multiple stages, and the stages may be independent of each other. If independent tasks in different stages run in parallel, the overall job runtime is reduced.

You can configure the parameters described in the following table to enable the parallel running of tasks and specify the maximum number of tasks that can run in parallel.
Parameter Description
hive.exec.parallel Default value: false. The value true indicates that independent tasks in different stages can run in parallel.
hive.exec.parallel.thread.number The maximum number of tasks that can run in parallel. Default value: 8.

Use fetch tasks

You can configure the hive.fetch.task.conversion parameter to convert Hive queries into fetch tasks. This helps prevent the overhead of starting the MapReduce program and reduce the execution time of statements.
Parameter Description
hive.fetch.task.conversion Default value: none. Valid values:
  • none: No queries are converted to fetch tasks.

    The MapReduce program is run when you execute statements.

  • minimal: Fetch tasks can be used only in SELECT * statements, FILTER clauses on partition key columns, and LIMIT statements.
  • more: This value is more powerful than minimal. Fetch tasks can also be used in SELECT statements for specific columns and in FILTER clauses on non-partition key columns. Virtual columns are also supported.

Enable vectorized query execution

You can configure the parameters described in the following table to enable vectorized query execution. Vectorized execution processes a batch of rows at a time instead of a single row at a time, which reduces the execution time of statements.
Parameter Description
hive.vectorized.execution.enabled Default value: true. The value true indicates that vectorized query execution is enabled.
hive.vectorized.execution.reduce.enabled Default value: true. The value true indicates that vectorized query execution is enabled for reduce tasks.

Merge small files

If a large number of small files are generated, the file storage performance and data processing efficiency are affected. You can merge the output files of map and reduce tasks to reduce the number of small files.

You can configure the parameters described in the following table to merge small files.
Parameter Description
hive.merge.mapfiles Specifies whether to merge the output files of map tasks. Default value: true.
hive.merge.mapredfiles Specifies whether to merge the output files of reduce tasks. Default value: false.
hive.merge.size.per.task The size of a file into which small files are merged. Default value: 256000000. Unit: bytes.