EMR Remote Shuffle Service (ESS) is an extension provided by E-MapReduce (EMR) to optimize shuffle operations of compute engines.

Background information

The current shuffle solution has the following disadvantages:
  • If a shuffle write task processes a large amount of data, the data spills to disk, which causes write amplification.
  • A shuffle read task generates a large number of small network packets, which can cause connection resets.
  • A shuffle read task generates a large number of small, random I/O requests, which causes high disk and CPU loads.
  • If thousands of mappers (M) and reducers (N) are used, a large number of connections are generated, which makes it difficult for jobs to run. The number of connections is calculated by using the following formula: M × N. For example, 2,000 mappers and 1,000 reducers generate 2,000,000 connections.
  • The Spark shuffle service runs on NodeManager. If an extremely large amount of data is shuffled, NodeManager may be restarted, which affects the stability of YARN-based task scheduling.
The ESS service provided by EMR optimizes this shuffle solution. ESS has the following advantages:
  • Uses push-style shuffle, instead of pull-style shuffle, to reduce the memory pressure caused by mappers.
  • Supports I/O aggregation, which reduces the number of connections in a shuffle read task from M × N to N and converts random reads into sequential reads.
  • Uses a two-replica mechanism to reduce the probability of fetch failures.
  • Supports compute-storage separation. The shuffle service can be deployed in a separated hardware environment.
  • Eliminates the dependency on local disks when Spark on Kubernetes is used.
The following figure shows the architecture of ESS.

Create a cluster

EMR V4.5.0 is used as an example. You can create a cluster with ESS deployed by using one of the following methods:
  • Create an EMR Shuffle Service cluster.
  • Create an EMR Hadoop cluster.

For more information about how to create a cluster, see Create a cluster.

Use ESS

If ESS is required when you use Spark, you must add the parameters described in the following table when you submit a Spark job. For more information about the parameter configurations, see Edit jobs.

For more information about Spark-related parameters, see Spark Configuration.

  • spark.shuffle.manager: Set this parameter to org.apache.spark.shuffle.ess.EssShuffleManager.
  • spark.ess.master.address: Set this parameter in the format of <ess-master-ip>:<ess-master-port>, where:
      • <ess-master-ip> specifies the public IP address of the master node.
      • <ess-master-port> specifies the port of the master node. Set it to the fixed value 9097.
  • spark.shuffle.service.enabled: Set this parameter to false. To use the ESS service provided by EMR, you must disable the original external shuffle service.
  • spark.shuffle.useOldFetchProtocol: Set this parameter to true. ESS is compatible with the original shuffle protocol.
  • spark.sql.adaptive.enabled: Set this parameter to false. ESS does not support adaptive execution.
  • spark.sql.adaptive.skewJoin.enabled: Set this parameter to false. Skew join optimization is part of adaptive execution, which ESS does not support.
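The following spark-submit command is a minimal sketch that combines the parameters above. The job class (com.example.MyJob), JAR name (my-job.jar), and master IP address (192.168.0.1) are placeholders for illustration; replace them with the values of your own job and cluster.

    spark-submit \
      --class com.example.MyJob \
      --conf spark.shuffle.manager=org.apache.spark.shuffle.ess.EssShuffleManager \
      --conf spark.ess.master.address=192.168.0.1:9097 \
      --conf spark.shuffle.service.enabled=false \
      --conf spark.shuffle.useOldFetchProtocol=true \
      --conf spark.sql.adaptive.enabled=false \
      --conf spark.sql.adaptive.skewJoin.enabled=false \
      my-job.jar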

Parameters

You can view the settings of all parameters for ESS on the ESS service configuration page.
  • ess.push.data.replicate: Specifies whether to enable the two-replica feature. Valid values: true and false. Note: We recommend that you enable the two-replica feature in the production environment. Default value: true.
  • ess.worker.flush.queue.capacity: The number of flush buffers in each directory. Note: You can configure multiple disks to improve performance. To improve the overall read and write throughput, we recommend that you configure a maximum of two directories on each disk. The heap memory used by flush buffers in each directory is calculated by using the following formula: ess.worker.flush.buffer.size × ess.worker.flush.queue.capacity. Example: 256 KB × 512 = 128 MB. The number of slots provided in each directory is half the value of this parameter. For example, if a total of 28 directories are configured, the overall memory used is 3.5 GB (128 MB × 28) and the total number of slots is 7,168 (512 × 28 / 2). A worked calculation follows this table. Default value: 512.
  • ess.flush.timeout: The timeout period for flushing data to the storage layer. Unit: seconds. Default value: 240s.
  • ess.application.timeout: The heartbeat timeout period of your application. Unit: seconds. After the heartbeat timeout period elapses, application-related resources are cleared. Default value: 240s.
  • ess.worker.flush.buffer.size: The size of a flush buffer. Unit: KB. If the data in a flush buffer exceeds this size, flushing is triggered. Default value: 256k.
  • ess.metrics.system.enable: Specifies whether to enable monitoring. Valid values: true and false. Default value: false.
  • ess_worker_offheap_memory: The size of the off-heap memory of a core node. Unit: GB. Default value: 4g.
  • ess_worker_memory: The size of the heap memory of a core node. Unit: GB. Default value: 4g.
  • ess_master_memory: The size of the heap memory of the master node. Unit: GB. Default value: 4g.
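The sizing formula for ess.worker.flush.queue.capacity can be checked with a short shell snippet. This is only a worked example: the buffer size and queue capacity are the defaults from the table above, and 28 directories is the hypothetical figure used in the parameter description.

    # Reproduce the flush-buffer sizing example from the parameter table.
    BUFFER_KB=256    # ess.worker.flush.buffer.size (default, in KB)
    QUEUE_CAP=512    # ess.worker.flush.queue.capacity (default)
    DIRS=28          # hypothetical number of configured directories

    PER_DIR_MB=$(( BUFFER_KB * QUEUE_CAP / 1024 ))    # 128 MB of heap per directory
    TOTAL_MB=$(( PER_DIR_MB * DIRS ))                 # 3584 MB, that is, 3.5 GB in total
    TOTAL_SLOTS=$(( QUEUE_CAP / 2 * DIRS ))           # 256 slots per directory, 7168 in total

    echo "${PER_DIR_MB} MB per directory, ${TOTAL_MB} MB in total, ${TOTAL_SLOTS} slots"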