Optimalkan pekerjaan Hive pada E-MapReduce (EMR) dengan menyesuaikan pola kode serta mengatur parameter memori, vCPU, dan jumlah task.
Solusi optimasi pekerjaan
Kategori | Optimasi |
Optimasi kode | Pembersihan data, penggantian multiple DISTINCT, penanganan data skew |
Penyetelan parameter | Memori, vCPU, jumlah task, eksekusi paralel, fetch task, eksekusi kueri tervektorisasi, penggabungan file kecil |
Optimalkan kode
Bersihkan data
Kurangi volume data yang diproses oleh Hive sebelum menjalankan operasi yang mahal.
Filter berdasarkan partisi saat membaca dari tabel partisi untuk menghindari full table scan.
Filter data sebelum operasi JOIN, bukan sesudahnya.
Simpan hasil sementara yang sering digunakan dalam tabel antara untuk menghindari komputasi berulang.
Ganti operator DISTINCT ganda
Penggunaan operator DISTINCT ganda dalam satu kueri dapat menyebabkan ekspansi data. Gantilah dengan dua klausa GROUP BY: GROUP BY dalam untuk menghapus duplikat dan GROUP BY luar untuk agregasi.
Sebelum optimasi
SELECT k,
COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1,
COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2,
COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3,
COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k;Setelah optimasi
Kueri dalam melakukan grouping berdasarkan (k, user_id) untuk menghapus duplikat, sehingga mengurangi volume data. Kueri luar melakukan grouping berdasarkan k untuk menghasilkan jumlah akhir.
SELECT k,
SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1,
SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2,
SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3,
SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4
FROM
(SELECT k,
user_id,
COUNT(CASE WHEN a > 1 THEN user_id END) user1,
COUNT(CASE WHEN a > 2 THEN user_id END) user2,
COUNT(CASE WHEN a > 3 THEN user_id END) user3,
COUNT(CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k, user_id
) tmp
GROUP BY k;Tangani data skew dalam operasi GROUP BY
Gunakan salah satu metode berikut untuk menangani kunci yang miring (skewed).
Kunci yang tidak seimbang dalam GROUP BY
Aktifkan agregasi pada tahap map untuk mengurangi data yang ditransfer ke tahap reduce:
set hive.map.aggr=true; set hive.groupby.mapaggr.checkinterval=100000; -- Jumlah entri yang diagregasi di tahap mapDistribusikan kunci secara acak dan lakukan agregasi dalam beberapa tahap: Menyetel
hive.groupby.skewindataketrueakan menghasilkan dua pekerjaan MapReduce:Pekerjaan pertama: Output map didistribusikan secara acak ke task reduce untuk agregasi awal. Entri data dengan kunci GROUP BY yang sama dapat masuk ke task reduce yang berbeda, sehingga mencapai penyeimbangan beban.
Pekerjaan kedua: Hasil antara didistribusikan ke task reduce berdasarkan kunci, sehingga entri data dengan kunci GROUP BY yang sama masuk ke task reduce yang sama untuk agregasi akhir.
set hive.groupby.skewindata=true;
Kunci miring saat menggabungkan dua tabel besar
Acak nilai null atau nilai miring agar tersebar di berbagai task reduce. Misalnya, jika tabel log berisi banyak nilai null pada kolom user_id, sedangkan tabel bmw_users tidak:
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id=b.user_id END;Kunci miring saat menggabungkan tabel kecil dengan tabel besar
Gunakan MAP JOIN untuk menyiarkan (broadcast) tabel kecil ke semua task map, sehingga menghindari tahap reduce yang rentan terhadap kemiringan.
Atur parameter memori
Pengaturan memori mengontrol jumlah memori heap dan proses yang diterima setiap task map atau reduce. Memori yang tidak mencukupi menyebabkan error out-of-memory; memori berlebihan membuang resource kluster.
Memori tahap map
| Parameter | Deskripsi | Contoh |
|---|---|---|
mapreduce.map.java.opts | Wajib. Memori heap JVM untuk task map. | -Xmx2048m |
mapreduce.map.memory.mb | Wajib. Total memori proses JVM (heap + non-heap). Rumus: memori heap + memori non-heap. Contoh: 2048 + 256. | 2304 |
Memori tahap reduce
| Parameter | Deskripsi | Contoh |
|---|---|---|
mapreduce.reduce.java.opts | Wajib. Memori heap JVM untuk task reduce. | -Xmx2048m |
mapreduce.reduce.memory.mb | Wajib. Total memori proses JVM (heap + non-heap). Rumus: memori heap + memori non-heap. Contoh: 2048 + 256. | 2304 |
Atur parameter vCPU
| Parameter | Deskripsi |
|---|---|
mapreduce.map.cpu.vcores | Jumlah maksimum vCPU per task map. |
mapreduce.reduce.cpu.vcores | Jumlah maksimum vCPU per task reduce. |
mapreduce.reduce.cpu.vcores tidak berlaku dalam skenario antrian fair (fair queuing). Parameter ini terutama digunakan untuk membatasi penggunaan vCPU per pengguna atau aplikasi dalam kluster besar.
Atur jumlah task
Tugas Pemetaan
Dalam Hadoop Distributed File System (HDFS), setiap file disimpan sebagai blok data. Jumlah blok data merupakan salah satu faktor yang menentukan berapa banyak task map yang dijalankan. Dalam kebanyakan kasus, setiap task map membaca satu blok data.
Terlalu banyak file kecil: Kurangi jumlah task map untuk meningkatkan pemanfaatan resource.
Sedikit file besar: Tingkatkan jumlah task map untuk mengurangi beban kerja per task.
Parameter utama adalah mapred.map.tasks, mapred.min.split.size, dan dfs.block.size.
Cara menghitung jumlah task map
Jumlah mapper default. Ukuran total data dibagi ukuran blok HDFS:
default_mapper_num = total_size / dfs.block.sizeUkuran split default. Ditentukan oleh pengaturan ukuran split minimum dan maksimum:
mapred.min.split.sizeadalah ukuran split minimum danmapred.max.split.sizeadalah ukuran split maksimum untuk pekerjaan Hive.default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))Jumlah split. Ukuran total data dibagi ukuran split default:
split_num = total_size / default_split_sizeJumlah akhir task map:
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
Untuk meningkatkan jumlah task map, kurangi mapred.min.split.size (yang menurunkan default_split_size dan meningkatkan split_num) atau tingkatkan mapred.map.tasks.
Hive on Tez dan Hive on MapReduce menggunakan mekanisme komputasi yang berbeda. Untuk kueri yang sama pada data yang sama, kedua engine menghasilkan jumlah task map yang sangat berbeda. Tez menggabungkan input split menjadi kelompok dan menghasilkan satu task map per kelompok, bukan satu task map per input split.
Mengurangi Tugas
Dua pendekatan mengontrol jumlah task reduce:
Tetapkan byte per reducer. Gunakan
hive.exec.reducers.bytes.per.reduceragar Hive menghitung jumlah secara otomatis:reducer_num = min(total_size / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)Tetapkan jumlah secara langsung. Gunakan
mapred.reduce.tasksuntuk menentukan jumlah eksplisit task reduce.
Dengan engine Tez, aktifkan auto reducer parallelism agar Tez secara dinamis menyesuaikan jumlah task reduce berdasarkan ukuran output vertex: set hive.tez.auto.reducer.parallelism = true;
Menjalankan dan menginisialisasi task reduce membutuhkan waktu dan resource. Setiap task reduce menghasilkan satu file output. Jumlah task reduce yang berlebihan menghasilkan banyak file kecil, yang dapat berdampak buruk ketika file tersebut menjadi input untuk task selanjutnya.
Jalankan tahapan secara paralel
Hive menerjemahkan kueri menjadi satu atau beberapa tahapan. Ketika tahapan tidak saling bergantung, menjalankannya secara paralel mengurangi waktu eksekusi keseluruhan pekerjaan.
| Parameter | Default | Deskripsi |
|---|---|---|
hive.exec.parallel | false | Setel ke true untuk menjalankan tahapan independen secara paralel. |
hive.exec.parallel.thread.number | 8 | Jumlah maksimum thread yang berjalan secara paralel. |
Konversi kueri menjadi fetch task
Fetch task mengembalikan hasil kueri secara langsung tanpa meluncurkan pekerjaan MapReduce, sehingga mengurangi waktu eksekusi.
| Parameter | Default | Deskripsi |
|---|---|---|
hive.fetch.task.conversion | none | Mengontrol kueri mana yang dikonversi menjadi fetch task. Nilai yang valid: none, minimal, more. |
Nilai yang valid:
none: Tidak ada kueri yang dikonversi menjadi fetch task. Semua pernyataan menjalankan MapReduce.
minimal: Hanya pernyataan SELECT, FILTER, dan LIMIT yang menggunakan fetch task.
more: Selain cakupan
minimal, mendukung SELECT pada kolom tertentu, FILTER pada kolom non-partition key, dan kolom virtual (alias).
Aktifkan eksekusi kueri tervektorisasi
Eksekusi kueri tervektorisasi memproses data dalam batch baris daripada satu baris dalam satu waktu, sehingga meningkatkan performa kueri.
| Parameter | Default | Deskripsi |
|---|---|---|
hive.vectorized.execution.enabled | true | Aktifkan eksekusi kueri tervektorisasi. |
hive.vectorized.execution.reduce.enabled | true | Aktifkan eksekusi kueri tervektorisasi untuk task reduce. |
Gabungkan file kecil
Jumlah besar file output kecil menurunkan performa penyimpanan dan efisiensi pemrosesan data. Gabungkan file output dari task map dan reduce untuk mengurangi jumlah total file.
| Parameter | Default | Deskripsi |
|---|---|---|
hive.merge.mapfiles | true | Gabungkan file output task map. |
hive.merge.mapredfiles | false | Gabungkan file output task reduce. |
hive.merge.size.per.task | 256000000 (byte) | Ukuran target setiap file yang digabung. |
Contoh:
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB