
E-MapReduce: Celeborn

Last Updated: Mar 26, 2026

When Spark jobs run large shuffles, the standard shuffle implementation can cause write amplification, connection storms, and NodeManager instability. Celeborn is a remote shuffle service that offloads intermediate shuffle data to a dedicated storage layer, replacing the standard shuffle with a push-based model that improves stability, reduces resource contention, and supports compute-storage separation.

How Celeborn works

The standard shuffle implementation has several limitations:

  • Write amplification: Large shuffle write tasks can cause data overflow, leading to write amplification.

  • Connection reset: Shuffle read tasks generate many small network packets, which can trigger connection resets.

  • High disk and CPU load: Shuffle read tasks produce many small I/O requests and random reads, increasing disk and CPU load.

  • Connection explosion: With thousands of mappers (M) and reducers (N), the number of connections grows to M x N, making jobs difficult to run.

  • YARN instability: When shuffle data volume is extremely large, NodeManager restarts, disrupting YARN-based task scheduling.

Celeborn addresses these issues by using a push-based shuffle model:

  • Reduces mapper memory pressure by using push-based shuffle instead of pull-based shuffle.

  • Supports I/O aggregation, reducing read-side connections from M x N to N and converting random reads to sequential reads.

  • Uses a two-replica mechanism to reduce fetch failures.

  • Supports compute-storage separation, so the shuffle service can run on dedicated hardware.

  • Eliminates dependency on local disks for Spark on Kubernetes workloads.
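To make the connection reduction concrete, here is a small sketch with purely illustrative mapper and reducer counts (the numbers are not from this document), comparing the M x N connections of the standard shuffle against the N connections after Celeborn's I/O aggregation:

```shell
# Illustrative only: M and N are hypothetical mapper/reducer counts.
M=2000   # number of mappers
N=1000   # number of reducers
echo "Standard shuffle connections (M x N): $((M * N))"
echo "With Celeborn I/O aggregation (N):    $N"
```

For this hypothetical job, aggregation cuts the read-side connection count from 2,000,000 to 1,000.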

The following figure shows the Celeborn architecture.


Prerequisites

Before you begin, ensure that you have:

  • An E-MapReduce (EMR) DataLake cluster or a custom cluster with the Celeborn service selected. For more information, see Create a cluster.

Supported versions

Celeborn is supported on the following cluster types and versions:

  • DataLake cluster: EMR V3.45.0 or later, or EMR V5.11.0 or later

  • Custom cluster: EMR V3.45.0 or later, or EMR V5.11.0 or later

Enable Celeborn for Spark

You can enable Celeborn for Spark in two ways: through the EMR console with one click (recommended), or by manually setting Spark parameters.

Enable with one click (recommended)

For EMR V5.11.1 or later and EMR V3.45.1 or later:

On the Status tab of the Spark service page, go to the Service Overview section and turn on enableCeleborn.

For EMR V5.11.0 and EMR V3.45.0:

  1. On the Status tab of the Spark service page, go to the Components section and find the SparkThriftServer component.

  2. In the Actions column, move the pointer over the more icon and select enableCeleborn.

After you click enableCeleborn, EMR automatically writes the Spark parameters described in the next section to the spark-defaults.conf and spark-thriftserver.conf configuration files and restarts the SparkThriftServer component.

  • enableCeleborn: all Spark jobs in the cluster use Celeborn.

  • disableCeleborn: all Spark jobs in the cluster stop using Celeborn.

Configure Spark parameters manually

To enable Celeborn manually, add the following parameters to your Spark configuration:

# Shuffle manager — class name changed in Celeborn 0.4.0:
# Celeborn 0.3.x or earlier: org.apache.spark.shuffle.celeborn.RssShuffleManager
# Celeborn 0.4.x or later:  org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager

# Kryo serializer is required — Java serializer does not support relocation
spark.serializer org.apache.spark.serializer.KryoSerializer

# Disable the external shuffle service — required to use Celeborn
# Note: Setting this to true means Celeborn is NOT used.
# Celeborn does not affect Spark dynamic allocation.
spark.shuffle.service.enabled false

# Shuffle writer mode:
#   hash (default): higher memory usage when partition count is large
#   sort: fixed memory usage, better for workloads with many partitions
spark.celeborn.shuffle.writer hash

# Two-replica replication — reduces fetch failures
spark.celeborn.push.replicate.enabled true

# Celeborn master endpoint
# Format: <celeborn-master-ip>:9097 (use the public IP address of the master node)
# For high-availability clusters, specify all master node IPs
spark.celeborn.master.endpoints <celeborn-master-ip>:9097

# Adaptive Query Execution (AQE) settings
# Disable local shuffle reader for better Celeborn performance
spark.sql.adaptive.enabled true
spark.sql.adaptive.localShuffleReader.enabled false
spark.sql.adaptive.skewJoin.enabled true
Note

Alibaba Cloud Spark and open source Spark 3.5 are compatible with Celeborn.
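The same settings can also be supplied per job rather than cluster-wide. A minimal spark-submit sketch is shown below; the application class and JAR names are hypothetical placeholders, the endpoint placeholder is unchanged from above, and the Celeborn 0.4.x shuffle manager class is assumed:

```
# Hypothetical per-job submission with Celeborn enabled.
# Replace <celeborn-master-ip>, the class name, and the JAR with real values.
spark-submit \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.celeborn.shuffle.writer=hash \
  --conf spark.celeborn.push.replicate.enabled=true \
  --conf spark.celeborn.master.endpoints=<celeborn-master-ip>:9097 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.localShuffleReader.enabled=false \
  --conf spark.sql.adaptive.skewJoin.enabled=true \
  --class org.example.MyJob my-job.jar
```

On EMR clusters with the Celeborn service installed, the Celeborn Spark client JAR is expected to be on the classpath already; in other environments it must be added explicitly, for example with --jars.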

Spark parameter reference

  • spark.shuffle.manager: Shuffle manager class. Use org.apache.spark.shuffle.celeborn.SparkShuffleManager for Celeborn 0.4.x or later, and org.apache.spark.shuffle.celeborn.RssShuffleManager for Celeborn 0.3.x or earlier.

  • spark.serializer: Must be set to org.apache.spark.serializer.KryoSerializer.

  • spark.shuffle.service.enabled: Must be set to false. If set to true, Celeborn is not used.

  • spark.celeborn.shuffle.writer: Write mode. hash: memory usage grows with the partition count. sort: fixed memory usage, stable for large partition counts. Default: hash.

  • spark.celeborn.push.replicate.enabled: Enables two-replica replication to reduce fetch failures. Valid values: true and false. Default: true.

  • spark.celeborn.master.endpoints: Celeborn master endpoint in the format <celeborn-master-ip>:9097, where <celeborn-master-ip> is the public IP address of the master node. For high-availability clusters, specify the IP addresses of all master nodes.

  • spark.sql.adaptive.enabled: Enables Adaptive Query Execution (AQE).

  • spark.sql.adaptive.localShuffleReader.enabled: Set to false to disable the local shuffle reader and get full Celeborn performance.

  • spark.sql.adaptive.skewJoin.enabled: Enables skew join optimization in AQE.

Configure Celeborn parameters

View and modify Celeborn parameters on the Configure tab of the Celeborn service page.

Important

Parameter values vary by node group.

  • celeborn.worker.flusher.threads: Number of threads used when writing data to disk. Default: 1 for HDD, 8 for SSD.

  • CELEBORN_WORKER_OFFHEAP_MEMORY: Off-heap memory size for each worker node. Default: calculated from cluster settings.

  • celeborn.application.heartbeat.timeout: Heartbeat timeout for your application. After this period elapses, application resources are released. Default: 120s.

  • celeborn.worker.flusher.buffer.size: Flush buffer size. When the buffer reaches this size, flushing is triggered. Default: 256K.

  • celeborn.metrics.enabled: Enables monitoring. Default: true.

  • CELEBORN_WORKER_MEMORY: Heap memory size for each core node. Default: 1g.

  • CELEBORN_MASTER_MEMORY: Heap memory size for the master node. Default: 2g.
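For reference, in open source Apache Celeborn deployments the lowercase keys above are set in celeborn-defaults.conf and the uppercase entries are environment variables in celeborn-env.sh; on EMR you normally change both on the Configure tab instead. A hypothetical celeborn-defaults.conf fragment (values are illustrative, not recommendations):

```
# celeborn-defaults.conf (illustrative values only)
celeborn.worker.flusher.threads 8
celeborn.application.heartbeat.timeout 120s
celeborn.worker.flusher.buffer.size 256k
celeborn.metrics.enabled true
```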

Restart the CelebornMaster component

After modifying Celeborn parameters, restart the CelebornMaster component to apply the changes.

  1. On the Status tab of the Celeborn service page, find the CelebornMaster component. In the Actions column, move the pointer over the more icon and select restart_clean_meta.

    Note

    For non-high-availability clusters, click Restart in the Actions column instead.

  2. In the dialog box, turn off Rolling Execution, set the Execution Reason, and click OK.

  3. In the confirmation dialog box, click OK.