您可以通過調整記憶體、CPU和Task個數等,實現對Hive作業的調優。本文為您介紹如何調優Hive作業。
作業調優方案
作業調優方向 | 調優方案 |
代碼最佳化 | |
參數調優 |
代碼最佳化
資料清洗
讀取表時分區過濾,避免全表掃描。
資料過濾之後再JOIN。
重複使用資料時,避免重複計算,構建中間表,重複使用中間表。
多DISTINCT最佳化
最佳化前代碼
多個DISTINCT時,資料會出現膨脹。
SELECT k, COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1, COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2, COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3, COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4 FROM t GROUP BY k;最佳化後代碼
通過兩次GROUP BY的方式代替DISTINCT操作,通過內層的GROUP BY去重並降低資料量,通過外層的GROUP BY取SUM,即可實現DISTINCT的效果。
SELECT k, SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1, SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2, SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3, SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4 FROM (SELECT k, user_id, COUNT(CASE WHEN a > 1 THEN user_id END) user1, COUNT(CASE WHEN a > 2 THEN user_id END) user2, COUNT(CASE WHEN a > 3 THEN user_id END) user3, COUNT(CASE WHEN a > 4 THEN user_id END) user4 FROM t GROUP BY k, user_id ) tmp GROUP BY k;
GROUP BY 操作中的資料扭曲
熱點Key處理方式如下:
當執行GROUP BY操作時如果發現存在熱點索引值(即某些索引值對應的記錄數遠超其他),可以通過以下步驟來最佳化:
啟用Map端彙總:首先,通過設定Hive參數開啟Map階段的部分彙總功能,減少傳輸到Reduce階段的資料量。
set hive.map.aggr=true; set hive.groupby.mapaggr.checkinterval=100000; -- 用於設定Map端進行彙總操作的條目數可以對Key隨機化打散,多次彙總,或者直接設定。
set hive.groupby.skewindata=true;當選項設定為true時,產生的查詢計劃有兩個MapReduce任務。在第一個MapReduce中,Map的輸出結果集合會隨機分布到Reduce中, 每個部分進行彙總操作,並輸出結果。這樣處理的結果是,相同的Group By Key有可能分發到不同的Reduce中,從而達到負載平衡的目的;第二個MapReduce任務再根據前置處理過的資料結果按照Group By Key分布到Reduce中(這個過程可以保證相同的Group By Key分布到同一個Reduce中),最後完成最終的彙總操作。
如果兩個大表進行JOIN操作時,出現熱點,則使用熱點Key隨機化。
例如,log表存在大量user_id為null的記錄,但是表bmw_users中不會存在user_id為空白,則可以把null隨機化再關聯,這樣就避免null值都分發到一個Reduce Task上。程式碼範例如下。
SELECT * FROM log a LEFT OUTER JOIN bmw_users b ON CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id=b.user_id END;如果大表和小表進行JOIN操作時,出現熱點,則使用MAP JOIN。
記憶體參數
您可以通過設定以下參數,對Map和Reduce階段的記憶體進行調優:
Map階段
參數
描述
樣本
mapreduce.map.java.opts
預設參數,表示JVM堆記憶體。
-Xmx2048m
mapreduce.map.memory.mb
預設參數,表示整個JVM進程佔用的記憶體,計算方法為
堆記憶體+堆外記憶體=2048+256。2304
Reduce階段
參數
描述
樣本
mapreduce.reduce.java.opts
預設參數,表示JVM堆記憶體。
-Xmx2048m
mapreduce.reduce.memory.mb
預設參數,表示整個JVM進程佔用的記憶體,計算方法為
堆記憶體+堆外記憶體=2048+256。2304
CPU參數
您可以通過設定以下參數,對Map和Reduce任務的CPU進行調優。
參數 | 描述 |
mapreduce.map.cpu.vcores | 每個Map任務可用的最多的CPU Core數目。 |
mapreduce.reduce.cpu.vcores | 每個Reduce任務可用的最多的CPU Core數目。 說明 此設定在公平隊列是不生效的,通常vCores用於較大的叢集,以限制不同使用者或應用程式的CPU。 |
Task數量最佳化
Map Task數量最佳化
在分散式運算系統中,Map數量的決定因素之一是未經處理資料塊的數量,因為通常每個Task會讀取一個資料區塊;然而,這不是絕對的,當檔案數量多且檔案較小時,可以減少初始Map Task的數量以節約資源,而當檔案數量少但檔案較大時,可以增加Map Task的數量以減輕單個Task的負擔。
通常,Map Task數量是由mapred.map.tasks、mapred.min.split.size和dfs.block.size決定的。
Hive的檔案基本上都是儲存在HDFS上,而HDFS上的檔案,都是分塊的,所以具體的Hive資料檔案在HDFS上分多少塊,可能對應的是預設Hive起始的Task的數量,使用default_mapper_num參數表示。使用資料總大小除以dfs預設的最大塊大小來決定初始預設資料分區數。
初始預設的Map Task數量,具體公式如下。
default_mapper_num = total_size/dfs.block.size計算Split的size,具體公式如下。
default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))上述公式中的mapred.min.split.size和mapred.max.split.size,分別為Hive計算的時Split的最小值和最大值。
將資料按照計算出來的size劃分為資料區塊,具體公式如下。
split_num = total_size/default_split_size;計算的Map Task數量,具體公式如下。
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))從上面的過程來看,Task的數量受各個方面的限制,不至於Task的數量太多,也不至於Task的數量太少。如果需要提高Task數量,就要降低mapred.min.split.size的數值,在一定的範圍內可以減小default_split_size的數值,從而增加split_num的數量,也可以增大mapred.map.tasks的數量。
重要Hive on TEZ和Hive on MR使用是有差異的。例如,在Hive中執行一個Query時,可以發現Hive的執行引擎在使用Tez與MR時,兩者產生的mapper數量差異較大。主要原因在於Tez中對inputSplit做了grouping操作,可以將多個inputSplit組合成更少的groups,然後為每個group產生一個mapper任務,而不是為每個inputSplit產生一個mapper任務。
Reduce Task數量最佳化
通過hive.exec.reducers.bytes.per.reducer參數控制單個Reduce處理的位元組數。
Reduce的計算方法如下。
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, hive.exec.reducers.max)。通過mapred.reduce.tasks參數來設定Reduce Task的數量。
說明在TEZ引擎模式下,通過命令
set hive.tez.auto.reducer.parallelism = true;,TEZ將會根據vertice的輸出大小動態預估調整Reduce Task的數量。同Map一樣,啟動和初始化Reduce也會消耗時間和資源。另外,有多少個Reduce,就會有多少個輸出檔案,如果產生了很多個小檔案,並且這些小檔案作為下一個任務的輸入,則會出現小檔案過多的問題。
並行運行
並行運行表示同步執行Hive的多個階段。Hive在執行過程中,將一個查詢轉化成一個或者多個階段。某個特定的Job可能包含多個階段,而這些階段可能並非完全相互依賴的,也就是可以並行啟動並執行,這樣可以使得整個Job的已耗用時間縮短。
您可以通過設定以下參數,控制不同的作業是否可以同時運行。
參數 | 描述 |
hive.exec.parallel | 預設值為false。設定true時,表示允許任務並行運行。 |
hive.exec.parallel.thread.number | 預設值為8。表示允許同時運行線程的最大值。 |
Fetch task
您可以通過設定以下參數,在執行查詢等語句時,不執行MapReduce程式,以減少等待時間。
參數 | 描述 |
hive.fetch.task.conversion | 預設值為none。參數取值如下:
|
開啟向量化
您可以通過設定以下參數,在執行查詢時啟用向量化執行,以提升查詢效能。
參數 | 描述 |
hive.vectorized.execution.enabled | 預設值為true。開啟向量化查詢的開關。 |
hive.vectorized.execution.reduce.enabled | 預設值為true。表示是否啟用Reduce任務的向量化執行模式。 |
合并小檔案
大量小檔案容易在檔案儲存體端造成瓶頸,影響處理效率。對此,您可以通過合并Map和Reduce的結果檔案來處理。
您可以通過設定以下參數,合并小檔案。
參數 | 描述 |
hive.merge.mapfiles | 預設值為true。表示是否合并Map輸出檔案。 |
hive.merge.mapredfiles | 預設值為false。表示是否合并Reduce輸出檔案。 |
hive.merge.size.per.task | 預設值為256000000,單位位元組。表示合并檔案的大小。 |
配置樣本
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB