EMR Remote Shuffle Service (ESS) is an extension provided by E-MapReduce (EMR) to optimize shuffle operations of computing engines.

Background information

The current shuffle solution has the following disadvantages:
  • When a shuffle write task handles a large amount of data, the data spills to disk. This causes write amplification.
  • A shuffle read task transfers a large number of small network packets. This can cause connection resets.
  • A shuffle read task issues a large number of small I/O requests and random reads. This causes high disk and CPU loads.
  • When thousands of mappers (M) and reducers (N) are used, a large number of connections are generated, which makes it difficult for jobs to run. The number of connections is calculated by using the following formula: Number of connections = M × N.
  • The Spark shuffle service runs on NodeManager. If the amount of data involved in shuffling is extremely large, NodeManager may be restarted. This affects the stability of YARN-based task scheduling.
ESS is designed to address these disadvantages. ESS has the following advantages:
  • Uses push-style shuffle, instead of pull-style shuffle, to reduce the memory pressure on mappers.
  • Supports I/O aggregation, reduces the number of connections in a shuffle read task from M × N to N, and converts random reads to sequential reads.
  • Uses a two-replica mechanism to reduce the probability of fetch failures.
  • Supports compute-storage separation. The shuffle service can be deployed in a separate hardware environment.
  • Eliminates the dependency on local disks when Spark on Kubernetes is used.
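The effect of reducing the connection count from M × N to N can be illustrated with a short calculation. The following Python sketch only restates the two formulas from this topic. The function names and the sample values of M and N are assumptions chosen for illustration.

```python
def pull_shuffle_connections(mappers: int, reducers: int) -> int:
    """Connections in the pull-style shuffle: M x N."""
    return mappers * reducers


def ess_read_connections(reducers: int) -> int:
    """Connections in an ESS shuffle read after I/O aggregation: N."""
    return reducers


# Sample values (assumed): 2,000 mappers and 1,000 reducers.
m, n = 2000, 1000
print(pull_shuffle_connections(m, n))  # 2000000
print(ess_read_connections(n))         # 1000
```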
The following figure shows the architecture of ESS.

[Figure: ESS architecture]

Create a cluster

EMR V4.5.0 is used as an example. You can create a cluster with ESS deployed by using one of the following methods:
  • Create an EMR Shuffle Service cluster, in which ESS is a required service.
  • Create an EMR Hadoop cluster, in which ESS is a required service.

For more information about how to create a cluster, see Create a cluster.

Use ESS

To use ESS with Spark, you only need to add the parameters described in the following table when you submit a Spark job. For more information, see Edit jobs.
Parameter: spark.shuffle.manager
Description: This parameter is set to the fixed value org.apache.spark.shuffle.ess.EssShuffleManager.

Parameter: spark.ess.master.address
Description: This parameter is specified in the format of <ess-master-ip>:<ess-master-port>, where:
  • <ess-master-ip> specifies the public IP address of the master node.
  • <ess-master-port> is set to the fixed value 9097.
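
As a minimal sketch, the two parameters can be assembled into --conf arguments for spark-submit. The helper names, the placeholder IP address 192.0.2.10, and the job file name your_job.py are assumptions for illustration. Only the parameter names, the shuffle manager class, and port 9097 come from this topic.

```python
# Fixed values taken from the parameter descriptions above.
ESS_SHUFFLE_MANAGER = "org.apache.spark.shuffle.ess.EssShuffleManager"
ESS_MASTER_PORT = 9097


def ess_spark_conf(master_ip):
    """Return the Spark properties that enable ESS (hypothetical helper)."""
    return {
        "spark.shuffle.manager": ESS_SHUFFLE_MANAGER,
        "spark.ess.master.address": f"{master_ip}:{ESS_MASTER_PORT}",
    }


def to_submit_args(conf):
    """Turn a property dict into a flat list of --conf key=value arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# 192.0.2.10 is a placeholder; replace it with the IP address of your master node.
command = ["spark-submit", *to_submit_args(ess_spark_conf("192.0.2.10")), "your_job.py"]
print(" ".join(command))
```

Keeping the properties in a dict makes it straightforward to pass the same settings through a job definition instead of the command line.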

Parameters

You can view the settings of all parameters for ESS on the ESS service configuration page.
Parameter: ess.push.data.replicate
Description: Specifies whether to enable the two-replica feature. Valid values:
  • true
  • false
Note: We recommend that you enable the two-replica feature in the production environment.
Default value: true

Parameter: ess.worker.flush.queue.capacity
Description: The number of flush buffers in each directory.
Note: You can configure multiple disks to improve performance. To improve the overall read and write throughput, we recommend that you configure a maximum of two directories on each disk.
The heap memory used by flush buffers in each directory is calculated by using the following formula: Used heap memory = ess.worker.flush.buffer.size × ess.worker.flush.queue.capacity. For example, 256 KB × 512 = 128 MB. The number of slots provided in each directory is half the value of this parameter. For example, if a total of 28 directories are configured, the overall memory that is used is 3.5 GB (128 MB × 28) and the total number of slots is 7168 (512 × 28/2).
Default value: 512

Parameter: ess.flush.timeout
Description: The timeout period for flushing data to the storage layer. Unit: seconds.
Default value: 240

Parameter: ess.application.timeout
Description: The heartbeat timeout period of your application. Unit: seconds. After the heartbeat timeout period elapses, application-related resources are cleared.
Default value: 240

Parameter: ess.worker.flush.buffer.size
Description: The size of a flush buffer. Unit: KB. If the size of a flush buffer exceeds the upper limit, flushing is triggered.
Default value: 256

Parameter: ess.metrics.system.enable
Description: Specifies whether to enable monitoring. Valid values:
  • true
  • false
Default value: false

Parameter: ess_worker_offheap_memory
Description: The size of off-heap memory of a core node. Unit: GB.
Default value: 4

Parameter: ess_worker_memory
Description: The size of heap memory of a core node. Unit: GB.
Default value: 4

Parameter: ess_master_memory
Description: The size of heap memory of the master node. Unit: GB.
Default value: 4
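
The sizing arithmetic described for ess.worker.flush.queue.capacity can be checked with a short Python sketch. The function names are assumptions for illustration. The default values and the formulas are the ones given in the parameter description.

```python
def flush_buffer_heap_mb(buffer_size_kb=256, queue_capacity=512):
    """Heap memory used by flush buffers in one directory, in MB.

    Used heap memory = ess.worker.flush.buffer.size x ess.worker.flush.queue.capacity
    """
    return buffer_size_kb * queue_capacity / 1024


def worker_flush_footprint(directories, buffer_size_kb=256, queue_capacity=512):
    """Return (total heap memory in GB, total slots) for all directories."""
    per_directory_mb = flush_buffer_heap_mb(buffer_size_kb, queue_capacity)
    total_gb = per_directory_mb * directories / 1024
    # Each directory provides half as many slots as it has flush buffers.
    total_slots = queue_capacity // 2 * directories
    return total_gb, total_slots


print(flush_buffer_heap_mb())      # 128.0
print(worker_flush_footprint(28))  # (3.5, 7168)
```

With the default values and 28 directories, the result matches the example in the parameter description: 3.5 GB of heap memory and 7168 slots.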