Alibaba Cloud RemoteShuffleService new features: AQE and flow control

RSS supports AQE

Introduction to AQE

Adaptive Query Execution (AQE) is an important feature of Spark 3 [2]. It collects runtime statistics and uses them to adjust the remaining execution plan. This addresses a common problem: the optimizer cannot estimate statistics accurately ahead of time, so the plan it generates may not be good enough. AQE has three main optimization scenarios: Partition Coalescing, Switch Join Strategy, and Optimize Skew Join. Each of them places new requirements on the shuffle framework.

Partition merging

The purpose of partition merging is to keep the amount of data processed by each reducer moderate and as uniform as possible. First, the mappers perform Shuffle Write according to the original number of partitions. The AQE framework then collects the size of each partition. If several consecutive partitions are relatively small, they are merged into one and handed to a single reducer. The process is shown below.
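A minimal sketch of the merging rule, assuming a simple greedy pass over consecutive partitions against a target size. In Spark the target is governed by `spark.sql.adaptive.advisoryPartitionSizeInBytes`, and the real implementation has more refinements. The function name and the sizes are hypothetical.

```python
def coalesce_partitions(sizes, target_size):
    """Merge consecutive shuffle partitions into [start, end) ranges.

    Greedy rule: keep adding the next partition to the current range until
    doing so would exceed target_size; every range holds at least one partition.
    """
    ranges = []
    start, acc = 0, 0
    for i, size in enumerate(sizes):
        if i > start and acc + size > target_size:
            ranges.append((start, i))  # close the current range before partition i
            start, acc = i, 0
        acc += size
    if sizes:
        ranges.append((start, len(sizes)))
    return ranges


# Five original partitions (sizes in MB); partitions 1-3 are small and get merged.
print(coalesce_partitions([100, 10, 20, 30, 100], target_size=64))
# [(0, 1), (1, 4), (4, 5)]
```

The middle range `(1, 4)` matches the situation in the figure. One reducer now reads what the original Reducer2 through Reducer4 would have read, which is exactly a range-partition read.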

As the figure shows, after optimization Reducer2 must read the data that originally belonged to Reducer2 through Reducer4. The resulting requirement on the shuffle framework is that the ShuffleReader must support reading a range of partitions.

Join strategy switching

The purpose of join strategy switching is to correct cases where the optimizer wrongly chooses SortMerge Join or ShuffleHash Join because its statistics estimates were inaccurate. After Shuffle Write completes for both tables in the join, the AQE framework computes their actual sizes. If the small table meets the conditions for Broadcast Join, it is broadcast and joined with the local shuffle data of the large table. The process is as follows:
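The sketch below shows the re-planning decision as a simplified check on the measured sizes. The default threshold mirrors `spark.sql.autoBroadcastJoinThreshold` (10 MB, where a negative value disables broadcasting). The function name and return strings are hypothetical. Real Spark also checks join type, hints, and other conditions that are omitted here.

```python
MB = 1024 * 1024
AUTO_BROADCAST_THRESHOLD = 10 * MB  # default of spark.sql.autoBroadcastJoinThreshold


def choose_join_strategy(left_bytes, right_bytes, threshold=AUTO_BROADCAST_THRESHOLD):
    """Pick a join strategy from the actual sizes measured after Shuffle Write.

    Simplified: ignores join type, hints and other conditions Spark also checks.
    A negative threshold disables broadcasting, as in Spark.
    """
    # The smaller side is the broadcast (build) candidate.
    side, size = ("left", left_bytes) if left_bytes <= right_bytes else ("right", right_bytes)
    if 0 <= size <= threshold:
        return f"BroadcastHashJoin(build={side})"
    return "keep original shuffle join"


print(choose_join_strategy(5 * MB, 1000 * MB))  # BroadcastHashJoin(build=left)
```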

Join strategy switching involves two optimizations:

1. Rewriting the join as a Broadcast Join.
2. Reading the large table's data directly from local disk through the LocalShuffleReader.

The second optimization is the new requirement on the shuffle framework: it must support Local Read.

Skew join optimization

The purpose of skew join optimization is to let more reducers process a skewed partition and so avoid long tails. After Shuffle Write, the AQE framework collects the size of each partition and applies specific rules to decide whether any partition is skewed. If one is, it is split into multiple splits, and each split is joined with the corresponding partition of the other table, as shown below.
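The detection rule below is modeled on Spark's documented one. A partition counts as skewed if it is larger than `skewedPartitionFactor` times the median partition size and also larger than `skewedPartitionThresholdInBytes`. The defaults used here are those of `spark.sql.adaptive.skewJoin.skewedPartitionFactor` (5) and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` (256 MB). The function name is hypothetical, and this sketch takes the median with `statistics.median`, which may differ slightly from Spark's own median computation.

```python
import statistics

MB = 1024 * 1024
SKEWED_PARTITION_FACTOR = 5             # default of ...skewJoin.skewedPartitionFactor
SKEWED_PARTITION_THRESHOLD = 256 * MB   # default of ...skewJoin.skewedPartitionThresholdInBytes


def skewed_partitions(sizes, factor=SKEWED_PARTITION_FACTOR, threshold=SKEWED_PARTITION_THRESHOLD):
    """Return indices of partitions that are both much larger than the median
    and larger than an absolute size threshold."""
    median = statistics.median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * median and s > threshold]


print(skewed_partitions([100 * MB, 120 * MB, 110 * MB, 2000 * MB]))  # [3]
```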

Partition splitting accumulates the shuffle output size of each map task in MapId order and starts a new split whenever the accumulated value exceeds a threshold. The new requirement on the shuffle framework is therefore that the ShuffleReader must support reading a range of MapIds. Combined with the range-partition requirement from partition merging, the ShuffleReader must support both range partitions and range MapIds.
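The function below implements the accumulate-and-cut rule described above. Spark's own implementation adds refinements that are omitted here. The function name and the example sizes are hypothetical. Each returned pair is a `[start_map, end_map)` range, which is exactly what a range-MapId read must serve.

```python
def split_by_map_id(map_output_sizes, threshold):
    """Cut one skewed partition into [start_map, end_map) ranges.

    map_output_sizes[m] is the size that map m wrote to this partition.
    Sizes are accumulated in MapId order; once the running total exceeds
    the threshold, the current split is closed (including map m).
    """
    splits = []
    start, acc = 0, 0
    for map_id, size in enumerate(map_output_sizes):
        acc += size
        if acc > threshold:
            splits.append((start, map_id + 1))
            start, acc = map_id + 1, 0
    if start < len(map_output_sizes):
        splits.append((start, len(map_output_sizes)))  # leftover tail
    return splits


print(split_by_map_id([50, 60, 30, 80, 10, 20], threshold=100))
# [(0, 2), (2, 4), (4, 6)]
```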

RSS architecture review

The core design of RSS is Push Shuffle plus partition data aggregation. Different mappers push data that belongs to the same partition to the same worker, which aggregates it, and the reducer reads the aggregated file directly, as shown in the figure below.
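The toy model below illustrates push-based aggregation, assuming an in-memory list per partition in place of a real file. The `Worker` class and its `push_data`/`read_partition` methods are hypothetical illustrations, not the RSS API. Blocks are appended in arrival order, so the reducer gets one sequential read per partition, and blocks from different mappers end up interleaved.

```python
from collections import defaultdict


class Worker:
    """Toy RSS worker that aggregates pushed blocks per partition."""

    def __init__(self):
        # partition_id -> [(map_id, data), ...] in arrival order
        self.files = defaultdict(list)

    def push_data(self, partition_id, map_id, data):
        # Mappers push blocks of one partition to the same worker.
        self.files[partition_id].append((map_id, data))

    def read_partition(self, partition_id):
        # The reducer reads the aggregated "file" sequentially.
        return b"".join(data for _, data in self.files[partition_id])


w = Worker()
w.push_data(0, map_id=1, data=b"x")
w.push_data(0, map_id=2, data=b"y")
w.push_data(0, map_id=1, data=b"z")
print(w.read_partition(0))  # b'xyz'
```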

Beyond this core design, RSS also implements multiple replicas, full-link fault tolerance, Master HA, disk fault tolerance, an adaptive pusher, rolling upgrades, and other features. See [1] for details.

RSS supports partition merging

Partition merging requires the shuffle framework to support range partitions. In RSS each partition corresponds to a file, so reading a range of partitions means reading the corresponding files. RSS therefore supports this naturally, as shown in the following figure.

RSS supports join strategy switching

Join strategy switching requires the shuffle framework to support the LocalShuffleReader. Because RSS is remote by nature, shuffle data is stored in the RSS cluster. It is local to the compute node only when RSS and the compute cluster are deployed in mixed (co-located) mode. Local Read is therefore not supported for now; co-located scenarios will be optimized and supported in the future. Although Local Read is not supported, the join rewrite is unaffected: RSS supports rewriting the join as a Broadcast Join, as shown in the figure below.

RSS supports skew join optimization

Of the three AQE scenarios, skew join optimization is the hardest for RSS to support. The core design of RSS is partition data aggregation, which turns the random reads of Shuffle Read into sequential reads and thereby improves performance and stability. Multiple mappers push to an RSS Worker concurrently, and the Worker aggregates the data in memory before flushing it to disk. As a result, data from different mappers is unordered within a partition file, as shown in the following figure.

Skew join optimization needs to read a range of maps, for example the data from Map1 to Map2. There are two general approaches:

1. Read the complete file and discard the data outside the range.

2. Introduce an index file that records the location and MapId of each block, and read only the blocks within the range.
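The hypothetical cost model below makes the trade-off concrete for reading the map range `[start, end)` from one partition file. Here `blocks` is a list of `(map_id, length)` pairs in file order. The model counts the bytes read and the number of separate contiguous reads, where every extra read implies a seek. Both functions are illustrations, not part of RSS.

```python
def full_scan(blocks, start, end):
    """Method 1: read the whole file sequentially, drop blocks outside [start, end)."""
    total = sum(length for _, length in blocks)
    wanted = sum(length for map_id, length in blocks if start <= map_id < end)
    return {"bytes_read": total, "wasted_bytes": total - wanted,
            "contiguous_reads": 1 if blocks else 0}


def index_read(blocks, start, end):
    """Method 2: use a (map_id, offset) index to read only blocks in [start, end).

    Every run of adjacent matching blocks is a separate read, i.e. a seek.
    """
    bytes_read, reads, prev_hit = 0, 0, False
    for map_id, length in blocks:
        hit = start <= map_id < end
        if hit:
            bytes_read += length
            if not prev_hit:
                reads += 1
        prev_hit = hit
    return {"bytes_read": bytes_read, "wasted_bytes": 0, "contiguous_reads": reads}


blocks = [(1, 10), (3, 10), (2, 10), (1, 10), (4, 10), (2, 10)]
print(full_scan(blocks, 1, 3))   # 60 bytes read, 20 wasted, 1 read
print(index_read(blocks, 1, 3))  # 40 bytes read, 0 wasted, 3 reads
```

Because blocks from different maps are interleaved, the index approach trades wasted bytes for more, smaller reads. The more interleaved the file is, the closer this gets to fully random I/O.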

Both approaches have obvious problems. Method 1 causes a large amount of redundant disk reads. Method 2 essentially falls back to random reads and loses the core advantage of RSS. Building index files also becomes an overhead for all data, even data that is not skewed, because it is hard to predict accurately during Shuffle Write whether skew will occur.

To solve these two problems, we propose a new design: Active Split + Sort On Read.

Active Split

Skewed partitions are very likely to be very large, and in extreme cases they can fill up a disk outright. Even without skew, large partitions are not uncommon. From the perspective of disk load balancing, it is therefore necessary to monitor the size of each partition file and split it actively once it grows too large (the default threshold is 256 MB).
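The check itself is simple, assuming the Worker tracks the bytes written to each partition file. The class name is hypothetical, and the 256 MB default comes from the text. In this sketch a write still succeeds when the threshold is crossed. The return value only signals, once per file, that a split should be requested.

```python
SPLIT_THRESHOLD = 256 * 1024 * 1024  # default split threshold from the text: 256 MB


class PartitionFileWriter:
    """Hypothetical per-partition-file size monitor on an RSS Worker."""

    def __init__(self, threshold=SPLIT_THRESHOLD):
        self.threshold = threshold
        self.bytes_written = 0
        self.split_requested = False

    def write(self, block: bytes) -> bool:
        """Append a block; return True only the first time the file crosses the threshold."""
        self.bytes_written += len(block)
        if not self.split_requested and self.bytes_written >= self.threshold:
            self.split_requested = True
            return True
        return False
```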

When a split occurs, RSS assigns a new pair of workers (primary and replica) to the current partition, and subsequent data is pushed to the new workers. To prevent splits from affecting running mappers, we propose Soft Split. When a split is triggered, RSS prepares the new workers asynchronously and, once they are ready, hot-updates the mapper's PartitionLocation information, so the mapper's PushData is not disrupted at all. The overall process is shown in the figure below.
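A minimal mapper-side sketch of Soft Split follows. It assumes the new worker pair is allocated by a background task, modeled with `concurrent.futures`. The class, its methods, and the worker names are hypothetical, and `allocate` only stands in for RSS assigning new workers. Pushes never wait on the split: they keep going to the old location until the allocation future is done, and then the location is swapped.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SoftSplitPushClient:
    """Hypothetical mapper-side push target for one partition, with soft split."""

    def __init__(self, location, executor):
        self.location = location  # current (primary, replica) worker pair
        self.executor = executor
        self.pending = None       # Future of the next location while a split is in flight
        self.pushed = []          # (location, data) log, for illustration only

    def on_split_requested(self, allocate_workers):
        if self.pending is None:  # ignore duplicate split signals
            self.pending = self.executor.submit(allocate_workers)

    def push_data(self, data):
        if self.pending is not None and self.pending.done():
            self.location = self.pending.result()  # hot update of PartitionLocation
            self.pending = None
        self.pushed.append((self.location, data))  # never blocks on the split


if __name__ == "__main__":
    ready = threading.Event()

    def allocate():  # stands in for RSS assigning a new worker pair
        ready.wait()
        return ("worker3", "worker4")

    with ThreadPoolExecutor(max_workers=1) as pool:
        client = SoftSplitPushClient(("worker1", "worker2"), pool)
        client.on_split_requested(allocate)
        client.push_data(b"a")   # new pair not ready yet: goes to worker1/worker2
        ready.set()
        client.pending.result()  # wait until the allocation finishes
        client.push_data(b"b")   # now goes to worker3/worker4
        print([loc for loc, _ in client.pushed])
```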

Sort On Read

To avoid random reads, RSS adopts a Sort On Read strategy. The first range read of a file split triggers a sort (non-range reads do not), and the sorted file is written back to disk together with its position index. Subsequent range reads are then guaranteed to be sequential, as shown in the figure below.
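The in-memory model below illustrates Sort On Read, assuming a split is a list of `(map_id, payload)` blocks in arrival order. The class and function names are hypothetical. On the first range read, the blocks are sorted by MapId with a stable sort, so each map's blocks stay in order, and a MapId-to-offset index is built. After that, every range read is a single contiguous slice. Non-range reads never trigger the sort.

```python
from bisect import bisect_left


def sort_blocks(blocks):
    """Sort (map_id, payload) blocks by MapId and build a MapId -> offset index."""
    ordered = sorted(blocks, key=lambda b: b[0])  # stable: keeps each map's blocks in order
    data = bytearray()
    index = []  # [(map_id, offset)], sorted by map_id
    for map_id, payload in ordered:
        if not index or index[-1][0] != map_id:
            index.append((map_id, len(data)))
        data += payload
    return bytes(data), index


class FileSplit:
    """Hypothetical in-memory stand-in for one split file of a partition."""

    def __init__(self, blocks):
        self.blocks = blocks  # arrival order, MapIds interleaved
        self.sorted = None    # (data, index) after the first range read

    def read_all(self):
        # Non-range read: plain sequential read, no sort.
        return b"".join(payload for _, payload in self.blocks)

    def read_range(self, start_map, end_map):
        """Return the data of maps in [start_map, end_map) as one contiguous slice."""
        if self.sorted is None:
            self.sorted = sort_blocks(self.blocks)  # first range read pays for the sort
        data, index = self.sorted
        ids = [map_id for map_id, _ in index]
        lo, hi = bisect_left(ids, start_map), bisect_left(ids, end_map)
        begin = index[lo][1] if lo < len(index) else len(data)
        end = index[hi][1] if hi < len(index) else len(data)
        return data[begin:end]


split = FileSplit([(2, b"c1"), (0, b"a1"), (1, b"b1"), (0, b"a2"), (2, b"c2")])
print(split.read_range(0, 2))  # b'a1a2b1'
```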

To keep multiple sub-reducers from all waiting on the sort of the same file split, we shuffle the order in which each sub-reducer reads the splits, as shown in the following figure.
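The text does not say exactly how the order is shuffled. One simple possibility, assumed here, is to rotate the split list by the sub-reducer's index. With that rotation, sub-reducers start on different splits, and each sort is first requested by a different reader. The function name is hypothetical, and a per-reducer seeded random shuffle would serve the same purpose.

```python
def read_order(num_splits, sub_reducer_id):
    """Order in which one sub-reducer visits the file splits of a partition.

    Sub-reducer k starts at split k mod n and wraps around, so concurrent
    sub-reducers do not all begin (and block) on the same split's sort.
    """
    start = sub_reducer_id % num_splits
    return [(start + i) % num_splits for i in range(num_splits)]


for k in range(3):
    print(k, read_order(3, k))
```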

Sort optimization

Sort On Read effectively avoids redundant and random reads, but it requires sorting each split file (256 MB). This section discusses how the sort is implemented and what it costs. Sorting a file involves three steps: reading the file, sorting by MapId, and writing the file. The default RSS block size is 256 KB, so a split file contains about 1,000 blocks (256 MB / 256 KB = 1,024). The sort itself is therefore very fast, and the main cost is reading and writing the file. There are roughly three schemes for the whole process:

1. Pre-allocate memory equal to the file size, read the whole file at once, parse and sort the MapIds, and write the blocks back to disk in MapId order.

2. Allocate no memory. Seek to the position of each block, parse and sort the MapIds, and transfer the blocks of the original file to the new file in MapId order.

3. Allocate a small buffer (for example, 256 KB), read the whole file sequentially, parse and sort the MapIds, and transfer the blocks of the original file to the new file in MapId order.
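The Python sketch below follows Scheme 3. RSS itself is JVM code, and this is not its implementation. The on-disk block layout, an 8-byte header holding `(map_id, payload_length)` followed by the payload, is a hypothetical format chosen for illustration, as are the function names. The first pass reads the file sequentially through a small buffer and only records block positions. The second pass copies the blocks to the new file in MapId order, reusing the same small buffer size.

```python
import struct

# Hypothetical block layout for this sketch: big-endian int32 map_id and
# int32 payload_length, followed by the payload bytes.
HEADER = struct.Struct(">ii")
BUF_SIZE = 256 * 1024  # small reusable buffer, as in Scheme 3


def _copy(src, dst, nbytes, buf_size):
    """Read nbytes from src in buf_size chunks; write them to dst unless dst is None."""
    while nbytes > 0:
        chunk = src.read(min(buf_size, nbytes))
        if not chunk:
            raise ValueError("unexpected end of file")
        if dst is not None:
            dst.write(chunk)
        nbytes -= len(chunk)


def sort_split_file(src_path, dst_path, buf_size=BUF_SIZE):
    """Scan src sequentially, sort blocks by MapId, copy them to dst in that order.

    Returns an index [(map_id, offset_in_dst, length)] for later range reads.
    """
    blocks = []  # (map_id, offset_in_src, length including header)
    with open(src_path, "rb") as src:
        offset = 0
        while True:
            header = src.read(HEADER.size)
            if not header:
                break
            if len(header) < HEADER.size:
                raise ValueError("truncated header")
            map_id, length = HEADER.unpack(header)
            _copy(src, None, length, buf_size)  # sequential read; also warms the page cache
            blocks.append((map_id, offset, HEADER.size + length))
            offset += HEADER.size + length

    blocks.sort(key=lambda b: b[0])  # stable: keeps each map's blocks in order

    index = []
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        out = 0
        for map_id, src_offset, total in blocks:
            if not index or index[-1][0] != map_id:
                index.append([map_id, out, 0])
            src.seek(src_offset)  # likely served from the page cache
            _copy(src, dst, total, buf_size)
            index[-1][2] += total
            out += total
    return [tuple(entry) for entry in index]
```

Reading the whole file sequentially in the first pass is what makes the later out-of-order reads cheap: the original file is most likely still in the page cache when the blocks are copied.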

From an I/O perspective, the schemes differ at first glance. Scheme 1 uses enough memory to keep all reads and writes sequential. Scheme 2 has both random reads and random writes. Scheme 3 has random I/O when transferring blocks. On paper, therefore, Scheme 1 should perform best. However, because of the PageCache, the original file in Scheme 3 is most likely still cached when the new file is written, so Scheme 3 actually performs better, as shown in the figure below.

Scheme 3 also needs no extra process memory, so RSS uses it. We also compared Sort On Read with the unsorted, index-only random-read approach described earlier, as shown in the figure below.

Overall process

The overall process by which RSS supports skew join optimization is shown in the figure below.

RSS flow control

The main purpose of flow control is to keep RSS Worker memory from being exhausted. There are generally two approaches to flow control:

1. Before each PushData, the client reserves memory on the Worker, and the push happens only after the reservation succeeds.

2. Back pressure on the Worker side.

PushData is a very high-frequency, performance-critical operation, and an extra RPC round trip for every push would be too expensive. We therefore adopted the back-pressure strategy. From the Worker's perspective, incoming data has two sources:

1. Data pushed by clients (mappers)

2. Data sent by the primary replica

As shown in the figure below, Worker2 receives both the Partition3 data pushed by a mapper and the Partition1 replica data sent by Worker1. It also forwards the Partition3 data to the corresponding replica.

Memory holding data from a mapper is released if and only if both of the following conditions hold:

1. Replication succeeded

2. Writing the data succeeded

Memory holding data pushed from the primary replica is released if and only if the following condition holds:

1. Writing the data succeeded
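The release rules can be expressed as per-batch bookkeeping, assuming each batch received by the Worker tracks which conditions it still needs. The class and callback names are hypothetical. Data from a mapper is created with `needs_replication=True` when replicas are enabled, and data from a primary replica with `needs_replication=False`. Memory is returned exactly once, when all required conditions hold.

```python
class InFlightBatch:
    """Hypothetical bookkeeping for one batch of received data held in Worker memory."""

    def __init__(self, size, needs_replication, release):
        self.size = size
        self.replicated = not needs_replication  # replica-side data needs no replication
        self.written = False
        self._release = release  # callback returning `size` bytes to the memory pool
        self._released = False

    def on_replication_succeeded(self):
        self.replicated = True
        self._maybe_release()

    def on_write_succeeded(self):
        self.written = True
        self._maybe_release()

    def _maybe_release(self):
        # Release exactly once, and only when every required condition holds.
        if self.replicated and self.written and not self._released:
            self._released = True
            self._release(self.size)
```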

When designing the flow control strategy, we must consider both limiting inflow (reducing incoming data) and draining (releasing memory promptly). We define two high watermarks, at 85% and 95% memory usage, and one low watermark, at 50% memory usage. When usage reaches the first high watermark, flow control is triggered: data pushed by mappers is paused, and a flush to disk is forced at the same time to drain memory. Limiting only the inflow from mappers does not control traffic from primary replicas. For that reason we define a second high watermark. When it is reached, data sent by primary replicas is paused as well. Once usage drops below the low watermark, the Worker returns to normal. The overall process is shown in the figure below.
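The watermark logic can be written as a small state machine using the 85%/95%/50% values from the text. The class and state names are hypothetical. The text says only that the Worker returns to normal below the low watermark. This sketch therefore assumes hysteresis: once paused, the Worker keeps its state (including the stricter one) until usage drops below 50%.

```python
from enum import Enum


class FlowState(Enum):
    NORMAL = "normal"
    PAUSE_MAPPER = "pause pushes from mappers"
    PAUSE_ALL = "pause pushes from mappers and primary replicas"


class WatermarkFlowControl:
    """Hypothetical Worker-side back pressure with two high and one low watermark."""

    def __init__(self, capacity, high1=0.85, high2=0.95, low=0.50):
        self.capacity = capacity
        self.high1, self.high2, self.low = high1, high2, low
        self.state = FlowState.NORMAL

    def update(self, used_bytes):
        """Recompute the state from current memory usage (with hysteresis)."""
        ratio = used_bytes / self.capacity
        if ratio >= self.high2:
            self.state = FlowState.PAUSE_ALL
        elif ratio >= self.high1 and self.state is FlowState.NORMAL:
            self.state = FlowState.PAUSE_MAPPER
        elif ratio < self.low:
            self.state = FlowState.NORMAL
        # Otherwise keep the current state until usage drops below the low watermark.
        return self.state

    def accept_from_mapper(self):
        return self.state is FlowState.NORMAL

    def accept_from_primary(self):
        return self.state is not FlowState.PAUSE_ALL

    def force_flush(self):
        # Any paused state also forces buffered data to disk (draining).
        return self.state is not FlowState.NORMAL
```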

Performance testing

We compared the performance of RSS with the native External Shuffle Service (ESS) with AQE enabled on Spark 3.2.0. RSS ran in mixed (co-located) mode and used no additional machines. RSS used 8 GB of memory, only 2.3% of each machine's 352 GB. The specific environment is as follows.

Testing environment


Header machine group: 1x ecs.g5.4xlarge

Worker machine group: 8x ecs.d2c.24xlarge, 96 CPUs, 352 GB memory, 12x 3700 GB HDD

Spark AQE-related configuration:

spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.initialPartitionNum 1000
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.localShuffleReader.enabled false

RSS related configuration:




TPC-DS 10 TB test set

We ran the 10 TB TPC-DS benchmark. End to end, ESS took 11734 s, while RSS with one replica and with two replicas took 8971 s and 10110 s respectively. That is 23.5% and 13.8% faster than ESS, as shown in the figure below. We observed that network bandwidth reached its limit when RSS used two replicas, which is the main reason two replicas are slower than one.
