全部產品
Search
文件中心

E-MapReduce:Hive作業調優

更新時間:Apr 22, 2025

您可以通過調整記憶體、CPU和Task個數等,實現對Hive作業的調優。本文為您介紹如何調優Hive作業。

作業調優方案

作業調優方向

調優方案

代碼最佳化

代碼最佳化

參數調優

代碼最佳化

  • 資料清洗

    1. 讀取表時分區過濾,避免全表掃描。

    2. 資料過濾之後再JOIN。

    3. 重複使用資料時,避免重複計算,構建中間表,重複使用中間表。

  • 多DISTINCT最佳化

    • 最佳化前代碼

      多個DISTINCT時,資料會出現膨脹。

      SELECT k,
             COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1,
             COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2,
             COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3,
             COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4
      FROM t  
      GROUP BY k;
    • 最佳化後代碼

      通過兩次GROUP BY的方式代替DISTINCT操作,通過內層的GROUP BY去重並降低資料量,通過外層的GROUP BY取SUM,即可實現DISTINCT的效果。

      SELECT k,
             SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1,
             SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2,
             SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3,
             SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4
      FROM 
              (SELECT k,
                      user_id,
                      COUNT(CASE WHEN a > 1 THEN user_id END) user1,
                      COUNT(CASE WHEN a > 2 THEN user_id END) user2,
                      COUNT(CASE WHEN a > 3 THEN user_id END) user3,
                      COUNT(CASE WHEN a > 4 THEN user_id END) user4
              FROM t
              GROUP BY k, user_id  
              ) tmp 
      GROUP BY k;
  • GROUP BY 操作中的資料扭曲

    熱點Key處理方式如下:

    • 當執行GROUP BY操作時如果發現存在熱點索引值(即某些索引值對應的記錄數遠超其他),可以通過以下步驟來最佳化:

      1. 啟用Map端彙總:首先,通過設定Hive參數開啟Map階段的部分彙總功能,減少傳輸到Reduce階段的資料量。

        set hive.map.aggr=true;
        set hive.groupby.mapaggr.checkinterval=100000;  -- 用於設定Map端進行彙總操作的條目數
      2. 可以對Key隨機化打散,多次彙總,或者直接設定。

        set hive.groupby.skewindata=true;

        當選項設定為true時,產生的查詢計劃有兩個MapReduce任務。在第一個MapReduce中,Map的輸出結果集合會隨機分布到Reduce中, 每個部分進行彙總操作,並輸出結果。這樣處理的結果是,相同的Group By Key有可能分發到不同的Reduce中,從而達到負載平衡的目的;第二個MapReduce任務再根據前置處理過的資料結果按照Group By Key分布到Reduce中(這個過程可以保證相同的Group By Key分布到同一個Reduce中),最後完成最終的彙總操作。

    • 如果兩個大表進行JOIN操作時,出現熱點,則使用熱點Key隨機化。

      例如,log表存在大量user_id為null的記錄,但是表bmw_users中不會存在user_id為空白,則可以把null隨機化再關聯,這樣就避免null值都分發到一個Reduce Task上。程式碼範例如下。

      SELECT * FROM log a LEFT OUTER 
      JOIN bmw_users b ON 
      CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id=b.user_id END;
    • 如果大表和小表進行JOIN操作時,出現熱點,則使用MAP JOIN。

記憶體參數

您可以通過設定以下參數,對Map和Reduce階段的記憶體進行調優:

  • Map階段

    參數

    描述

    樣本

    mapreduce.map.java.opts

    預設參數,表示JVM堆記憶體。

    -Xmx2048m

    mapreduce.map.memory.mb

    預設參數,表示整個JVM進程佔用的記憶體,計算方法為堆記憶體+堆外記憶體=2048+256

    2304

  • Reduce階段

    參數

    描述

    樣本

    mapreduce.reduce.java.opts

    預設參數,表示JVM堆記憶體。

    -Xmx2048m

    mapreduce.reduce.memory.mb

    預設參數,表示整個JVM進程佔用的記憶體,計算方法為堆記憶體+堆外記憶體=2048+256

    2304

CPU參數

您可以通過設定以下參數,對Map和Reduce任務的CPU進行調優。

參數

描述

mapreduce.map.cpu.vcores

每個Map任務可用的最多的CPU Core數目。

mapreduce.reduce.cpu.vcores

每個Reduce任務可用的最多的CPU Core數目。

說明

此設定在公平隊列是不生效的,通常vCores用於較大的叢集,以限制不同使用者或應用程式的CPU。

Task數量最佳化

  • Map Task數量最佳化

    在分散式運算系統中,Map數量的決定因素之一是未經處理資料塊的數量,因為通常每個Task會讀取一個資料區塊;然而,這不是絕對的,當檔案數量多且檔案較小時,可以減少初始Map Task的數量以節約資源,而當檔案數量少但檔案較大時,可以增加Map Task的數量以減輕單個Task的負擔。

    通常,Map Task數量是由mapred.map.tasksmapred.min.split.sizedfs.block.size決定的。

    • Hive的檔案基本上都是儲存在HDFS上,而HDFS上的檔案,都是分塊的,所以具體的Hive資料檔案在HDFS上分多少塊,可能對應的是預設Hive起始的Task的數量,使用default_mapper_num參數表示。使用資料總大小除以dfs預設的最大塊大小來決定初始預設資料分區數。

      初始預設的Map Task數量,具體公式如下。

      default_mapper_num = total_size/dfs.block.size
    • 計算Split的size,具體公式如下。

      default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))

      上述公式中的mapred.min.split.sizemapred.max.split.size,分別為Hive計算的時Split的最小值和最大值。

    • 將資料按照計算出來的size劃分為資料區塊,具體公式如下。

      split_num = total_size/default_split_size;
    • 計算的Map Task數量,具體公式如下。

      map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))

      從上面的過程來看,Task的數量受各個方面的限制,不至於Task的數量太多,也不至於Task的數量太少。如果需要提高Task數量,就要降低mapred.min.split.size的數值,在一定的範圍內可以減小default_split_size的數值,從而增加split_num的數量,也可以增大mapred.map.tasks的數量。

      重要

      Hive on TEZ和Hive on MR使用是有差異的。例如,在Hive中執行一個Query時,可以發現Hive的執行引擎在使用Tez與MR時,兩者產生的mapper數量差異較大。主要原因在於Tez中對inputSplit做了grouping操作,可以將多個inputSplit組合成更少的groups,然後為每個group產生一個mapper任務,而不是為每個inputSplit產生一個mapper任務。

  • Reduce Task數量最佳化

    • 通過hive.exec.reducers.bytes.per.reducer參數控制單個Reduce處理的位元組數。

      Reduce的計算方法如下。

      reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, hive.exec.reducers.max)。
    • 通過mapred.reduce.tasks參數來設定Reduce Task的數量。

      說明

      在TEZ引擎模式下,通過命令set hive.tez.auto.reducer.parallelism = true;,TEZ將會根據vertice的輸出大小動態預估調整Reduce Task的數量。

      同Map一樣,啟動和初始化Reduce也會消耗時間和資源。另外,有多少個Reduce,就會有多少個輸出檔案,如果產生了很多個小檔案,並且這些小檔案作為下一個任務的輸入,則會出現小檔案過多的問題。

並行運行

並行運行表示同步執行Hive的多個階段。Hive在執行過程中,將一個查詢轉化成一個或者多個階段。某個特定的Job可能包含多個階段,而這些階段可能並非完全相互依賴的,也就是可以並行啟動並執行,這樣可以使得整個Job的已耗用時間縮短。

您可以通過設定以下參數,控制不同的作業是否可以同時運行。

參數

描述

hive.exec.parallel

預設值為false。設定true時,表示允許任務並行運行。

hive.exec.parallel.thread.number

預設值為8。表示允許同時運行線程的最大值。

Fetch task

您可以通過設定以下參數,在執行查詢等語句時,不執行MapReduce程式,以減少等待時間。

參數

描述

hive.fetch.task.conversion

預設值為none。參數取值如下:

  • none:關閉Fetch task最佳化。

    在執行語句時,執行MapReduce程式。

  • minimal:只在SELECT、FILTER和LIMIT的語句上進行最佳化。

  • more:在minimal的基礎上更強大,SELECT不僅僅是查看,還可以單獨選擇列,FILTER也不再局限於分區欄位,同時支援虛擬列(別名)。

開啟向量化

您可以通過設定以下參數,在執行查詢時啟用向量化執行,以提升查詢效能。

參數

描述

hive.vectorized.execution.enabled

預設值為true。開啟向量化查詢的開關。

hive.vectorized.execution.reduce.enabled

預設值為true。表示是否啟用Reduce任務的向量化執行模式。

合并小檔案

大量小檔案容易在檔案儲存體端造成瓶頸,影響處理效率。對此,您可以通過合并Map和Reduce的結果檔案來處理。

您可以通過設定以下參數,合并小檔案。

參數

描述

hive.merge.mapfiles

預設值為true。表示是否合并Map輸出檔案。

hive.merge.mapredfiles

預設值為false。表示是否合并Reduce輸出檔案。

hive.merge.size.per.task

預設值為256000000,單位位元組。表示合并檔案的大小。

配置樣本

SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB