EMR Remote Shuffle Service (ESS) adalah ekstensi yang disediakan oleh E-MapReduce (EMR) untuk mengoptimalkan operasi shuffle dari mesin komputasi.
Informasi latar belakang
Solusi shuffle saat ini memiliki kekurangan berikut:
Terjadi overflow data jika ada sejumlah besar data dalam tugas penulisan shuffle, menyebabkan amplifikasi tulis.
Sejumlah besar paket jaringan berukuran kecil dalam tugas pembacaan shuffle dapat menyebabkan reset koneksi.
Sejumlah besar permintaan I/O berukuran kecil dan pembacaan acak dalam tugas pembacaan shuffle menyebabkan beban disk dan CPU yang tinggi.
Jika ribuan mapper (M) dan reducer (N) digunakan, sejumlah besar koneksi dihasilkan, membuat pekerjaan sulit dijalankan. Jumlah koneksi dihitung menggunakan rumus berikut: M × N.
Layanan shuffle Spark berjalan pada NodeManager. Jika jumlah data yang terlibat dalam pengacakan sangat besar, NodeManager akan di-restart, memengaruhi stabilitas penjadwalan tugas berbasis YARN.
Layanan ESS berbasis shuffle yang disediakan oleh EMR dapat mengoptimalkan solusi shuffle. ESS memiliki keunggulan berikut:
Mengurangi tekanan memori yang disebabkan oleh mapper dengan menggunakan shuffle gaya push alih-alih shuffle gaya pull.
Mendukung agregasi I/O, mengurangi jumlah koneksi dalam tugas pembacaan shuffle dari M × N menjadi N, serta mengubah pembacaan acak menjadi pembacaan berurutan.
Menggunakan mekanisme dua replika untuk mengurangi kemungkinan kegagalan fetch.
Mendukung pemisahan komputasi-penyimpanan. Layanan shuffle dapat diterapkan di lingkungan perangkat keras yang terpisah.
Menghilangkan ketergantungan pada disk lokal saat menggunakan Spark on Kubernetes.
Gambar berikut menunjukkan arsitektur ESS. 
Batasan
Topik ini hanya berlaku untuk EMR V4.X.X, versi minor lebih awal dari EMR V3.39.1, atau versi minor lebih awal dari EMR V5.5.0. Jika Anda ingin menggunakan ESS di EMR V3.39.1 atau versi minor lebih baru, atau EMR V5.5.0 atau versi minor lebih baru, lihat RSS.
Buat kluster
EMR V4.5.0 digunakan sebagai contoh. Anda dapat membuat kluster dengan ESS yang diterapkan menggunakan salah satu metode berikut:
Buat kluster EMR Shuffle Service.

Buat kluster EMR Hadoop.

Untuk informasi lebih lanjut tentang cara membuat kluster, lihat Buat Kluster.
Gunakan ESS
Jika ESS diperlukan saat Anda menggunakan Spark, Anda harus menambahkan parameter yang dijelaskan dalam tabel berikut saat Anda mengirimkan pekerjaan Spark. Untuk informasi lebih lanjut tentang konfigurasi parameter, lihat Edit Jobs.
Untuk informasi lebih lanjut tentang parameter terkait Spark, lihat Konfigurasi Spark.
Parameter | Deskripsi |
spark.shuffle.manager | Atur nilainya menjadi org.apache.spark.shuffle.ess.EssShuffleManager. |
spark.ess.master.address | Tentukan parameter ini dalam format <ess-master-ip>:<ess-master-port>. di mana:
|
spark.shuffle.service.enabled | Atur nilainya menjadi false. Untuk menggunakan layanan ESS yang disediakan oleh EMR, Anda harus menonaktifkan layanan shuffle eksternal asli. |
spark.shuffle.useOldFetchProtocol | Atur nilainya menjadi true. ESS kompatibel dengan protokol shuffle asli. |
spark.sql.adaptive.enabled | Atur nilainya menjadi false. ESS tidak mendukung eksekusi adaptif. |
spark.sql.adaptive.skewJoin.enabled |
Parameter
Anda dapat melihat pengaturan semua parameter untuk ESS di halaman konfigurasi layanan ESS.
Parameter | Deskripsi | Nilai default |
ess.push.data.replicate | Menentukan apakah fitur dua replika diaktifkan. Nilai valid:
Catatan Kami merekomendasikan Anda mengaktifkan fitur dua replika di lingkungan produksi. | true |
ess.worker.flush.queue.capacity | Jumlah buffer flush di setiap direktori. Catatan Anda dapat mengonfigurasi beberapa disk untuk meningkatkan performa. Untuk meningkatkan throughput baca dan tulis secara keseluruhan, kami merekomendasikan Anda mengonfigurasi maksimal dua direktori di setiap disk. Memori heap yang digunakan oleh buffer flush di setiap direktori dihitung dengan menggunakan rumus berikut: ess.worker.flush.buffer.size × ess.worker.flush.queue.capacity. Contoh: | 512 |
ess.flush.timeout | Periode timeout untuk flush data ke lapisan penyimpanan. Unit: detik. | 240s |
ess.application.timeout | Periode timeout denyut jantung aplikasi Anda. Unit: detik. Setelah periode timeout denyut jantung berakhir, sumber daya terkait aplikasi dibersihkan. | 240s |
ess.worker.flush.buffer.size | Ukuran buffer flush. Unit: KB. Jika ukuran buffer flush melebihi batas atas, flushing dipicu. | 256k |
ess.metrics.system.enable | Menentukan apakah pemantauan diaktifkan. Nilai valid:
| false |
ess_worker_offheap_memory | Ukuran memori off-heap dari node inti. Unit: GB. | 4g |
ess_worker_memory | Ukuran memori heap dari node inti. Unit: GB. | 4g |
ess_master_memory | Ukuran memori heap dari Node master. Unit: GB. | 4g |