E-MapReduce:ESS (available only for existing users)

Last Updated: Jul 13, 2023

EMR Remote Shuffle Service (ESS) is an extension provided by E-MapReduce (EMR) to optimize shuffle operations of compute engines.

Background information

The current shuffle solution has the following disadvantages:

  • If a shuffle write task processes a large amount of data, the data is spilled to disk. This causes write amplification.

  • A shuffle read task transfers a large number of small network packets. This can cause connection resets.

  • A large number of small-size I/O requests and random reads exist in a shuffle read task. This causes high disk and CPU loads.

  • If a job uses thousands of mappers (M) and reducers (N), M × N connections are generated, which makes it difficult for the job to run.

  • The Spark shuffle service runs on NodeManager. If an extremely large amount of data is shuffled, NodeManager may be restarted, which affects the stability of YARN-based task scheduling.

ESS is designed to address these issues. ESS has the following advantages:

  • Reduces memory pressure on mappers by using push-style shuffle instead of pull-style shuffle.

  • Supports I/O aggregation, reduces the number of connections in a shuffle read task from M × N to N, and converts random read to sequential read.

  • Uses a two-replica mechanism to reduce the probability of fetch failures.

  • Supports compute-storage separation. The shuffle service can be deployed in a separated hardware environment.

  • Eliminates the dependency on local disks when Spark on Kubernetes is used.
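To make the difference in connection counts concrete, the following minimal Python sketch compares the M × N connections of pull-style shuffle with the N connections that remain after I/O aggregation. The function name and the sample values of 2,000 mappers and 1,000 reducers are illustrative assumptions and do not come from ESS itself.

```python
def shuffle_connections(mappers: int, reducers: int) -> dict:
    """Compare shuffle read connection counts for M mappers and N reducers."""
    if mappers < 0 or reducers < 0:
        raise ValueError("mappers and reducers must be non-negative")
    return {
        # Pull-style shuffle: every reducer connects to every mapper (M x N).
        "pull_style": mappers * reducers,
        # With I/O aggregation, the count drops to one per reducer (N).
        "aggregated": reducers,
    }


# Hypothetical job size, chosen only for illustration.
counts = shuffle_connections(mappers=2000, reducers=1000)
print(counts)
```

With these sample values, the count drops from 2,000,000 to 1,000.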

The following figure shows the architecture of ESS.

Limits

This topic applies only to EMR V4.X.X, EMR V3.X.X minor versions earlier than V3.39.1, and EMR V5.X.X minor versions earlier than V5.5.0. If you want to use the shuffle service in EMR V3.39.1 or a later minor version, or in EMR V5.5.0 or a later minor version, see RSS.

Create a cluster

EMR V4.5.0 is used as an example. You can create a cluster with ESS deployed by using one of the following methods:

  • Create an EMR Shuffle Service cluster.

  • Create an EMR Hadoop cluster.

For more information about how to create a cluster, see Create a cluster.

Use ESS

If ESS is required when you use Spark, you must add the parameters that are described in the following table when you submit a Spark job. For more information about the parameter configurations, see Edit jobs.

For more information about Spark-related parameters, see Spark Configuration.

| Parameter | Description |
| --- | --- |
| spark.shuffle.manager | Set the value to org.apache.spark.shuffle.ess.EssShuffleManager. |
| spark.ess.master.address | Specify this parameter in the format of <ess-master-ip>:<ess-master-port>, where <ess-master-ip> is the public IP address of the master node and <ess-master-port> is the port of the master node. Set the port to the fixed value 9097. |
| spark.shuffle.service.enabled | Set the value to false. To use the ESS service provided by EMR, you must disable the original external shuffle service. |
| spark.shuffle.useOldFetchProtocol | Set the value to true. ESS is compatible with the original shuffle protocol. |
| spark.sql.adaptive.enabled | Set the value to false. ESS does not support adaptive execution. |
| spark.sql.adaptive.skewJoin.enabled | Set the value to false. ESS does not support adaptive execution. |
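As a rough illustration of how these parameters fit together, the following Python sketch builds the `--conf` arguments for a spark-submit command. The helper names and the address 192.0.2.10 (a documentation-only example address) are hypothetical. Setting spark.sql.adaptive.skewJoin.enabled to false is an assumption based on the statement that ESS does not support adaptive execution.

```python
import ipaddress

ESS_MASTER_PORT = 9097  # fixed port of the ESS master node, per the parameter table


def ess_spark_conf(master_ip: str) -> dict:
    """Return the Spark settings that the parameter table lists for ESS."""
    ipaddress.ip_address(master_ip)  # raises ValueError for a malformed IP address
    return {
        "spark.shuffle.manager": "org.apache.spark.shuffle.ess.EssShuffleManager",
        "spark.ess.master.address": f"{master_ip}:{ESS_MASTER_PORT}",
        "spark.shuffle.service.enabled": "false",
        "spark.shuffle.useOldFetchProtocol": "true",
        "spark.sql.adaptive.enabled": "false",
        # Assumption: disabled because ESS does not support adaptive execution.
        "spark.sql.adaptive.skewJoin.enabled": "false",
    }


def spark_submit_args(conf: dict) -> list:
    """Turn a settings dict into a flat list of --conf key=value arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Hypothetical master node address, used only for illustration.
conf = ess_spark_conf("192.0.2.10")
print(" ".join(["spark-submit"] + spark_submit_args(conf)))
```

The printed command still needs the application JAR or script and any job-specific options appended before it can be run.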

Parameters

You can view the settings of all parameters for ESS on the ESS service configuration page.

| Parameter | Description | Default value |
| --- | --- | --- |
| ess.push.data.replicate | Specifies whether to enable the two-replica feature. Valid values: true (the two-replica feature is enabled) and false (the two-replica feature is disabled). Note: We recommend that you enable the two-replica feature in the production environment. | true |
| ess.worker.flush.queue.capacity | The number of flush buffers in each directory. Note: You can configure multiple disks to improve performance. To improve the overall read and write throughput, we recommend that you configure a maximum of two directories on each disk. The heap memory used by flush buffers in each directory is calculated by using the following formula: ess.worker.flush.buffer.size × ess.worker.flush.queue.capacity. Example: 256 KB × 512 = 128 MB. The number of slots provided in each directory is half the value of this parameter. For example, if 28 directories are configured, the flush buffers use 3.5 GB of memory in total (128 MB × 28) and the total number of slots is 7,168 (512 × 28/2). | 512 |
| ess.flush.timeout | The timeout period for flushing data to the storage layer. Unit: seconds. | 240s |
| ess.application.timeout | The heartbeat timeout period of your application. Unit: seconds. After the heartbeat timeout period elapses, application-related resources are cleared. | 240s |
| ess.worker.flush.buffer.size | The size of a flush buffer. Unit: KB. When the data in a flush buffer exceeds this size, a flush is triggered. | 256k |
| ess.metrics.system.enable | Specifies whether to enable monitoring. Valid values: true (monitoring is enabled) and false (monitoring is disabled). | false |
| ess_worker_offheap_memory | The size of off-heap memory of a core node. Unit: GB. | 4g |
| ess_worker_memory | The size of heap memory of a core node. Unit: GB. | 4g |
| ess_master_memory | The size of heap memory of the master node. Unit: GB. | 4g |
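The memory and slot arithmetic described for ess.worker.flush.queue.capacity can be checked with a short calculation. The following Python sketch reproduces the example of 256 KB buffers, a queue capacity of 512, and 28 directories. The function name and return keys are illustrative and are not part of ESS.

```python
KB = 1024
MB = 1024 * KB
GB = 1024 * MB


def flush_buffer_usage(buffer_size_bytes: int, queue_capacity: int, directories: int) -> dict:
    """Estimate flush buffer heap usage and slot count across all directories."""
    # Heap per directory = ess.worker.flush.buffer.size x ess.worker.flush.queue.capacity
    per_directory_bytes = buffer_size_bytes * queue_capacity
    return {
        "per_directory_mb": per_directory_bytes / MB,
        "total_gb": per_directory_bytes * directories / GB,
        # Each directory provides half as many slots as it has flush buffers.
        "total_slots": queue_capacity * directories // 2,
    }


# Default buffer size (256 KB) and queue capacity (512) with 28 directories.
usage = flush_buffer_usage(256 * KB, 512, 28)
print(usage)
# {'per_directory_mb': 128.0, 'total_gb': 3.5, 'total_slots': 7168}
```

Changing the directory count or buffer size in the call shows how the heap budget of a worker scales before the values are applied on the ESS service configuration page.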