Spark + Celeborn: Faster, More Stable, More Flexible

1. Problems with traditional Shuffle
Apache Spark is a popular big data processing engine with many usage scenarios: Spark SQL, batch processing, stream processing, MLlib, GraphX, and so on. Underlying all of these components is the unified RDD abstraction, and RDD lineage is described by two kinds of dependencies: narrow dependencies and wide dependencies. Wide dependencies are the key to supporting complex operators (Join, Agg, etc.), and the mechanism that implements wide dependencies is Shuffle.
The traditional Shuffle implementation is shown in the middle of the figure above. Each Mapper sorts its Shuffle output by Partition ID and writes the sorted data plus an index file to the local disk. In the Shuffle Read phase, each Reducer reads its own Partition's data from every Mapper's Shuffle file. This implementation has the following drawbacks:
• First, it relies on large-capacity local disks or cloud disks to store Shuffle data, and the data must stay resident until it has been fully consumed. This hinders storage-compute separation, because in a disaggregated architecture compute nodes usually should not carry large local disks and should be releasable as soon as computation finishes.
• Second, Mapper-side sorting consumes a lot of memory and can even trigger external (spill) sorting, which introduces additional disk IO.
• Third, Shuffle Read creates a huge number of network connections; the number of logical connections is M × N (Mappers × Reducers).
• Fourth, there are a large number of small random reads. If a Mapper's shuffle output is 128 MB and the Reducer parallelism is 2,000, each file is read 2,000 times with only about 64 KB per random read, which easily hits the disk IOPS bottleneck.
• Fifth, there is only a single copy of the data, so fault tolerance is limited.
These five deficiencies ultimately result in insufficient efficiency, stability, and flexibility (the quick arithmetic below illustrates the third and fourth points).
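To make the third and fourth points concrete, here is a back-of-the-envelope calculation using the figures above; the Mapper count of 2,000 is an assumed value for illustration (the text only fixes the Reducer parallelism):

```scala
// Back-of-the-envelope check of the connection and random-read figures quoted above.
object TraditionalShuffleCost {
  def main(args: Array[String]): Unit = {
    val numMappers  = 2000                               // assumed for illustration
    val numReducers = 2000
    val mapperShuffleBytes = 128.0 * 1024 * 1024         // 128 MB of shuffle output per Mapper

    // Shuffle Read opens a logical connection per (Mapper, Reducer) pair: M x N.
    val logicalConnections = numMappers.toLong * numReducers
    // Each Mapper file is read once per Reducer, fetching only that Reducer's slice.
    val kbPerRandomRead = mapperShuffleBytes / numReducers / 1024

    println(f"logical connections: $logicalConnections%d")   // 4,000,000
    println(f"per-read size: $kbPerRandomRead%.0f KB")       // ~66 KB, i.e. roughly the 64 KB quoted above
  }
}
```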
2. Apache Celeborn (Incubating)
Apache Celeborn (Incubating) is a Remote Shuffle Service our team developed to solve the problems above; it was donated to the Apache Software Foundation in October 2022. Celeborn is positioned as a unified intermediate data service for big data engines. It is engine-agnostic, and besides Shuffle it will also support spilled data in the future, so that compute nodes can truly shed their dependence on large-capacity local disks.
Before formally introducing the Celeborn design, a little history. Celeborn was born in 2020, when it was called Remote Shuffle Service and was built mainly to meet customer needs; it was officially open-sourced in December 2021.
It entered the Apache Incubator in October 2022. So far the project has accumulated 600+ commits, 32 contributors, and 330+ stars, and we hope more interested developers will join in building it.
3. The performance, stability and flexibility of Celeborn
Celeborn's performance-oriented design mainly covers the core design, integration with Spark AQE, columnar Shuffle, and multi-tier storage.
1. Performance
Celeborn's core design is Push Shuffle plus Partition data aggregation. Simply put, each Mapper maintains an internal buffer that caches Shuffle data; when the buffer exceeds a threshold, a push is triggered and the Mapper pushes the data belonging to the same Partition to a pre-allocated Worker.
As shown in the figure above, the data of Partition1 and Partition2 is pushed to Worker1, the data of Partition3 is pushed to Worker2, and each Partition ultimately produces a single file. In the Shuffle Read phase, a Reducer only needs to read its own data from one Worker. Under this design, Shuffle data no longer lands on the compute node's local disk and no sorting is required. At the same time, Shuffle Read turns from random reads into sequential reads, and the number of network connections changes from multiplicative to linear. This addresses the main drawbacks of traditional Shuffle.
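The push path described above can be sketched roughly as follows. All names and types here are illustrative placeholders rather than Celeborn's actual client API, and the real implementation is considerably more involved (retries, batching, and memory management are omitted):

```scala
// Hypothetical sketch of the Mapper-side push buffer: records are buffered per Partition,
// and once a Partition's buffer exceeds the threshold its data is pushed as one batch
// to the Worker that was pre-allocated for that Partition.
class PushShuffleWriterSketch(
    workerFor: Int => String,                              // partitionId -> pre-allocated Worker (assumed given)
    pushToWorker: (String, Int, Array[Byte]) => Unit,      // network push (assumed given)
    pushThresholdBytes: Int = 64 * 1024) {

  private val buffers = scala.collection.mutable.Map.empty[Int, java.io.ByteArrayOutputStream]

  def write(partitionId: Int, record: Array[Byte]): Unit = {
    val buf = buffers.getOrElseUpdate(partitionId, new java.io.ByteArrayOutputStream())
    buf.write(record)
    if (buf.size() >= pushThresholdBytes) flush(partitionId)
  }

  private def flush(partitionId: Int): Unit = {
    val buf = buffers(partitionId)
    if (buf.size() > 0) {
      // All data in this batch belongs to one Partition, so the Worker can append it
      // directly to that Partition's single file (Partition data aggregation).
      pushToWorker(workerFor(partitionId), partitionId, buf.toByteArray)
      buf.reset()
    }
  }

  def close(): Unit = buffers.keys.foreach(flush)          // push any remaining buffered data
}
```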
The motivation for Partition splitting is that for large jobs, or data with skew, a single Partition file can become very large. We have seen single Partitions exceed 100 GB, which can easily fill up a disk and also leads to unbalanced disk load.
To handle this, Celeborn implements Partition splitting. Specifically, the Worker dynamically monitors the size of each Partition file and returns a Split mark to the Client when a threshold is exceeded. After receiving the Split mark, the Client asynchronously requests a new Worker, and once the new Worker is ready, the Client pushes subsequent data to it. This ensures that no single Partition split file grows too large; during Shuffle Read, both split files are read.
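The split handshake can be pictured with a simplified sketch like the one below; the message names and the 1 GB threshold are illustrative assumptions rather than Celeborn's actual protocol constants:

```scala
// Illustrative sketch of Partition splitting: the Worker checks the Partition file
// size on every push and asks the Client to split once a threshold is exceeded.
object PartitionSplitSketch {
  sealed trait PushReply
  case object Ok    extends PushReply
  case object Split extends PushReply                         // "switch this Partition to a new Worker"

  val splitThresholdBytes: Long = 1L * 1024 * 1024 * 1024     // e.g. 1 GB per split file (assumed)

  // Worker side: accept the batch, then signal Split if the file has grown too large.
  def handlePush(currentFileSizeBytes: Long, batch: Array[Byte]): PushReply = {
    val newSize = currentFileSizeBytes + batch.length
    if (newSize >= splitThresholdBytes) Split else Ok
  }

  // Client side: on Split, request a new Worker and direct subsequent pushes for this
  // Partition to it (the real request is asynchronous; simplified here).
  def onReply(reply: PushReply, requestNewWorker: () => String, currentWorker: String): String =
    reply match {
      case Split => requestNewWorker()
      case Ok    => currentWorker
    }
}
```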
Next, let's look at how Celeborn supports Spark AQE. AQE is the most important optimization in Apache Spark in recent years. It covers three main scenarios: partition coalescing, join strategy switching, and skew join optimization. AQE requires the Shuffle module to be able to read data by Partition range and by Mapper range. Reading by Partition range is natural: as shown in the upper right of the figure above, Reducer1 directly reads the data of Partitions 2, 3, and 4.
Reading by Mapper range is a little more complicated and can be divided into the following three steps (see the sketch after this list):
• Step one, Split. A skew join means the data is skewed, so Partition splitting will very likely be triggered; for example, Partition1 is split into Split1 and Split2.
• Step two, Sort On Read. The first time a Partition split file is read, Sort On Read is triggered and the Worker sorts the file by Mapper ID. After sorting, Mapper-range reads change from random reads into sequential reads. For example, to read the data of Mapper1 through Mapper2, the unsorted file requires four seeks, while the sorted file requires only one.
• Step three, Range Read. Each sub-Reducer sequentially reads the data belonging to its Mapper range from the two splits. Each split file also records its own Mapper list, so unnecessary split files can be pruned.
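A toy sketch of Sort On Read and Range Read, under the simplifying assumption that a split file is just an in-memory sequence of records tagged with their producing Mapper (the real Worker sorts files on disk and keeps an offset index):

```scala
// Records of one Partition split file are tagged with the Mapper that produced them;
// sorting by Mapper ID once turns any later "read Mappers m1..m2" request into a
// single sequential scan instead of many seeks.
object SortOnReadSketch {
  case class Record(mapperId: Int, payload: Array[Byte])

  // Triggered the first time a split file is read (done by the Worker in Celeborn).
  def sortByMapper(records: Seq[Record]): (Seq[Record], Map[Int, Int]) = {
    val sorted = records.sortBy(_.mapperId)
    // Index: Mapper ID -> position of its first record in the sorted file.
    val index = sorted.zipWithIndex
      .groupBy { case (r, _) => r.mapperId }
      .map { case (m, xs) => m -> xs.map(_._2).min }
    (sorted, index)
  }

  // Range Read: a sub-Reducer fetches only the Mappers in [fromMapper, toMapper].
  // With the index above this is one seek plus a sequential read.
  def rangeRead(sorted: Seq[Record], fromMapper: Int, toMapper: Int): Seq[Record] =
    sorted.filter(r => r.mapperId >= fromMapper && r.mapperId <= toMapper)
}
```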
Next, Celeborn's columnar Shuffle. Row storage and column storage are the two common data layouts. The advantage of column storage is that values of the same type sit together, which makes them easy to encode (dictionary encoding, run-length encoding, delta encoding, prefix encoding, etc.) and can greatly reduce data volume. In the past, column storage was mainly used for source table data, while intermediate data inside compute engine operators was mostly row-based, because operator implementations were mostly row-oriented.
In recent years, however, vectorized engines such as Velox, ClickHouse, and DuckDB have become increasingly popular. Their operators are vectorized, so their intermediate data is also columnar. And although Databricks' Photon engine uses vectorization, Apache Spark itself is still a row-based engine.
To implement columnar Shuffle in Apache Spark, Celeborn introduces row-column conversion plus code generation: row data is converted to columnar data during Shuffle Write and converted back to rows during Shuffle Read. To keep the conversion efficient, Celeborn uses code generation to eliminate interpretation overhead. With columnar Shuffle enabled in a 3 TB TPC-DS test, the overall Shuffle size is reduced by roughly 40%, and the row-column conversion overhead is below 5%.
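The idea of row-column conversion can be illustrated with a toy example for a fixed two-column schema; the real implementation works on Spark's internal row format and generates the conversion code per schema, which is omitted here:

```scala
// Toy illustration of row-to-column conversion for Shuffle Write and the reverse for
// Shuffle Read. Columnar layout keeps same-typed values together, which is what makes
// dictionary / run-length / delta encoding effective.
object RowColumnConversionSketch {
  // A "row" with a fixed (Int, String) schema, purely for the example.
  case class Row(id: Int, name: String)

  // Columnar layout: one array per column.
  case class ColumnBatch(ids: Array[Int], names: Array[String])

  def toColumns(rows: Seq[Row]): ColumnBatch =
    ColumnBatch(rows.map(_.id).toArray, rows.map(_.name).toArray)

  def toRows(batch: ColumnBatch): Seq[Row] =
    batch.ids.indices.map(i => Row(batch.ids(i), batch.names(i)))
}
```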
Next, Celeborn's multi-tier storage. Its design goal is to let Celeborn adapt flexibly to diverse hardware environments and to keep data in the fastest storage tier whenever possible. Celeborn defines three storage media: memory, local disk, and distributed storage (OSS/HDFS). Users can freely choose one to three of them, for example only local disks, or only memory plus OSS.
The figure above shows the storage mechanism when all three media are selected. First, memory is divided into two logical regions, the Push Data Region and the Cache Region. Data pushed by Mappers first lands in the Push Data Region. When a Partition's data exceeds the preset threshold, a flush is triggered, and Celeborn decides the Partition's target storage tier: if it is the local disk (P3), the data is flushed to local disk; if it is the memory cache (P4), the data is logically moved into the Cache Region (no actual memory copy takes place).
When the Cache Region is full, Celeborn evicts the largest Partition to the next storage tier; for example, P4 is flushed to the local disk. Once a Partition's data has been flushed, its subsequent data is no longer moved into the Cache Region.
When the local disk is full, there are two strategies: the first evicts local files to OSS; the second leaves local files untouched and flushes data directly from memory to OSS.
Multi-tier storage not only speeds up small Shuffles through memory, but also uses the massive capacity of OSS to support very large Shuffles. It can also free Celeborn from local disks entirely: if only memory and OSS are selected, Celeborn runs without local disks, making the Celeborn service itself more elastic.
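The tiering decisions described above can be summarized in a simplified sketch; the tier names, thresholds, and eviction rule below are illustrative simplifications of the behavior described in the text, not Celeborn's actual code:

```scala
// Simplified decision logic for the three-tier setup (memory cache / local disk / OSS or HDFS).
object MultiTierStorageSketch {
  sealed trait Tier
  case object MemoryCache extends Tier
  case object LocalDisk   extends Tier
  case object DfsOrOss    extends Tier

  // When a Partition in the Push Data Region exceeds the flush threshold, pick its target
  // tier: keep it in memory if the Cache Region has room, otherwise go to local disk,
  // otherwise to OSS/HDFS.
  def chooseTier(cacheRegionFreeBytes: Long,
                 localDiskFreeBytes: Long,
                 partitionBytes: Long): Tier =
    if (cacheRegionFreeBytes >= partitionBytes) MemoryCache
    else if (localDiskFreeBytes >= partitionBytes) LocalDisk
    else DfsOrOss

  // When the Cache Region is full, evict its largest Partition to the next tier;
  // once a Partition has been flushed, its later data bypasses the Cache Region.
  def pickEvictionVictim(cachedPartitionSizes: Map[Int, Long]): Option[Int] =
    if (cachedPartitionSizes.isEmpty) None
    else Some(cachedPartitionSizes.maxBy(_._2)._1)
}
```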
2. Stability
Celeborn's design for the stability of the service itself mainly includes in-place upgrades, congestion control, and load balancing.
First, in-place upgrades. Availability is a hard requirement for a service. Blue-green switching can cover most scenarios, but it requires more manual intervention and temporary extra resources. Celeborn implements in-place upgrades that are transparent to applications through protocol forward compatibility and graceful restart. Forward compatibility is achieved by using Protobuf for the protocol, and graceful restart builds on the active Partition split feature. The figure above shows the graceful restart process.
First, the external system triggers a graceful restart. After receiving the signal, the Worker marks itself as in graceful-shutdown state and reports this to the Master, after which the Master no longer allocates new slots to that Worker. The Worker then answers PushData requests with a HardSplit mark. On receiving this mark, the Client stops pushing data to that Worker and sends it a CommitFile message. Once all Partition data cached in the Worker's memory has been committed, the Worker serializes its in-memory state to local LevelDB and restarts, then restores the state from LevelDB and finally re-registers with the Master.
As this process shows, thanks to the active split mechanism Celeborn's graceful restart is more efficient than in comparable systems: it can generally complete within seconds and does not affect running jobs.
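For reference, the graceful restart sequence described above can be written down as an explicit list of steps (an illustrative summary of the prose, not Celeborn's internal state machine):

```scala
// Sketch of the graceful-restart sequence; each step corresponds to one stage in the text.
object GracefulRestartSketch {
  sealed trait Step
  case object MarkGracefulShutdown   extends Step  // Worker marks itself, reports to Master
  case object StopNewSlotAllocation  extends Step  // Master stops assigning new slots to it
  case object ReplyHardSplit         extends Step  // PushData replies carry the HardSplit mark
  case object CommitCachedData       extends Step  // Clients send CommitFile; cached data is committed
  case object PersistStateAndRestart extends Step  // serialize in-memory state to LevelDB, restart
  case object RestoreAndReRegister   extends Step  // reload state from LevelDB, re-register with Master

  val sequence: Seq[Step] = Seq(
    MarkGracefulShutdown, StopNewSlotAllocation, ReplyHardSplit,
    CommitCachedData, PersistStateAndRestart, RestoreAndReRegister)
}
```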
Next, Celeborn's congestion control in the Shuffle Write phase. To avoid bursts of large jobs overwhelming Worker memory, Celeborn borrows from TCP's congestion control mechanism, including slow start, congestion avoidance, and congestion control.
A Pusher starts in the slow-start state, pushing data at a very low rate that grows exponentially. Once a threshold is reached it enters the congestion-avoidance phase, where the push rate grows more slowly at a fixed slope. If Worker memory then reaches the warning line, congestion control is triggered and a mark is sent to each Client; on receiving it, the Client falls back to the initial slow-start state, and the push rate drops to a very low level accordingly.
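A minimal sketch of this TCP-like rate control, with illustrative constants (the actual rates and thresholds are configuration-dependent):

```scala
// Slow start / congestion avoidance / back-off on congestion, in the spirit of TCP.
class PushRateControllerSketch(slowStartThresholdBytesPerSec: Long = 8 * 1024 * 1024) {
  private var rateBytesPerSec: Long = 64 * 1024     // start slow

  // Called when a push batch is acknowledged by the Worker.
  def onAck(): Unit = {
    if (rateBytesPerSec < slowStartThresholdBytesPerSec)
      rateBytesPerSec *= 2                          // slow start: exponential growth
    else
      rateBytesPerSec += 64 * 1024                  // congestion avoidance: linear growth
  }

  // Called when the Worker's memory hits the warning line and it sends a congestion mark.
  def onCongestion(): Unit =
    rateBytesPerSec = 64 * 1024                     // fall back to the initial slow-start rate

  def currentRate: Long = rateBytesPerSec
}
```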
Another common flow control design is credit-based flow control. Simply put, before pushing data the Client must obtain a certain amount of credit from the Worker, meaning the Worker reserves part of its memory for that Client; the Client can then push only as much data as the credit it holds. This mechanism guarantees precise memory control, but the trade-off is an extra control flow, which has some impact on performance.
The TCP-like congestion control that Celeborn adopts in the Shuffle Write phase balances handling instantaneous traffic peaks with steady-state performance. Meanwhile, Celeborn adopts the credit-based design in the Shuffle Read stage of its Flink support.
Next, Celeborn's load balancing design. Currently Celeborn focuses its load balancing on disks. The goal is to isolate bad disks and to direct load toward faster disks with more free space. Specifically, each Worker monitors the status of its usable local disks, including health, flush rate, and predicted future usage, and reports this status to the Master with its heartbeats. The Master maintains the status of all usable disks in the cluster and groups the disks according to an algorithmic model; higher-ranked groups are allocated more load, and within the same group disks with more available capacity are preferred. This design gives Celeborn more stable performance in heterogeneous environments.
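The disk selection policy can be approximated with a small sketch; the grouping rule below (disks within half of the fastest flush rate form one group) is a deliberately simplistic stand-in for Celeborn's actual algorithm model:

```scala
// Rough sketch of disk selection: exclude bad disks, prefer the faster group,
// and within a group pick the disk with the most free space.
object DiskLoadBalanceSketch {
  case class DiskStatus(id: String, healthy: Boolean, flushBytesPerSec: Long, freeBytes: Long)

  def pickDisk(disks: Seq[DiskStatus]): Option[DiskStatus] = {
    val usable = disks.filter(_.healthy)                           // isolate bad disks
    if (usable.isEmpty) None
    else {
      val fastest = usable.map(_.flushBytesPerSec).max
      // "Group" here = disks within 50% of the fastest flush rate (illustrative rule only).
      val fastGroup = usable.filter(_.flushBytesPerSec >= fastest / 2)
      Some(fastGroup.maxBy(_.freeBytes))                           // prefer more free space
    }
  }
}
```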
3. Elasticity
Celeborn's elastic design mainly includes the Spark on K8s + Celeborn solution.
In the Yarn scenario, the External Shuffle Service (ESS) is the prerequisite for Spark to enable dynamic resource allocation: once Shuffle data is entrusted to the ESS, idle Executors can be released.
However, there is no ESS in the Spark on K8s scenario: to serve later Shuffle Reads, a Pod cannot be released even when idle. To mitigate this, open-source Spark provides the parameter spark.dynamicAllocation.shuffleTracking.enabled, which tracks whether Shuffle files are still being read to decide when Executors can be released. In our tests, however, this parameter has limited effect. After integrating Celeborn, Shuffle data is hosted on the Celeborn cluster and Pods can be released as soon as they become idle, achieving true elasticity.
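Below is a hedged example of how a Spark on K8s job might be pointed at a Celeborn cluster with dynamic allocation enabled. The Celeborn property names follow the project's documentation at the time of writing and may differ between versions; depending on the Spark version, additional settings (or a Spark patch) may be required before Spark will release Executors without an External Shuffle Service.

```scala
// Illustrative SparkSession setup: shuffle data goes to Celeborn Workers instead of
// Executor local disks, so idle Executor Pods can be scaled in.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-on-k8s-with-celeborn")
  // Hand shuffle over to Celeborn (class and property names per the Celeborn docs; verify
  // against the Celeborn/Spark versions you actually deploy).
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
  .config("spark.celeborn.master.endpoints", "<celeborn-master-host>:9097") // placeholder endpoint
  // With shuffle hosted externally, dynamic allocation can release idle Executors.
  .config("spark.dynamicAllocation.enabled", "true")
  .getOrCreate()
```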
4. Typical scenarios
Celeborn has the following three typical scenarios.
• The first is full co-location: HDFS, Yarn, and Celeborn are deployed in the same cluster. The main benefit is improved performance and stability.
• The second deploys Celeborn independently while HDFS and Yarn remain co-located. Besides improving performance and stability, this isolates source-table IO from Shuffle IO contending for the disks, providing some resource isolation and partial elasticity for the Celeborn cluster.
• The third is storage-compute separation: source table data lives in object storage, compute nodes run on a K8s or Yarn cluster, and the Celeborn cluster is also deployed independently. In this scenario, both the compute cluster and the Celeborn cluster enjoy full elasticity.
5. Evaluation
Next, two cases. The first is a co-located deployment. A user co-located Celeborn in a compute cluster; the Celeborn deployment spans more than 1,000 machines, though the resources given to each Worker are relatively limited.
The user's daily Shuffle volume reaches 4 PB after compression, and Celeborn has greatly improved the stability of their big data workloads. As the figure above shows, there are more than 80,000 concurrent jobs, and a single job with a 16 TB Shuffle runs stably even in the HDD environment; before Celeborn, this job could not run at all.
The second is a storage-compute-separated case. A user adopts a fully disaggregated architecture: compute nodes run on K8s, source table data lives in OSS, and the Celeborn cluster is deployed independently. Their compute nodes launch tens of thousands of Pods per day, with Spark's dynamic resource allocation enabled by default, giving very good elasticity. Performance and stability have also improved significantly.
The figure above shows our results on the standard TPC-DS 3 TB benchmark in the co-located environment. Without consuming additional machine resources, Celeborn with a single replica delivers a 20% performance improvement over the External Shuffle Service, and with two replicas a 13% improvement.
