E-MapReduce (EMR) 上で Hive ジョブを最適化するには、コードパターンを調整し、メモリ、vCPU、タスク数パラメーターを調整します。
ジョブ最適化ソリューション
カテゴリ | 最適化 |
コードの最適化 | |
パラメーターチューニング | メモリ、vCPU、タスク数、並列実行、フェッチタスク、ベクトル化クエリ実行、小さいファイルの結合 |
コードの最適化
データのクレンジング
Hive がコストのかかる操作を実行する前に、処理するデータ量を削減します。
パーティションテーブルから読み込む際に、パーティションでフィルタリングします。これにより、全表スキャンを回避できます。
JOIN 操作の前にデータをフィルタリングし、後にはフィルタリングしません。
繰り返し使用される一時的な中間結果は中間テーブルに保存し、冗長な計算を回避します。
複数 DISTINCT オペレーターの置き換え
単一クエリ内の複数 DISTINCT オペレーターは、データ拡張を引き起こす可能性があります。これらを2つの GROUP BY 句に置き換えます。内部 GROUP BY で重複排除し、外部 GROUP BY で集約します。
最適化前
SELECT k,
COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1,
COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2,
COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3,
COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k;最適化後
内部クエリは (k, user_id) でグループ化して重複排除し、データ量を削減します。外部クエリは k でグループ化して最終的なカウントを生成します。
SELECT k,
SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1,
SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2,
SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3,
SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4
FROM
(SELECT k,
user_id,
COUNT(CASE WHEN a > 1 THEN user_id END) user1,
COUNT(CASE WHEN a > 2 THEN user_id END) user2,
COUNT(CASE WHEN a > 3 THEN user_id END) user3,
COUNT(CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k, user_id
) tmp
GROUP BY k;GROUP BY 操作におけるデータスキューの処理
スキューしたキーを処理するには、以下のいずれかの方法を使用します。
GROUP BY におけるスキューしたキー
マップステージでの集約を有効にして、リデュースステージに転送されるデータを削減します。
set hive.map.aggr=true; set hive.groupby.mapaggr.checkinterval=100000; -- マップステージで集約されるエントリ数キーをランダムに分散し、複数パスで集約します。
hive.groupby.skewindataをtrueに設定すると、2つの MapReduce ジョブが生成されます。最初のジョブ: マップ出力はリデュースタスクにランダムに分散され、初期集約が行われます。同じ GROUP BY キーを持つデータエントリは、異なるリデュースタスクに送られる可能性があり、負荷分散が実現されます。
2番目のジョブ: 中間結果はキーによってリデュースタスクに分散されるため、同じ GROUP BY キーを持つデータエントリは最終集約のために同じリデュースタスクに送られます。
set hive.groupby.skewindata=true;
2つの大きなテーブルを結合する際のスキューしたキー
null またはスキューした値をランダム化して、異なるリデュースタスクに分散させます。たとえば、log テーブルに user_id の null 値が多く含まれており、bmw_users テーブルには含まれていない場合:
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id=b.user_id END;小さいテーブルと大きなテーブルを結合する際のスキューしたキー
MAP JOIN を使用して小さいテーブルをすべてのマップタスクにブロードキャストし、スキューが発生しやすいリデュースステージを回避します。
メモリパラメーターのチューニング
メモリ設定は、各マップまたはリデュースタスクが受け取るヒープメモリとプロセスメモリの量を制御します。メモリ不足はメモリ不足エラーを引き起こし、過剰なメモリはクラスターリソースを浪費します。
マップステージメモリ
| パラメーター | 説明 | 例 |
|---|---|---|
mapreduce.map.java.opts | 必須。マップタスク用の JVM ヒープメモリ。 | -Xmx2048m |
mapreduce.map.memory.mb | 必須。JVM プロセスメモリの合計 (ヒープ + 非ヒープ)。計算式: ヒープメモリ + 非ヒープメモリ。例: 2048 + 256。 | 2304 |
リデュースステージメモリ
| パラメーター | 説明 | 例 |
|---|---|---|
mapreduce.reduce.java.opts | 必須。リデュースタスク用の JVM ヒープメモリ。 | -Xmx2048m |
mapreduce.reduce.memory.mb | 必須。JVM プロセスメモリの合計 (ヒープ + 非ヒープ)。計算式: ヒープメモリ + 非ヒープメモリ。例: 2048 + 256。 | 2304 |
vCPU パラメーターのチューニング
| パラメーター | 説明 |
|---|---|
mapreduce.map.cpu.vcores | マップタスクあたりの最大 vCPU 数。 |
mapreduce.reduce.cpu.vcores | リデュースタスクあたりの最大 vCPU 数。 |
mapreduce.reduce.cpu.vcores はフェアキューイングシナリオでは効果がありません。このパラメーターは、主に大規模クラスターでユーザーまたはアプリケーションごとの vCPU 使用量を制限するために使用されます。
タスク数の調整
マップタスク
Hadoop 分散ファイルシステム (HDFS) では、各ファイルはデータブロックとして保存されます。データブロック数は、実行されるマップタスク数を決定する要因の1つです。ほとんどの場合、各マップタスクは1つのデータブロックを読み込みます。
小さなファイルが多すぎる場合: マップタスク数を削減して、リソース利用率を向上させます。
大きなファイルが少ない場合: マップタスク数を増加させて、タスクごとのワークロードを削減します。
主要パラメーターは mapred.map.tasks、mapred.min.split.size、および dfs.block.size です。
マップタスク数の計算方法
デフォルトマッパー数。 総データサイズを HDFS ブロックサイズで割った値:
default_mapper_num = total_size / dfs.block.sizeデフォルト分割サイズ。 最小および最大分割サイズ設定によって決定されます。
mapred.min.split.sizeは最小分割サイズであり、mapred.max.split.sizeは Hive ジョブの最大分割サイズです。default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))分割数。 総データサイズをデフォルト分割サイズで割った値:
split_num = total_size / default_split_size最終マップタスク数:
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
マップタスク数を増やすには、mapred.min.split.size を削減 (これにより default_split_size が下がり、split_num が上がります) するか、mapred.map.tasks を増加させます。
Hive on Tez と Hive on MapReduce は異なる計算メカニズムを使用します。同じデータに対する同じクエリでも、両方のエンジンは著しく異なるマップタスク数を生成します。Tez は入力分割をグループに結合し、入力分割ごとに1つのマップタスクではなく、グループごとに1つのマップタスクを生成します。
リデュースタスク
リデュースタスク数を制御する2つのアプローチがあります。
リデューサーあたりのバイト数を設定。
hive.exec.reducers.bytes.per.reducerを使用して、Hive にカウントを自動的に計算させます。reducer_num = min(total_size / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)カウントを直接設定。
mapred.reduce.tasksを使用して、明示的なリデュースタスク数を指定します。
Tez エンジンでは、自動リデューサー並列処理を有効にして、Tez が頂点出力サイズに基づいてリデュースタスク数を動的に調整できるようにします: set hive.tez.auto.reducer.parallelism = true;
リデュースタスクの開始と初期化には時間とリソースを消費します。各リデュースタスクは1つの出力ファイルを生成します。過剰なリデュースタスク数は多くの小さいファイルにつながり、これらのファイルがダウンストリームタスクの入力になるとカスケードする可能性があります。
ステージの並列実行
Hive はクエリを1つ以上のステージに変換します。ステージが互いに独立している場合、それらを並列実行することで、全体のジョブ実行時間を短縮できます。
| パラメーター | デフォルト | 説明 |
|---|---|---|
hive.exec.parallel | false | 独立したステージを並列実行するには、true に設定します。 |
hive.exec.parallel.thread.number | 8 | 並列実行されるスレッドの最大数。 |
クエリのフェッチタスクへの変換
フェッチタスクは、MapReduce ジョブを起動せずにクエリ結果を直接返し、実行時間を短縮します。
| パラメーター | デフォルト | 説明 |
|---|---|---|
hive.fetch.task.conversion | none | どのクエリをフェッチタスクに変換するかを制御します。有効な値: none、minimal、more。 |
有効な値:
none: クエリはフェッチタスクに変換されません。すべてのステートメントは MapReduce を起動します。
minimal: SELECT、FILTER、および LIMIT ステートメントのみがフェッチタスクを使用します。
more:
minimalの範囲に加えて、指定されたカラムでの SELECT、非パーティションキー列での FILTER、および仮想カラム (エイリアス) をサポートします。
ベクトル化クエリ実行の有効化
ベクトル化クエリ実行は、一度に1行ではなく行のバッチでデータを処理し、クエリパフォーマンスを向上させます。
| パラメーター | デフォルト | 説明 |
|---|---|---|
hive.vectorized.execution.enabled | true | ベクトル化クエリ実行を有効にします。 |
hive.vectorized.execution.reduce.enabled | true | リデュースタスクのベクトル化クエリ実行を有効にします。 |
小さいファイルの結合
多数の小さい出力ファイルは、ストレージパフォーマンスとデータ処理効率を低下させます。マップおよびリデュースタスクの出力ファイルを結合して、総ファイル数を削減します。
| パラメーター | デフォルト | 説明 |
|---|---|---|
hive.merge.mapfiles | true | マップタスクの出力ファイルを結合します。 |
hive.merge.mapredfiles | false | リデュースタスクの出力ファイルを結合します。 |
hive.merge.size.per.task | 256000000 (bytes) | 各結合ファイルのターゲットサイズ。 |
例:
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB