メモリ、vCPU リソース、およびタスク数を調整して、Hive ジョブを最適化できます。このトピックでは、Hive ジョブを最適化する方法について説明します。
最適化ソリューション
カテゴリ | 最適化ソリューション |
コードの最適化 | |
パラメーターの変更 |
コードの最適化
データのクレンジング
パーティション テーブルからデータを読み取る場合は、パーティションでデータをフィルタリングします。これにより、全表スキャンが回避されます。
JOIN 操作を実行する前にデータをフィルタリングします。
繰り返し計算を避けるため、中間テーブルを作成して、繰り返し使用される一時的な計算結果を格納します。
複数の DISTINCT オペレーターを含むコードの最適化
最適化前のコード
複数の DISTINCT オペレーターを使用すると、データの拡張が発生する可能性があります。
SELECT k, COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1, COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2, COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3, COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4 FROM t GROUP BY k;/*複数の DISTINCT オペレーターを使用すると、データの拡張が発生する可能性があります。*/最適化後のコード
2 つの GROUP BY 句を使用して DISTINCT オペレーターを置き換えます。内部クエリで 1 つの GROUP BY 句を使用してデータを重複排除し、データ量を削減します。外部クエリで別の GROUP BY 句を使用して合計を取得します。
SELECT k, SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1, SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2, SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3, SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4 FROM (SELECT k, user_id, COUNT(CASE WHEN a > 1 THEN user_id END) user1, COUNT(CASE WHEN a > 2 THEN user_id END) user2, COUNT(CASE WHEN a > 3 THEN user_id END) user3, COUNT(CASE WHEN a > 4 THEN user_id END) user4 FROM t GROUP BY k, user_id /*内部クエリで GROUP BY 句を使用してデータを重複排除し、データ量を削減します。*/ ) tmp GROUP BY k;/*外部クエリで GROUP BY 句を使用して合計を取得します。*/
GROUP BY 操作でのデータ スキューの処理
次のいずれかの方法を使用して、スキュー キーを処理します。
GROUP BY 操作でスキュー キーが表示される場合は、次の操作を実行します。
マップ ステージでの集約の有効化: Hive パラメーターを構成してマップ ステージでの集約を有効にし、reduce ステージに転送できるデータ量を削減します。
set hive.map.aggr=true; set hive.groupby.mapaggr.checkinterval=100000; --マップ ステージで集約されるデータ エントリの数。/*マップ ステージでの集約を有効にし、reduce ステージに転送できるデータ量を削減します。*/キーをランダムに分散し、複数回集約します。または、reducer を直接指定します。
set hive.groupby.skewindata=true;/*キーをランダムに分散し、複数回集約します。*/hive.groupby.skewindata を true に設定すると、生成されるクエリ プランには 2 つの MapReduce ジョブが含まれます。最初の MapReduce ジョブでは、map タスクの出力結果がランダムに reduce タスクに分散されます。次に、reduce タスクはデータを集約し、出力を生成します。これにより、GROUP BY 操作で同じキーを持つデータ エントリが異なる reduce タスクに分散され、負荷分散が実現されます。2 番目の MapReduce ジョブでは、前処理の結果がキーに基づいて reduce タスクに分散されます。これにより、GROUP BY 操作で同じキーを持つデータ エントリが同じ reduce タスクに分散され、最終的な集約結果が生成されます。
2 つの大きなテーブルを結合するときにスキュー キーが表示される場合は、スキュー キーをランダム化します。
たとえば、log と bmw_users という名前のテーブルが結合されているとします。テーブル log の user_id 列には多数の null 値が含まれており、テーブル bmw_users の user_id 列には null 値がありません。この場合、値を関連付ける前に null 値をランダム化できます。これにより、null 値が異なる reduce タスクに分散されます。例:
SELECT * FROM log a LEFT OUTER JOIN bmw_users b ON CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id=b.user_id END;/*値を関連付ける前に null 値をランダム化できます。これにより、null 値が異なる reduce タスクに分散されます。*/小さなテーブルと大きなテーブルを結合するときに GROUP BY 操作でスキュー キーが表示される場合は、MAP JOIN を使用します。
メモリ パラメーターの変更
次の表に示すパラメーターを変更して、map ステージと reduce ステージのメモリ リソースを最適化できます。
Map ステージ
パラメーター
説明
例
mapreduce.map.java.opts
必須。Java 仮想マシン ( JVM ) ヒープ メモリ。
-Xmx2048m
mapreduce.map.memory.mb
必須。 JVM プロセスによって占有されるメモリ空間。次の式を使用して値を計算できます。ヒープ メモリ + 非ヒープ メモリ。例: 2048 + 256。
2304
Reduce ステージ
パラメーター
説明
例
mapreduce.reduce.java.opts
必須。 JVM ヒープ メモリ。
-Xmx2048m
mapreduce.reduce.memory.mb
必須。 JVM プロセスによって占有されるメモリ空間。次の式を使用して値を計算できます。ヒープ メモリ + 非ヒープ メモリ。例: 2048 + 256。
2304
vCPU パラメーターの変更
次の表に示すパラメーターを変更して、map ステージと reduce ステージの vCPU リソースを最適化できます。
パラメーター | 説明 |
mapreduce.map.cpu.vcores | 各 map タスクで使用できる vCPU の最大数。 |
mapreduce.reduce.cpu.vcores | 各 reduce タスクで使用できる vCPU の最大数。 説明 このパラメーターは、フェア キューイング シナリオでは有効になりません。ほとんどの場合、このパラメーターは、大規模クラスターでユーザーまたは アプリケーション が使用できる vCPU の数を制限するために使用されます。 |
タスク数の調整
map タスク数の調整
分散コンピューティング システム では、生データのデータ ブロックの数が、map タスクの数を決定する要因の 1 つです。ほとんどの場合、各 map タスクは 1 つのデータ ブロックを読み取ります。ただし、一部の シナリオ では手動調整が必要です。多数の小さなファイルが存在する場合は、map タスクの数を減らしてリソース使用率を向上させることができます。ファイルの数が少ないが各ファイルのサイズが大きい場合は、map タスクの数を増やして各 map タスクの ワークロード を削減できます。
map タスクの数を決定するパラメーターは、mapred.map.tasks、mapred.min.split.size、および dfs.block.size です。
ほとんどの Hive ファイルは HDFS に格納され、HDFS のすべてのファイルはデータ ブロックとして格納されます。したがって、Hive ファイルが分割されるデータ ブロックの数は、Hive ファイルの初期 map タスクの デフォルト の数と同じになる場合があります。default_mapper_num パラメーターは、初期 map タスクの デフォルト の数を指定します。合計データ サイズを HDFS の各データ ブロックの デフォルト の最大サイズで割ると、default_mapper_num パラメーターの値になります。
式:
default_mapper_num = total_size/dfs.block.sizeシステム は、次の式に基づいて デフォルト の分割サイズを計算します。
default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))上記の式で、mapred.min.split.size は Hive ジョブの最小分割サイズを指定し、mapred.max.split.size は Hive ジョブの最大分割サイズを指定します。
システム は、次の式に基づいてデータをデータ ブロックに分割します。
split_num = total_size/default_split_size;システム は、次の式に基づいて map タスクの数を計算します。
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))この複雑な計算プロセスでは、さまざまな要因を使用して、map タスクの数が非常に大きくも小さくもならないようにします。map タスクの数を増やす場合は、mapred.min.split.size の値を小さくすることができます。これにより、default_split_size の値が小さくなり、さらに split_num の値が大きくなります。mapred.map.tasks の値を大きくすることもできます。
重要Hive on Tez ジョブと Hive on MapReduce ジョブでは、計算メカニズムが異なります。たとえば、Hive on Tez ジョブと Hive on MapReduce ジョブを使用して同じデータに対して同じクエリを実行した場合、ジョブによって生成される map タスクの数は大きく異なります。主な理由は、Tez が入力分割をグループに結合し、各入力分割ではなく各グループに対して 1 つの map タスクを生成するためです。
reduce タスク数の調整
hive.exec.reducers.bytes.per.reducer パラメーターを使用して、各 reduce タスクで処理されるバイト数を決定します。
システム は、次の式に基づいて reduce タスクの数を計算します。
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, hive.exec.reducers.max)mapred.reduce.tasks パラメーターを使用して、reduce タスクの数を指定します。
説明Tez エンジンを使用する場合は、
set hive.tez.auto.reducer.parallelism = true;コマンドを実行して、自動 reducer 並列処理を有効にすることができます。これにより、Tez は頂点の出力サイズに基づいて reduce タスクの数を動的に調整します。reduce タスクの開始と初期化の操作は、map タスクの開始と初期化の操作と同様に、時間とリソースを消費します。さらに、各 reduce タスクは 1 つの出力ファイルを生成します。多数の小さなファイルが生成され、他のタスクの入力として使用されると、過剰な数の小さなファイルがさらに生成されます。
異なるステージのタスクを並列実行する
ジョブの異なるステージのタスクを並列実行できます。Hive はクエリを 1 つ以上のステージに変換します。ジョブには複数のステージが含まれている場合があり、ステージは互いに独立している可能性があります。異なるステージの独立したタスクが並列実行される場合、ジョブの実行に使用される全体的な時間が短縮されます。
次の表に示すパラメーターを構成して、タスクの並列実行を有効にし、並列実行できるタスクの最大数を指定できます。
パラメーター | 説明 |
hive.exec.parallel | デフォルト値: false。hive.exec.parallel パラメーターを true に設定すると、異なるステージの独立したタスクを並列実行できます。 |
hive.exec.parallel.thread.number | デフォルト値: 8。並列実行できるスレッドの最大数。 |
フェッチ タスク
hive.fetch.task.conversion パラメーターを構成して、Hive クエリをフェッチ タスクに変換できます。これは、MapReduce プログラムの起動のオーバーヘッドを防ぎ、 文 の実行時間を短縮するのに役立ちます。
パラメーター | 説明 |
hive.fetch.task.conversion | デフォルト値: none。有効な値:
|
ベクトル化クエリ実行の有効化
次の表に示すパラメーターを構成して、ベクトル化クエリ実行を有効にすることができます。これは、クエリ パフォーマンスの向上に役立ちます。
パラメーター | 説明 |
hive.vectorized.execution.enabled | デフォルト値: true。値 true は、ベクトル化クエリ実行が有効になっていることを示します。 |
hive.vectorized.execution.reduce.enabled | デフォルト値: true。値 true は、reduce タスクに対してベクトル化クエリ実行が有効になっていることを示します。 |
小さなファイルのマージ
多数の小さなファイルが生成されると、ファイル ストレージ パフォーマンスとデータ処理効率に影響します。map タスクと reduce タスクの出力ファイルをマージして、小さなファイルの数を減らすことができます。
次の表に示すパラメーターを構成して、小さなファイルをマージできます。
パラメーター | 説明 |
hive.merge.mapfiles | デフォルト値: true。map タスクの出力ファイルをマージするかどうかを指定します。 |
hive.merge.mapredfiles | デフォルト値: false。reduce タスクの出力ファイルをマージするかどうかを指定します。 |
hive.merge.size.per.task | デフォルト値: 256000000。単位: バイト。小さなファイルがマージされるファイルのサイズ。 |
構成例
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB/*小さなファイルをマージする例*/