Celeborn is a service that processes intermediate data, such as shuffle data and spill data. Celeborn can improve the stability, flexibility, and performance of big data compute engines. This topic describes how to use the Celeborn service.

Background information

The existing shuffle solution has the following disadvantages:
  • If a shuffle write task processes a large amount of data, the data spills to disk, which causes write amplification.
  • A shuffle read task generates a large number of small network packets, which can cause connection resets.
  • A shuffle read task issues a large number of small I/O requests and random reads, which cause high disk and CPU loads.
  • If a job uses thousands of mappers (M) and reducers (N), the number of connections, which is calculated as M × N, becomes very large and makes it difficult for the job to run.
  • The Spark shuffle service runs on NodeManager. If the amount of shuffle data is extremely large, NodeManager may be restarted, which affects the stability of YARN-based task scheduling.
Celeborn optimizes the shuffle solution and provides the following advantages:
  • Uses push-style shuffle instead of pull-style shuffle, which reduces the memory pressure on mappers.
  • Supports I/O aggregation, which reduces the number of connections in a shuffle read task from M × N to N and converts random reads into sequential reads.
  • Uses a two-replica mechanism to reduce the probability of fetch failures.
  • Supports compute-storage separation, so the shuffle service can be deployed in a separate hardware environment.
  • Eliminates the dependency on local disks when Spark on Kubernetes is used.
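To make the connection counts concrete, the following Python sketch simply evaluates the two formulas stated above: M × N shuffle read connections without Celeborn and N with Celeborn's I/O aggregation. It is an arithmetic illustration of that simplified model, not a measurement of real network behavior; the function name and the mapper and reducer counts are hypothetical.

```python
def shuffle_read_connections(mappers: int, reducers: int, celeborn: bool) -> int:
    """Return the number of shuffle read connections under the model in this topic.

    Without Celeborn, every reducer connects to every mapper: M x N connections.
    With Celeborn's I/O aggregation, the count drops to N (one per reducer).
    """
    if mappers < 0 or reducers < 0:
        raise ValueError("mapper and reducer counts must be non-negative")
    return reducers if celeborn else mappers * reducers


# Example: a hypothetical job with 2,000 mappers and 3,000 reducers.
m, n = 2000, 3000
print(shuffle_read_connections(m, n, celeborn=False))  # 6000000
print(shuffle_read_connections(m, n, celeborn=True))   # 3000
```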
The following figure shows the architecture of Celeborn.


Prerequisites

An E-MapReduce (EMR) DataLake cluster or custom cluster is created, and the Celeborn service is selected for the cluster. For more information about how to create a cluster, see Create a cluster.


Limits

This topic applies only to the clusters described in the following table.

Cluster type        Version
DataLake cluster    EMR V3.45.0 or a later minor version, and EMR V5.11.0 or a later minor version
Custom cluster      EMR V3.45.0 or a later minor version, and EMR V5.11.0 or a later minor version


Spark parameters

spark.shuffle.manager: Set the value to org.apache.spark.shuffle.celeborn.RssShuffleManager.
spark.serializer: Set the value to org.apache.spark.serializer.KryoSerializer.
spark.celeborn.push.replicate.enabled: Specifies whether to enable the two-replica feature. Valid values:
  • true: enables the two-replica feature. This is the default value.
  • false: disables the two-replica feature.
spark.shuffle.service.enabled: Set the value to false.
  To use Celeborn, you must disable the external shuffle service. Celeborn does not affect the use of the dynamic allocation feature of Spark.
  • If you set the spark.shuffle.service.enabled parameter to true, Celeborn is not used.
  • Alibaba Cloud Spark and open source Spark 3.5 are adapted to work with Celeborn.
spark.celeborn.shuffle.writer: The write mode of Celeborn. Valid values:
  • hash: consumes a large amount of memory when a very large number of partitions are processed in parallel. This is the default value.
  • sort: consumes a fixed amount of memory even when a very large number of partitions are processed in parallel, which keeps partition processing stable.
spark.celeborn.master.endpoints: Specify a value in the <celeborn-master-ip>:<celeborn-master-port> format.
  • <celeborn-master-ip>: the public IP address of the master node.
  • <celeborn-master-port>: the port of the master node. Set the value to 9097.
  If you create a high-availability cluster, we recommend that you specify the endpoints of all master nodes, separated by commas.
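The one-click enableCeleborn operation described in this topic modifies these parameters for you, but if you want to set them yourself (for example, for a single job), the following Python sketch assembles the parameters described above into a dictionary and renders them in the whitespace-separated "key value" format of spark-defaults.conf. The helper names and sample IP addresses are hypothetical. Joining multiple master endpoints with commas follows the open source Celeborn convention for master endpoints; verify the result against your cluster before use.

```python
import re

# "<host>:<port>" with a numeric port. IPv6 literals are not handled by this sketch.
_ENDPOINT = re.compile(r"^[^\s:,]+:\d+$")


def celeborn_spark_conf(master_hosts, port=9097, writer="hash", replicate=True):
    """Build the Celeborn-related Spark properties as a dict.

    master_hosts: IP addresses of the Celeborn master node(s). For a
    high-availability cluster, pass all master nodes; they are joined
    with commas (assumed endpoint-list format).
    """
    if writer not in ("hash", "sort"):
        raise ValueError(f"unsupported writer mode: {writer!r}")
    endpoints = [f"{host}:{port}" for host in master_hosts]
    if not endpoints or not all(_ENDPOINT.match(e) for e in endpoints):
        raise ValueError(f"invalid master endpoints: {endpoints!r}")
    return {
        "spark.shuffle.manager": "org.apache.spark.shuffle.celeborn.RssShuffleManager",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.celeborn.push.replicate.enabled": str(replicate).lower(),
        # The external shuffle service must be off; otherwise Celeborn is not used.
        "spark.shuffle.service.enabled": "false",
        "spark.celeborn.shuffle.writer": writer,
        "spark.celeborn.master.endpoints": ",".join(endpoints),
    }


def to_spark_defaults(conf):
    """Render properties as spark-defaults.conf lines: key, whitespace, value."""
    return "\n".join(f"{key} {value}" for key, value in conf.items())


# Example with two hypothetical master IP addresses and the sort writer.
print(to_spark_defaults(celeborn_spark_conf(["192.168.0.1", "192.168.0.2"], writer="sort")))
```

Validating the writer mode and endpoint format up front turns a typo into an immediate error instead of a job that silently falls back to a different shuffle path.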


You can enable adaptive execution when you use the Celeborn service. To ensure high shuffle performance, disable the local shuffle reader: set the spark.sql.adaptive.enabled parameter to true, the spark.sql.adaptive.localShuffleReader.enabled parameter to false, and the spark.sql.adaptive.skewJoin.enabled parameter to true.
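As a sketch, the three adaptive execution settings above can be passed to a single job through the --conf option of spark-submit. The Python helper below is hypothetical: it merges one or more property dictionaries (later dictionaries win for the same key) and flattens them into --conf key=value arguments, which you would place before your application JAR or script.

```python
import shlex

# Adaptive execution settings recommended in this topic for Celeborn.
AQE_FOR_CELEBORN = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.localShuffleReader.enabled": "false",
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def spark_submit_conf_args(*confs):
    """Flatten property dicts into spark-submit --conf arguments.

    If the same key appears in several dicts, the last value wins.
    """
    merged = {}
    for conf in confs:
        merged.update(conf)
    args = []
    for key, value in merged.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Example: print a shell-quoted command prefix (application JAR or script omitted).
print(shlex.join(["spark-submit", *spark_submit_conf_args(AQE_FOR_CELEBORN)]))
```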

You can enable the Celeborn service in Spark with one click.
Note If the Celeborn service is not deployed in your cluster, the options to enable and disable Celeborn are not displayed.
If the Celeborn service is deployed in your cluster, perform the following steps to enable or disable Celeborn in Spark:
  1. Go to the Status tab of the Spark service page in the EMR console.
  2. Find the SparkThriftServer component, move the pointer over the more icon in the Actions column, and then select enableCeleborn or disableCeleborn.
After you select enableCeleborn or disableCeleborn, the Spark parameters described in the Spark parameters section are automatically modified in the spark-defaults.conf and spark-thriftserver.conf configuration files, and the SparkThriftServer component is restarted.
  • If you select enableCeleborn, all Spark jobs in the cluster use the Celeborn service.
  • If you select disableCeleborn, no Spark jobs in the cluster use the Celeborn service.
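To check which state a configuration file is in after enableCeleborn or disableCeleborn runs, you could parse it and test the two conditions stated in this topic: the shuffle manager is set to the Celeborn class, and the external shuffle service is disabled. This Python sketch assumes the whitespace-separated "key value" format of spark-defaults.conf; the function names and the sample content are hypothetical and are not a copy of what EMR writes.

```python
CELEBORN_MANAGER = "org.apache.spark.shuffle.celeborn.RssShuffleManager"


def parse_spark_defaults(text):
    """Parse spark-defaults.conf content into a dict.

    Each non-blank, non-comment line is "key<whitespace>value".
    """
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        conf[parts[0]] = parts[1] if len(parts) > 1 else ""
    return conf


def celeborn_enabled(conf):
    """True if Celeborn's shuffle manager is set and the external shuffle
    service is disabled. Spark's default for spark.shuffle.service.enabled
    is false, so a missing key counts as disabled."""
    return (
        conf.get("spark.shuffle.manager") == CELEBORN_MANAGER
        and conf.get("spark.shuffle.service.enabled", "false").strip().lower() == "false"
    )


# Hypothetical sample content for illustration only.
sample = """\
# Hypothetical sample
spark.shuffle.manager          org.apache.spark.shuffle.celeborn.RssShuffleManager
spark.shuffle.service.enabled  false
spark.serializer               org.apache.spark.serializer.KryoSerializer
"""
print(celeborn_enabled(parse_spark_defaults(sample)))  # True
```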

Celeborn parameters

You can view or modify all Celeborn parameters on the Configure tab of the Celeborn service page. The following list describes the parameters and their default values.
Important The values of the parameters vary based on the node group.
celeborn.worker.flusher.threads: The number of threads that write data to a hard disk drive (HDD) or a solid-state drive (SSD). Default values:
  • HDD: 1
  • SSD: 8
CELEBORN_WORKER_OFFHEAP_MEMORY: The size of the off-heap memory of a worker node. Default value: calculated based on the cluster settings.
celeborn.application.heartbeat.timeout: The heartbeat timeout period of an application. After the heartbeat times out, the resources related to the application are cleared. Default value: 120s.
celeborn.worker.flusher.buffer.size: The size of a flush buffer. When the data in a flush buffer exceeds this size, a flush is triggered. Default value: 256K.
celeborn.metrics.enabled: Specifies whether to enable monitoring. Valid values:
  • true: enables monitoring.
  • false: disables monitoring.
CELEBORN_WORKER_MEMORY: The size of the heap memory of a core node. Default value: 1g.
CELEBORN_MASTER_MEMORY: The size of the heap memory of the master node. Default value: 2g.
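Because parameter values vary by node group, a per-group helper can make the disk-type defaults explicit. The following Python sketch returns the settings listed above for one node group, choosing the flusher thread count from the HDD and SSD defaults. The function, its parameters, and the node group names are hypothetical; in EMR, you apply the resulting values on the Configure tab.

```python
# Default flusher threads per disk type, as listed in this topic.
DEFAULT_FLUSHER_THREADS = {"hdd": 1, "ssd": 8}


def worker_group_settings(disk_type, worker_memory="1g",
                          heartbeat_timeout="120s", flush_buffer_size="256K"):
    """Return (properties, env) for one node group.

    disk_type: "HDD" or "SSD" (case-insensitive); selects the default
    value of celeborn.worker.flusher.threads.
    """
    key = disk_type.strip().lower()
    if key not in DEFAULT_FLUSHER_THREADS:
        raise ValueError(f"unknown disk type: {disk_type!r} (expected HDD or SSD)")
    properties = {
        "celeborn.worker.flusher.threads": str(DEFAULT_FLUSHER_THREADS[key]),
        "celeborn.worker.flusher.buffer.size": flush_buffer_size,
        "celeborn.application.heartbeat.timeout": heartbeat_timeout,
    }
    # Upper-case names in this topic are environment-variable-style settings.
    env = {"CELEBORN_WORKER_MEMORY": worker_memory}
    return properties, env


# Example: two hypothetical node groups, one with HDDs and one with SSDs.
for group, disk in [("core-hdd", "HDD"), ("core-ssd", "SSD")]:
    props, env = worker_group_settings(disk)
    print(group, props["celeborn.worker.flusher.threads"], env["CELEBORN_WORKER_MEMORY"])
```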

Restart the CelebornMaster component

  1. On the Status tab of the Celeborn service page, find the CelebornMaster component, move the pointer over the more icon in the Actions column, and then select restart_clean_meta.
    Note If the cluster is a non-high-availability cluster, you can click Restart in the Actions column of the CelebornMaster component.
  2. In the dialog box that appears, turn off Rolling Execution, configure the Execution Reason parameter, and then click OK.
  3. In the Confirm message, click OK.