您可以通过调整内存、CPU和Task个数等,实现对Hive作业的调优。本文为您介绍如何调优Hive作业。

作业调优方案

作业调优方向 调优方案
参数调优
代码优化 代码优化

代码优化

  • 数据清洗
    1. 读取表时分区过滤,避免全表扫描。
    2. 数据过滤之后再JOIN。
    3. 重复使用数据时,避免重复计算,构建中间表,重复使用中间表。
  • 多Distinct优化
    • 优化前代码
      多个Distinct时,数据会出现膨胀。
      select k,count(distinct case when a > 1 then user_id) user1,
             count(distinct case when a > 2 then user_id) user2,
             count(distinct case when a > 3 then user_id) user3,
             count(distinct case when a > 4 then user_id) user4
      from t  
      group by k
    • 优化后代码
      通过两次Group By的方式代替Distinct操作,通过内层的Group By去重并降低数据量,通过外层的Group By取sum,即可实现Distinct的效果。
      select k,sum(case when user1 > 0 then 1 end) as user1,
             sum(case when user2 > 0 then 1 end) as user2,
             sum(case when user3 > 0 then 1 end) as user3,
             sum(case when user4 > 0 then 1 end) as user4
      from 
              (select k,user_id,count(case when a > 1 then user_id) user1,
                      count(case when a > 2 then user_id) user2,
                      count(case when a > 3 then user_id) user3,
                      count(case when a > 4 then user_id) user4
              from t
              group by k,user_id  
              ) tmp 
      group by k
  • 数据倾斜

    热点Key处理方式如下:

    • 如果是Group By出现热点,请按照以下方法操作:
      1. 先开启Map端聚合。
        set hive.map.aggr=true
        hive.groupby.mapaggr.checkinterval=100000(用于设定Map端进行聚合操作的条目数)
      2. 可以对Key随机化打散,多次聚合,或者直接设置。
        set hive.groupby.skewindata=true;

        当选项设定为true时,生成的查询计划有两个MapReduce任务。在第一个MapReduce中,Map的输出结果集合会随机分布到Reduce中, 每个部分进行聚合操作,并输出结果。这样处理的结果是,相同的Group By Key有可能分发到不同的Reduce中,从而达到负载均衡的目的;第二个MapReduce任务再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key分布到同一个Reduce中),最后完成最终的聚合操作。

    • 如果两个大表进行JOIN操作时,出现热点,则使用热点Key随机化。
      例如,log表存在大量user_id为null的记录,但是表bmw_users中不会存在user_id为空,则可以把null随机化再关联,这样就避免null值都分发到一个Reduce Task上。代码示例如下。
      SELECT * FROM log a LEFT OUTER 
      JOIN bmw_users b ON 
      CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) ELSE a.user_id=b.user_id END;
    • 如果大表和小表进行JOIN操作时,出现热点,则使用MAP JOIN。

内存参数

您可以通过设置以下参数,对Map和Reduce阶段的内存进行调优:
  • Map阶段
    参数 描述 示例
    mapreduce.map.java.opts 默认参数,表示JVM堆内存。 -Xmx2048m
    mapreduce.map.memory.mb 默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256 2304
  • Reduce阶段
    参数 描述 示例
    mapreduce.reduce.java.opts 默认参数,表示JVM堆内存。 -Xmx2048m
    mapreduce.reduce.memory.mb 默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256 2304

CPU参数

您可以通过设置以下参数,对Map和Reduce任务的内存进行调优。
参数 描述
mapreduce.map.cpu.vcores 每个Map任务可用的最多的CPU Core数目。
mapreduce.reduce.cpu.vcores 每个Reduce任务可用的最多的CPU Core数目。
说明 此设置在公平队列是不生效的,通常vCores用于较大的集群,以限制不同用户或应用程序的CPU。

Task数量优化

  • Map Task数量优化

    在分布式计算系统中,决定Map数量的一个因素就是原始数据,在不加干预的情况下,原始数据有多少个块,就可能有多少个起始的Task,因为每个Task对应读取一个块的数据;当然这个也不是绝对的,当文件数量特别多,并且每个文件的大小特别小时,您就可以限制减少初始Map对相应的Task的数量,以减少计算资源的浪费,如果文件数量较少,但是单个文件较大,您可以增加Map的Task的数量,以减小单个Task的压力。

    通常,Map Task数量是由mapred.map.tasksmapred.min.split.sizedfs.block.size决定的。

    1. Hive的文件基本上都是存储在HDFS上,而HDFS上的文件,都是分块的,所以具体的Hive数据文件在HDFS上分多少块,可能对应的是默认Hive起始的Task的数量,使用default_mapper_num参数表示。使用数据总大小除以dfs默认的最大块大小来决定初始默认数据分区数。
      初始默认的Map Task数量,具体公式如下。
      default_mapper_num = total_size/dfs.block.size
    2. 计算Split的size,具体公式如下。
      default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))

      上述公式中的mapred.min.split.sizemapred.max.split.size,分别为Hive计算的时Split的最小值和最大值。

    3. 将数据按照计算出来的size划分为数据块,具体公式如下。
      split_num = total_size/default_split_size;
    4. 计算的Map Task数量,具体公式如下。
      map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
      从上面的过程来看,Task的数量受各个方面的限制,不至于Task的数量太多,也不至于Task的数量太少。如果需要提高Task数量,就要降低mapper.min.split.size的数值,在一定的范围内可以减小default_split_size的数值,从而增加split_num的数量,也可以增大mapred.map.tasks的数量。
      注意 Hive on TEZ和Hive on MR使用是有差异的。例如,在Hive中执行一个Query时,可以发现Hive的执行引擎在使用Tez与MR时,两者生成的mapper数量差异较大。主要原因在于Tez中对inputSplit做了grouping操作,可以将多个inputSplit组合成更少的groups,然后为每个group生成一个mapper任务,而不是为每个inputSplit生成一个mapper任务。
  • Reduce Task数量优化
    • 通过hive.exec.reducers.bytes.per.reducer参数控制单个Reduce处理的字节数。
      Reduce的计算方法如下。
      reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, hive.exec.reducers.max)。
    • 通过mapred.reduce.tasks参数来设置Reduce Task的数量。
      说明 在TEZ引擎模式下,通过命令set hive.tez.auto.reducer.parallelism = true;,TEZ将会根据vertice的输出大小动态预估调整Reduce Task的数量。

      同Map一样,启动和初始化Reduce也会消耗时间和资源。另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,并且这些小文件作为下一个任务的输入,则会出现小文件过多的问题。

并行运行

并行运行表示同步执行Hive的多个阶段。Hive在执行过程中,将一个查询转化成一个或者多个阶段。某个特定的Job可能包含多个阶段,而这些阶段可能并非完全相互依赖的,也就是可以并行运行的,这样可以使得整个Job的运行时间缩短。

您可以通过设置以下参数,控制不同的作业是否可以同时运行。
参数 描述
hive.exec.parallel 默认值为false。设置true时,表示允许不同的Job可以同时运行。
hive.exec.parallel.thread.number 默认值为8。表示允许同时运行的Job的最大值。

Fetch task

您可以通过设置以下参数,在执行查询等语句时,不执行MapReduce程序,以减少等待时间。
参数 描述
hive.fetch.task.conversion 默认值为none。参数取值如下:
  • none:关闭Fetch task优化。

    在执行语句时,执行MapReduce程序。

  • minimal:只在SELECT、FILTER和LIMIT的语句上进行优化。
  • more:在minimal的基础上更强大,SELECT不仅仅是查看,还可以单独选择列,FILTER也不再局限于分区字段,同时支持虚拟列(别名)。

开启向量化

您可以通过设置以下参数,在执行查询等语句时,不执行MapReduce程序,以减少等待时间。
参数 描述
hive.vectorized.execution.enabled 默认值为true。开启向量化查询的开关。
hive.vectorized.execution.reduce.enabled 默认值为true。表示是否启用Reduce任务的向量化执行模式。

合并小文件

大量小文件容易在文件存储端造成瓶颈,影响处理效率。对此,您可以通过合并Map和Reduce的结果文件来处理。

您可以通过设置以下参数,合并小文件。
参数 描述
hive.merge.mapfiles 默认值为true。表示是否合并Map输出文件。
hive.merge.mapredfiles 默认值为flase。表示是否合并Reduce输出文件。
hive.merge.size.per.task 默认值为256000000,单位字节。表示合并文件的大小。