Apache Celeborn is an intermediate data service open-sourced by Alibaba Cloud. It improves the performance, stability, and flexibility of big data compute engines. Celeborn is engine-agnostic: it supports Spark, Flink, MapReduce, and Tez, and is a widely used implementation of a Remote Shuffle Service (RSS).
Architecture
Celeborn is designed around push-based shuffle and partition data aggregation. It offloads shuffle data to remote storage and supports several throttling and isolation strategies, which addresses the low I/O efficiency, limited resource elasticity, and poor overall stability of the traditional shuffle mechanism. The following figure shows the architecture of Celeborn. On the left, the Celeborn cluster consists of a highly available multi-master group and shuffle workers. On the right, the engine side shows that one Celeborn cluster can serve a variety of compute engines.
Core features
Celeborn not only aims for high performance and reliability, but also makes optimizations in maintainability, observability, and stability.
Core performance
Push-based shuffle and partition data aggregation: eliminates the random-read issue of traditional shuffle and improves data processing efficiency.
End-to-end asynchronous processing: data push, flush, fetch, and commit all run asynchronously, which further improves system performance.
Small packet merging: combines small packets before they are sent, which reduces per-packet network overhead and improves data transmission efficiency.
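The first and third items above can be illustrated with a toy model. The following is a minimal in-memory sketch, not Celeborn's implementation: `ToyShuffleWorker`, `ToyMapTask`, and the `batch_records` parameter are hypothetical names. Map tasks buffer records and push them in merged batches, and the worker appends each batch to one file per reduce partition, so a reducer reads a single aggregated file instead of one small block per map task.

```python
from collections import defaultdict


class ToyShuffleWorker:
    """Hypothetical in-memory stand-in for a shuffle worker."""

    def __init__(self):
        # One growing buffer ("file") per reduce partition.
        self.partition_files = defaultdict(list)
        self.push_requests = 0

    def push(self, batch):
        """Receive one merged push request of the form {partition_id: [records]}."""
        self.push_requests += 1
        for partition_id, records in batch.items():
            self.partition_files[partition_id].extend(records)

    def fetch(self, partition_id):
        """A reducer reads its whole partition in one sequential pass."""
        return list(self.partition_files[partition_id])


class ToyMapTask:
    """Buffers records per partition and pushes them in merged batches."""

    def __init__(self, worker, num_partitions, batch_records=4):
        self.worker = worker
        self.num_partitions = num_partitions
        self.batch_records = batch_records
        self.pending = defaultdict(list)
        self.pending_count = 0

    def write(self, key, value):
        # Integer keys keep the partitioning deterministic in this sketch.
        partition_id = key % self.num_partitions
        self.pending[partition_id].append((key, value))
        self.pending_count += 1
        if self.pending_count >= self.batch_records:
            self.flush()

    def flush(self):
        # Send everything buffered so far as a single request.
        if self.pending_count:
            self.worker.push(dict(self.pending))
            self.pending = defaultdict(list)
            self.pending_count = 0


def run_toy_shuffle(num_maps=3, num_partitions=2, records_per_map=8):
    worker = ToyShuffleWorker()
    for map_id in range(num_maps):
        task = ToyMapTask(worker, num_partitions)
        for key in range(records_per_map):
            task.write(key, f"m{map_id}")
        task.flush()
    return worker
```

With the defaults, 24 records reach the worker in 6 push requests rather than 24, and each of the 2 reducers reads one aggregated partition file. A pull-based shuffle with the same shape would need one read per (map task, partition) pair.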
Stability
High-availability (HA) architecture: supports a multi-master HA design, capable of tolerating master node failures. Shuffle data is stored with dual replicas to ensure continued operation even in the event of a single machine failure.
Throttling: uses a throttling policy similar to TCP slow start, combined with Celeborn worker memory management, to prevent workers from being overwhelmed by excessive traffic.
Data skew prevention: avoids overly large individual partition files, which mitigates data skew on Celeborn workers.
Fault isolation: actively isolates affected shuffle workers when facing excessive node pressure or disk faults. This ensures that the isolated resources are no longer used in subsequent shuffle operations until the issue is resolved.
Quota control: allows you to limit the amount of data to be shuffled. This prevents the impact of abnormal jobs on the stability of clusters.
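To make the throttling item more concrete, here is a minimal sketch of a slow-start-like push window. It is an illustration under assumptions, not Celeborn's actual policy: the class `SlowStartThrottle`, its parameters, and the rule that the window doubles after each acknowledged push are all hypothetical simplifications. The congestion signal is assumed to come from the worker, for example when its memory usage is high.

```python
class SlowStartThrottle:
    """Hypothetical per-client push window, loosely modeled on TCP slow start."""

    def __init__(self, initial_window=1, threshold=8, max_window=64):
        self.window = initial_window    # max pushes allowed in flight
        self.threshold = threshold      # switch point from fast to slow growth
        self.max_window = max_window
        self.in_flight = 0

    def try_acquire(self):
        """Return True if another push may be sent right now."""
        if self.in_flight < self.window:
            self.in_flight += 1
            return True
        return False

    def on_success(self):
        """A push was acknowledged without a congestion signal."""
        self.in_flight = max(0, self.in_flight - 1)
        if self.window < self.threshold:
            # Slow-start phase: grow quickly.
            self.window = min(self.window * 2, self.max_window)
        else:
            # Congestion-avoidance phase: grow one step at a time.
            self.window = min(self.window + 1, self.max_window)

    def on_congestion(self):
        """The worker signaled pressure: back off and start over."""
        self.in_flight = max(0, self.in_flight - 1)
        self.threshold = max(1, self.window // 2)
        self.window = 1
```

A client would call `try_acquire()` before each push, then `on_success()` or `on_congestion()` depending on the worker's response. The effect is that a new or recently throttled client ramps up gradually instead of sending at full rate immediately.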
Observability
Comprehensive monitoring metrics: exposes a wide range of master and worker performance metrics, including RPC, data push, fetch, memory usage, and disk I/O metrics. These metrics help O&M personnel detect and diagnose cluster issues early.
Maintainability
O&M APIs: provides a series of RESTful APIs that support graceful shutdown of shuffle workers, cleanup of abnormal applications, and querying of cluster worker information. This enhances O&M and management efficiency.
Rolling upgrade: supports rolling upgrades that do not affect the stability of running jobs. The system automatically handles exceptions that occur during an upgrade by using mechanisms such as retries and partition recovery.
Spark features
Celeborn supports the following key features of Spark:
Adaptive Query Execution (AQE): When handling skewed partitions in Sub-Reducers, Celeborn automatically sorts the data and supports reading partition data based on the Map range of Spark.
Stage recomputation: During the reading of shuffle data, if data loss or shuffle worker exceptions occur, Celeborn can proactively trigger the recomputation of Spark stages to ensure the smooth execution of the job.
Dynamic resource allocation: Shuffle data is stored on the Celeborn server, allowing Spark executors to scale elastically as needed. This way, computing resources are decoupled from shuffle storage resources.
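For a self-managed community Celeborn cluster, these features are enabled through Spark configuration. The sketch below assembles a minimal set of properties as `spark-submit` arguments. The property names are taken from the community Celeborn and Spark documentation and should be verified against the versions you run. The helper functions and the endpoint value are hypothetical, and dynamic resource allocation needs additional version-specific settings that are not shown here.

```python
def celeborn_spark_conf(master_endpoints):
    """Return a minimal Spark configuration for a Celeborn-backed shuffle.

    master_endpoints is a comma-separated host:port list (placeholder value
    in the usage example; use your own cluster's endpoints).
    """
    return {
        # Route shuffle reads and writes through the Celeborn client.
        "spark.shuffle.manager":
            "org.apache.spark.shuffle.celeborn.SparkShuffleManager",
        "spark.celeborn.master.endpoints": master_endpoints,
        # Spark's own external shuffle service is not used with Celeborn.
        "spark.shuffle.service.enabled": "false",
        # AQE and skew-join handling.
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
    }


def to_submit_args(conf):
    """Flatten a configuration dict into spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # "celeborn-master:9097" is a placeholder endpoint.
    print(" ".join(to_submit_args(celeborn_spark_conf("celeborn-master:9097"))))
```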
Enterprise-level features
The Apache Celeborn service is integrated into E-MapReduce (EMR) Serverless Spark. Users do not need to be aware of the shuffle service. Based on the community version of Celeborn, Celeborn in EMR Serverless Spark provides multiple enterprise-level features:
Data encryption and isolation
Celeborn supports encrypting shuffle data. Once encrypted, the shuffle data is accessible only to the Spark job that produced it and cannot be decrypted by any other party.
Celeborn supports multi-tenancy. The server authenticates the tenant identity and isolates the storage of and access to shuffle data based on that identity. Tenants cannot see each other's data.
Enhanced shuffle quota control
Celeborn implements enhanced control over quotas for the number of application tasks, RPC QPS, tenant-level shuffle size, and tenant application-level shuffle size. This prevents abnormal jobs from disrupting the stability of the Celeborn cluster.
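The two shuffle-size quotas can be pictured as a pair of counters checked before data is accepted. The following is a minimal sketch under assumptions: `ShuffleQuota`, `QuotaTracker`, and their methods are hypothetical names, and the real service also limits the number of application tasks and RPC QPS, which are not modeled here.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class ShuffleQuota:
    tenant_bytes: int  # max shuffle bytes across all applications of a tenant
    app_bytes: int     # max shuffle bytes for one application of a tenant


class QuotaTracker:
    """Hypothetical tracker that enforces both quota levels together."""

    def __init__(self, quota):
        self.quota = quota
        self.tenant_usage = defaultdict(int)
        self.app_usage = defaultdict(int)

    def try_reserve(self, tenant, app, nbytes):
        """Reserve nbytes for (tenant, app); reject if either limit is exceeded."""
        tenant_total = self.tenant_usage[tenant] + nbytes
        app_total = self.app_usage[(tenant, app)] + nbytes
        if tenant_total > self.quota.tenant_bytes:
            return False
        if app_total > self.quota.app_bytes:
            return False
        self.tenant_usage[tenant] = tenant_total
        self.app_usage[(tenant, app)] = app_total
        return True

    def release_app(self, tenant, app):
        """Return an application's usage to the tenant when it finishes."""
        used = self.app_usage.pop((tenant, app), 0)
        self.tenant_usage[tenant] -= used
```

Checking both levels in one call means a single abnormal application is stopped by its own limit before it can consume the whole tenant quota, and a tenant as a whole cannot exceed its share of the cluster.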
Auto scaling
The Celeborn server performs auto scaling based on the cluster status to adjust the number of Celeborn shuffle workers. This ensures that the overall shuffle performance of the cluster is not affected by the amount of shuffle data or the concurrency of applications on the engine side.
AQE shuffle read performance optimization
Serverless Spark, in combination with AQE, optimizes data read operations by converting the data read method from MapRange to Partition Split. This eliminates the I/O overhead caused by additional index file creation and sorting, thereby further improving shuffle read efficiency in data skew scenarios.
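The idea behind reading by partition split can be sketched as follows: if a skewed partition is already stored as several split files, whole splits can be handed to sub-reducers without sorting the data by map ID first. The source does not describe the actual assignment policy, so the greedy balancing below and the function name `assign_splits` are assumptions made for illustration only.

```python
import heapq


def assign_splits(split_sizes, num_sub_reducers):
    """Assign whole split files of one skewed partition to sub-reducers.

    Greedy balancing: take splits from largest to smallest and give each one
    to the sub-reducer with the least data so far. Returns a list in which
    entry i holds the split indexes read by sub-reducer i.
    """
    # Heap of (assigned_bytes, sub_reducer_index).
    heap = [(0, i) for i in range(num_sub_reducers)]
    assignments = [[] for _ in range(num_sub_reducers)]
    by_size = sorted(enumerate(split_sizes), key=lambda pair: -pair[1])
    for split_id, size in by_size:
        load, idx = heapq.heappop(heap)
        assignments[idx].append(split_id)
        heapq.heappush(heap, (load + size, idx))
    return assignments
```

Each sub-reducer then reads its assigned files sequentially. Because no sub-reducer needs a map-range slice of a file, the partition data does not have to be sorted and no extra index file has to be built.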
Cross-zone HA
Multiple sets of shuffle services constitute a primary/secondary cluster. In the event of a disaster, the proxy can automatically switch to a healthy instance, ensuring high availability of the service.
Comparison of community-version capabilities and enterprise-level features
Feature | Community-version capability | Enterprise-level feature |
HA architecture | ✔ | ✔ |
Throttling | ✔ | ✔ |
Sort/Hash-based partition writer | ✔ | ✔ |
PushMerge | ✔ | ✔ |
Partition data splitting | ✔ | ✔ |
Asynchronous processing | ✔ | ✔ |
Quota | ✔ | ✔ |
Support for multiple engines, including Tez, Spark, Flink, and MapReduce | ✔ | ✔ |
AQE | ✔ | ✔ |
Rolling upgrade | ✔ | ✔ |
Fault isolation | ✔ | ✔ |
Multi-tier storage | ✔ | ✔ |
Shuffle data encryption and isolation | - | ✔ |
Enhanced shuffle quota control | - | ✔ |
Auto scaling | - | ✔ |
AQE shuffle read performance optimization | - | ✔ |
In-memory storage | - | ✔ |
Cross-zone HA | - | ✔ |