EMR Remote Shuffle Service (ESS) is an extension provided by E-MapReduce (EMR) to optimize shuffle operations of computing engines. Traditional shuffle operations have the following issues:
- A shuffle write task that processes a large amount of data spills data to disk, which causes write amplification.
- A shuffle read task transfers a large number of small network packets, which can cause connection resets.
- A shuffle read task generates a large number of small and random I/O requests, which causes high disk and CPU loads.
- When thousands of mappers (M) and reducers (N) are used, a large number of connections are generated, which makes it difficult for jobs to run stably. The number of connections is calculated by using the following formula: Number of connections = M × N. For example, 2,000 mappers and 1,000 reducers generate 2,000,000 connections.
- The Spark shuffle service runs on NodeManager. If the amount of shuffle data is extremely large, NodeManager may be restarted, which affects the stability of YARN-based task scheduling.
ESS provides the following benefits:
- Uses push-style shuffle instead of pull-style shuffle to reduce the memory pressure on mappers.
- Supports I/O aggregation, reduces the number of connections in a shuffle read task from M × N to N, and converts random read to sequential read.
- Uses a two-replica mechanism to reduce the probability of fetch failures.
- Supports compute-storage separation. The shuffle service can be deployed on separate hardware.
- Eliminates the dependency on local disks when Spark on Kubernetes is used.
Create a cluster
- Create an EMR Shuffle Service cluster, in which ESS is a required service.
- Create an EMR Hadoop cluster, in which ESS is a required service.
For more information about how to create a cluster, see Create a cluster.
The following table describes the parameters that are related to ESS.

| Parameter | Description | Default value |
| --- | --- | --- |
| spark.shuffle.manager | Set this parameter to the fixed value org.apache.spark.shuffle.ess.EssShuffleManager. | |
| spark.ess.master.address | Specify this parameter in the format `<ess-master-ip>:<ess-master-port>`. For an example of how to set the Spark-side parameters in a job, see the sketch after this table. | |
| ess.push.data.replicate | Specifies whether to enable the two-replica feature. Valid values: true and false. Note: We recommend that you enable the two-replica feature in the production environment. | |
| ess.worker.flush.queue.capacity | The number of flush buffers in each directory. Note: You can configure multiple disks to improve performance. To improve the overall read and write throughput, we recommend that you configure a maximum of two directories on each disk. The heap memory used by flush buffers in each directory is calculated by using the following formula: Used heap memory = ess.worker.flush.buffer.size × ess.worker.flush.queue.capacity. For example, a flush buffer size of 256 KB and a queue capacity of 512 consume 256 KB × 512 = 128 MB of heap memory per directory. | |
| ess.flush.timeout | The timeout period for flushing data to the storage layer. Unit: seconds. | 240 |
| ess.application.timeout | The heartbeat timeout period of your application. Unit: seconds. After the heartbeat timeout period elapses, application-related resources are cleared. | 240 |
| ess.worker.flush.buffer.size | The size of a flush buffer. Unit: KB. When the data in a flush buffer reaches this size, flushing is triggered. | 256 |
| ess.metrics.system.enable | Specifies whether to enable monitoring. Valid values: true and false. | |
| ess_worker_offheap_memory | The size of the off-heap memory of a core node. Unit: GB. | 4 |
| ess_worker_memory | The size of the heap memory of a core node. Unit: GB. | 4 |
| ess_master_memory | The size of the heap memory of the master node. Unit: GB. | 4 |
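The following snippet is a minimal sketch of how a Spark job might enable ESS by setting the two Spark-side parameters from the preceding table. The object name, the sample job, and the placeholder master address are illustrative and not part of the ESS documentation; replace `<ess-master-ip>:<ess-master-port>` with the address of the master node of your Shuffle Service cluster, and make sure that the ESS client libraries provided by EMR are on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative example (not from the ESS documentation): a Spark job that
// routes its shuffle data through ESS by setting the parameters from the
// table above.
object EssShuffleExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ess-shuffle-example")
      // Fixed value required by ESS.
      .config("spark.shuffle.manager", "org.apache.spark.shuffle.ess.EssShuffleManager")
      // Placeholder: replace with the master node address of your Shuffle Service cluster.
      .config("spark.ess.master.address", "<ess-master-ip>:<ess-master-port>")
      .getOrCreate()

    // A simple aggregation that triggers a shuffle.
    val counts = spark.range(0L, 1000000L)
      .selectExpr("id % 100 AS key")
      .groupBy("key")
      .count()
    counts.show()

    spark.stop()
  }
}
```

You can also pass the same parameters with --conf options when you submit the job instead of hard-coding them in the application.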