すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Hive ジョブの最適化

最終更新日:Mar 01, 2026

E-MapReduce (EMR) 上で Hive ジョブを最適化するには、コードパターンを調整し、メモリ、vCPU、タスク数パラメーターを調整します。

ジョブ最適化ソリューション

カテゴリ

最適化

コードの最適化

データクレンジング、複数 DISTINCT の置き換え、データスキューの処理

パラメーターチューニング

メモリ、vCPU、タスク数、並列実行、フェッチタスク、ベクトル化クエリ実行、小さいファイルの結合

コードの最適化

データのクレンジング

Hive がコストのかかる操作を実行する前に、処理するデータ量を削減します。

  1. パーティションテーブルから読み込む際に、パーティションでフィルタリングします。これにより、全表スキャンを回避できます。

  2. JOIN 操作の前にデータをフィルタリングし、後にはフィルタリングしません。

  3. 繰り返し使用される一時的な中間結果は中間テーブルに保存し、冗長な計算を回避します。

複数 DISTINCT オペレーターの置き換え

単一クエリ内の複数 DISTINCT オペレーターは、データ拡張を引き起こす可能性があります。これらを2つの GROUP BY 句に置き換えます。内部 GROUP BY で重複排除し、外部 GROUP BY で集約します。

最適化前

SELECT k,
       COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1,
       COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2,
       COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3,
       COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k;

最適化後

内部クエリは (k, user_id) でグループ化して重複排除し、データ量を削減します。外部クエリは k でグループ化して最終的なカウントを生成します。

SELECT k,
       SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1,
       SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2,
       SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3,
       SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4
FROM
        (SELECT k,
                user_id,
                COUNT(CASE WHEN a > 1 THEN user_id END) user1,
                COUNT(CASE WHEN a > 2 THEN user_id END) user2,
                COUNT(CASE WHEN a > 3 THEN user_id END) user3,
                COUNT(CASE WHEN a > 4 THEN user_id END) user4
        FROM t
        GROUP BY k, user_id
        ) tmp
GROUP BY k;

GROUP BY 操作におけるデータスキューの処理

スキューしたキーを処理するには、以下のいずれかの方法を使用します。

GROUP BY におけるスキューしたキー

  1. マップステージでの集約を有効にして、リデュースステージに転送されるデータを削減します。

       set hive.map.aggr=true;
       set hive.groupby.mapaggr.checkinterval=100000;  -- マップステージで集約されるエントリ数
  2. キーをランダムに分散し、複数パスで集約します。hive.groupby.skewindatatrue に設定すると、2つの MapReduce ジョブが生成されます。

    • 最初のジョブ: マップ出力はリデュースタスクにランダムに分散され、初期集約が行われます。同じ GROUP BY キーを持つデータエントリは、異なるリデュースタスクに送られる可能性があり、負荷分散が実現されます。

    • 2番目のジョブ: 中間結果はキーによってリデュースタスクに分散されるため、同じ GROUP BY キーを持つデータエントリは最終集約のために同じリデュースタスクに送られます。

       set hive.groupby.skewindata=true;

2つの大きなテーブルを結合する際のスキューしたキー

null またはスキューした値をランダム化して、異なるリデュースタスクに分散させます。たとえば、log テーブルに user_id の null 値が多く含まれており、bmw_users テーブルには含まれていない場合:

SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id=b.user_id END;

小さいテーブルと大きなテーブルを結合する際のスキューしたキー

MAP JOIN を使用して小さいテーブルをすべてのマップタスクにブロードキャストし、スキューが発生しやすいリデュースステージを回避します。

メモリパラメーターのチューニング

メモリ設定は、各マップまたはリデュースタスクが受け取るヒープメモリとプロセスメモリの量を制御します。メモリ不足はメモリ不足エラーを引き起こし、過剰なメモリはクラスターリソースを浪費します。

マップステージメモリ

パラメーター説明
mapreduce.map.java.opts必須。マップタスク用の JVM ヒープメモリ。-Xmx2048m
mapreduce.map.memory.mb必須。JVM プロセスメモリの合計 (ヒープ + 非ヒープ)。計算式: ヒープメモリ + 非ヒープメモリ。例: 2048 + 256。2304

リデュースステージメモリ

パラメーター説明
mapreduce.reduce.java.opts必須。リデュースタスク用の JVM ヒープメモリ。-Xmx2048m
mapreduce.reduce.memory.mb必須。JVM プロセスメモリの合計 (ヒープ + 非ヒープ)。計算式: ヒープメモリ + 非ヒープメモリ。例: 2048 + 256。2304

vCPU パラメーターのチューニング

パラメーター説明
mapreduce.map.cpu.vcoresマップタスクあたりの最大 vCPU 数。
mapreduce.reduce.cpu.vcoresリデュースタスクあたりの最大 vCPU 数。
説明

mapreduce.reduce.cpu.vcores はフェアキューイングシナリオでは効果がありません。このパラメーターは、主に大規模クラスターでユーザーまたはアプリケーションごとの vCPU 使用量を制限するために使用されます。

タスク数の調整

マップタスク

Hadoop 分散ファイルシステム (HDFS) では、各ファイルはデータブロックとして保存されます。データブロック数は、実行されるマップタスク数を決定する要因の1つです。ほとんどの場合、各マップタスクは1つのデータブロックを読み込みます。

  • 小さなファイルが多すぎる場合: マップタスク数を削減して、リソース利用率を向上させます。

  • 大きなファイルが少ない場合: マップタスク数を増加させて、タスクごとのワークロードを削減します。

主要パラメーターは mapred.map.tasksmapred.min.split.size、および dfs.block.size です。

マップタスク数の計算方法

  1. デフォルトマッパー数。 総データサイズを HDFS ブロックサイズで割った値:

       default_mapper_num = total_size / dfs.block.size
  2. デフォルト分割サイズ。 最小および最大分割サイズ設定によって決定されます。mapred.min.split.size は最小分割サイズであり、mapred.max.split.size は Hive ジョブの最大分割サイズです。

       default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
  3. 分割数。 総データサイズをデフォルト分割サイズで割った値:

       split_num = total_size / default_split_size
  4. 最終マップタスク数:

       map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))

マップタスク数を増やすには、mapred.min.split.size を削減 (これにより default_split_size が下がり、split_num が上がります) するか、mapred.map.tasks を増加させます。

重要

Hive on Tez と Hive on MapReduce は異なる計算メカニズムを使用します。同じデータに対する同じクエリでも、両方のエンジンは著しく異なるマップタスク数を生成します。Tez は入力分割をグループに結合し、入力分割ごとに1つのマップタスクではなく、グループごとに1つのマップタスクを生成します。

リデュースタスク

リデュースタスク数を制御する2つのアプローチがあります。

  • リデューサーあたりのバイト数を設定。 hive.exec.reducers.bytes.per.reducer を使用して、Hive にカウントを自動的に計算させます。

      reducer_num = min(total_size / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)
  • カウントを直接設定。 mapred.reduce.tasks を使用して、明示的なリデュースタスク数を指定します。

説明

Tez エンジンでは、自動リデューサー並列処理を有効にして、Tez が頂点出力サイズに基づいてリデュースタスク数を動的に調整できるようにします: set hive.tez.auto.reducer.parallelism = true;

リデュースタスクの開始と初期化には時間とリソースを消費します。各リデュースタスクは1つの出力ファイルを生成します。過剰なリデュースタスク数は多くの小さいファイルにつながり、これらのファイルがダウンストリームタスクの入力になるとカスケードする可能性があります。

ステージの並列実行

Hive はクエリを1つ以上のステージに変換します。ステージが互いに独立している場合、それらを並列実行することで、全体のジョブ実行時間を短縮できます。

パラメーターデフォルト説明
hive.exec.parallelfalse独立したステージを並列実行するには、true に設定します。
hive.exec.parallel.thread.number8並列実行されるスレッドの最大数。

クエリのフェッチタスクへの変換

フェッチタスクは、MapReduce ジョブを起動せずにクエリ結果を直接返し、実行時間を短縮します。

パラメーターデフォルト説明
hive.fetch.task.conversionnoneどのクエリをフェッチタスクに変換するかを制御します。有効な値: noneminimalmore

有効な値:

  • none: クエリはフェッチタスクに変換されません。すべてのステートメントは MapReduce を起動します。

  • minimal: SELECT、FILTER、および LIMIT ステートメントのみがフェッチタスクを使用します。

  • more: minimal の範囲に加えて、指定されたカラムでの SELECT、非パーティションキー列での FILTER、および仮想カラム (エイリアス) をサポートします。

ベクトル化クエリ実行の有効化

ベクトル化クエリ実行は、一度に1行ではなく行のバッチでデータを処理し、クエリパフォーマンスを向上させます。

パラメーターデフォルト説明
hive.vectorized.execution.enabledtrueベクトル化クエリ実行を有効にします。
hive.vectorized.execution.reduce.enabledtrueリデュースタスクのベクトル化クエリ実行を有効にします。

小さいファイルの結合

多数の小さい出力ファイルは、ストレージパフォーマンスとデータ処理効率を低下させます。マップおよびリデュースタスクの出力ファイルを結合して、総ファイル数を削減します。

パラメーターデフォルト説明
hive.merge.mapfilestrueマップタスクの出力ファイルを結合します。
hive.merge.mapredfilesfalseリデュースタスクの出力ファイルを結合します。
hive.merge.size.per.task256000000 (bytes)各結合ファイルのターゲットサイズ。

例:

SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB