EMR Remote Shuffle Service (ESS) is an extension provided by E-MapReduce (EMR) to optimize shuffle operations of compute engines.
- A data overflow occurs if a large amount of data exists in a shuffle write task. This causes write amplification.
- A large number of small-size network packets exist in a shuffle read task. This causes connection reset.
- A large number of small-size I/O requests and random reads exist in a shuffle read task. This causes high disk and CPU loads.
- If thousands of mappers (M) and reducers (N) are used, a large number of connections are generated, which makes it difficult for jobs to run. The number of connections is calculated by using the following formula: M × N.
- The Spark shuffle service runs on NodeManager. If the amount of data involved in shuffling is extremely large, NodeManager will be restarted. This affects the stability of YARN-based task scheduling.
- Uses push-style shuffle, instead of pull-style shuffle, to reduce the memory pressure caused by mappers.
- Supports I/O aggregation, reduces the number of connections in a shuffle read task from M × N to N, and converts random read to sequential read.
- Uses a two-replica mechanism to reduce the probability of fetch failures.
- Supports compute-storage separation. The shuffle service can be deployed in a separated hardware environment.
- Eliminates the dependency on local disks when Spark on Kubernetes is used.
Create a cluster
- Create an EMR Shuffle Service cluster.
- Create an EMR Hadoop cluster.
For more information about how to create a cluster, see Create a cluster.
If ESS is required when you use Spark, you must add the parameters described in the following table when you submit a Spark job. For more information about the parameter configurations, see Edit jobs.
For more information about Spark-related parameters, see Spark Configuration.
|spark.shuffle.manager||Set this parameter to org.apache.spark.shuffle.ess.EssShuffleManager.|
|spark.ess.master.address||Set this parameter in the format of <ess-master-ip>:<ess-master-port>.
|spark.shuffle.service.enabled||Set this parameter to false.
To use the ESS service provided by EMR, you must disable the original external shuffle service.
|spark.shuffle.useOldFetchProtocol||Set this parameter to true.
ESS is compatible with the original shuffle protocol.
|spark.sql.adaptive.enabled||Set this parameter to false.
ESS does not support adaptive execution.
|ess.push.data.replicate||Specifies whether to enable the two-replica feature. Valid values:
Note We recommend that you enable the two-replica feature in the production environment.
|ess.worker.flush.queue.capacity||The number of flush buffers in each directory.
Note You can configure multiple disks to improve performance. To improve the overall read and write throughput, we recommend that you configure a maximum of two directories on each disk.
The heap memory used by flush buffers in each directory is calculated by using the following formula: ess.worker.flush.buffer.size × ess.worker.flush.queue.capacity. Example:
|ess.flush.timeout||The timeout period of data flushing to the storage layer. Unit: seconds.||240s|
|ess.application.timeout||The heartbeat timeout period of your application. Unit: seconds. After the heartbeat timeout period elapses, application-related resources are cleared.||240s|
|ess.worker.flush.buffer.size||The size of a flush buffer. Unit: KB. If the size of a flush buffer exceeds the upper limit, flushing is triggered.||256k|
|ess.metrics.system.enable||Specifies whether to enable monitoring. Valid values:
|ess_worker_offheap_memory||The size of off-heap memory of a core node. Unit: GB.||4g|
|ess_worker_memory||The size of heap memory of a core node. Unit: GB.||4g|
|ess_master_memory||The size of heap memory of the master node. Unit: GB.||4g|