Adaptive execution practice of Spark SQL on 100TB

Date: Oct 25, 2022

Related Tags: Data Lake Analytics - Spark SQL, Data Lake Analytics - Spark Streaming

Abstract: Spark SQL is the most widely used component of Apache Spark. It provides a friendly interface for processing structured data in a distributed manner and has been used successfully in production across many application fields. However, it still faces quite a few usability and scalability challenges.

To address these challenges, Intel's big data technology team and engineers from Baidu's Big Data Infrastructure Department improved and implemented an adaptive execution engine based on the Spark community version. This article first discusses the challenges that Spark SQL encounters on large-scale datasets, then introduces the background and basic architecture of adaptive execution and how it addresses these problems. Finally, we compare adaptive execution with the community version of Spark SQL on the 100 TB TPC-DS benchmark, covering both the challenges encountered and the performance differences, and describe the use of adaptive execution on Baidu's Big SQL platform.



Challenge 1: The number of shuffle partitions





In Spark SQL, the number of shuffle partitions can be set through the parameter spark.sql.shuffle.partitions, whose default value is 200. This parameter determines the number of tasks in each reduce stage of a SQL job and has a great impact on overall query performance. Suppose a query runs with E Executors, each containing C cores (the number of concurrently executing threads); then the number of tasks the job can execute in parallel at runtime, i.e., the concurrency of the job, is E x C. Assuming the number of shuffle partitions is P, then apart from the map stage, whose task count depends on the number and size of the original data files, every subsequent reduce stage has P tasks. Since Spark task scheduling is preemptive, the E x C concurrent execution units grab and execute the P tasks, with faster units taking on more work, and the job moves to the next stage once all tasks are completed. However, if some tasks take too long because they process a large amount of data (for example, data skew causes a large amount of data to fall into the same reducer partition) or for other reasons, the execution time of the entire stage is lengthened. Meanwhile, most of the E x C concurrent execution units may sit idle waiting, and the overall utilization of cluster resources drops sharply.
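To make the concurrency and partition numbers above concrete, here is a minimal sketch; the executor and core counts are hypothetical examples, not values recommended by the article.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: the executor/core numbers below are hypothetical examples.
val spark = SparkSession.builder()
  .appName("ShufflePartitionExample")
  .config("spark.executor.instances", "20")      // E executors
  .config("spark.executor.cores", "4")           // C cores per executor => E x C = 80 concurrent tasks
  .config("spark.sql.shuffle.partitions", "200") // P reduce tasks per shuffle stage (default 200)
  .getOrCreate()
import spark.implicits._

// Every reduce stage produced by a shuffle (e.g. this groupBy) runs with P tasks.
spark.range(0, 1000000).groupBy($"id" % 100).count().collect()
```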



So what is an appropriate value for spark.sql.shuffle.partitions? If it is set too small, the amount of data allocated to each reduce task is larger, and with limited memory the task has to spill to the local disk of the compute node. Spilling causes additional disk reads and writes, hurting the performance of the whole SQL query, and in the worst case can cause serious GC problems or even OOM. Conversely, if the shuffle partition count is set too large, several problems arise. First, each reduce task processes only a small amount of data and finishes quickly, which increases the burden on Spark task scheduling. Second, each mapper task must divide its shuffle output into P hash buckets, i.e., determine which reduce partition each record belongs to; when there are too many shuffle partitions, each hash bucket holds very little data, and with high job concurrency the reduce tasks' shuffle reads degenerate into many small random reads. When mechanical hard disks serve as temporary storage for shuffle data, this performance degradation is even more obvious. Finally, when the last stage saves its results, it writes out P files, which may also leave a large number of small files in the HDFS file system.



From the above, the shuffle partition setting can be neither too small nor too large. To achieve the best performance, it often takes many trials to determine the best shuffle partition value for a SQL query. However, in production environments, SQL jobs typically run as scheduled jobs over data from different time periods, and the data size may vary greatly; we cannot afford time-consuming manual tuning for every SQL query, which means these SQL jobs rarely run with optimal performance.



Another problem with shuffle partitions is that the same setting applies to all stages. When Spark executes a SQL job, it is divided into multiple stages, and under normal circumstances the data distribution and size of each stage may differ. A global shuffle partition setting can at best be optimal for one or a few stages; there is no way for a single global setting to be optimal for every stage.



This series of performance and usability challenges around shuffle partitions prompted us to think about a new approach: can we automatically set an appropriate shuffle partition value for each stage based on the shuffle statistics obtained at runtime, such as data block sizes and record counts?



Challenge 2: Choosing the optimal execution plan





Before executing a SQL statement, Spark SQL parses the SQL or Dataset program into a logical plan, applies a series of optimizations, and finally determines an executable physical plan. The choice of the final physical plan can have a large impact on performance. Choosing the best execution plan is the core job of Spark SQL's Catalyst optimizer. Catalyst was mainly a rule-based optimizer (RBO) in its early days, and cost-based optimization (CBO) was added in Spark 2.2. In the current implementation, the execution plan is determined at planning time and will not be changed once it is confirmed. However, at runtime, when we obtain more runtime information, we may be able to find a better execution plan.



Take the join operation as an example: the most common join strategies in Spark are BroadcastHashJoin and SortMergeJoin. BroadcastHashJoin is a map-side join. When the storage size of one of the tables is smaller than the broadcast threshold, Spark broadcasts that small table to every Executor, and in the map phase each mapper reads a shard of the large table and joins it against the entire small table, so the large table's data is never shuffled across the cluster. In SortMergeJoin, both tables are shuffled in the map stage using the same partitioning; in the reduce stage, each reducer pulls the data of the corresponding partition from both tables into the same task and joins them. The RBO converts a join into a BroadcastHashJoin whenever the data sizes allow; Spark uses the parameter spark.sql.autoBroadcastJoinThreshold to control the threshold for choosing BroadcastHashJoin, with a default of 10MB. However, a complex SQL query may feed intermediate results into a join, and at planning time Spark cannot know their sizes accurately or may estimate them incorrectly, so the chance to optimize the join with the BroadcastHashJoin strategy is missed. At runtime, however, we can dynamically select BroadcastHashJoin using the shuffle write statistics. The following is an example in which the input size on one side of the join is only 600K, yet Spark still plans a SortMergeJoin.
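To show where the planning-time decision comes from, here is a minimal sketch (the table names and filter are hypothetical): the planner compares its size estimates against spark.sql.autoBroadcastJoinThreshold before any runtime statistics exist, so a small post-filter intermediate result can still be planned as SortMergeJoin.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch (hypothetical table names): the join strategy is chosen at planning
// time from size estimates, not from the actual size of intermediate results.
val spark = SparkSession.builder().appName("JoinPlanSketch").getOrCreate()

// Default broadcast threshold: 10MB.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")

val orders = spark.table("orders")                                // hypothetical large table
val smallSide = spark.table("customers").filter("country = 'IS'") // tiny after filtering,
                                                                  // but estimated from the full table

// explain() prints the physical plan, showing SortMergeJoin vs. BroadcastHashJoin.
orders.join(smallSide, "customer_id").explain()
```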







This prompted us to think about the second question: can we dynamically adjust the execution plan based on information collected at runtime?



Challenge 3: Data Skew





Data skew is a common cause of poor Spark SQL performance. Data skew means that the amount of data in some partitions is much larger than in others, so the running time of the corresponding tasks is much longer than that of other tasks, dragging down the running time of the entire SQL query. In real SQL jobs, data skew is very common: the hash buckets corresponding to the join key rarely hold even numbers of records, and in extreme cases a single join key has so many records that a huge amount of data lands in the same partition, causing serious skew. As shown in Figure 2, most tasks finish in about 3 seconds, while the slowest task takes 4 minutes and processes several times as much data as the other tasks.




At present, common ways of dealing with data skew in a join include: (1) Increasing the number of shuffle partitions, in the hope that data originally falling into the same partition is spread across several partitions; this has no effect on records sharing the same key. (2) Increasing the BroadcastHashJoin threshold, so that in some scenarios a SortMergeJoin can be converted into a BroadcastHashJoin and the shuffle-induced skew is avoided. (3) Manually isolating the skewed keys and adding random prefixes to those records, while expanding the corresponding records of the other table accordingly, and then joining (see the salting sketch below). In summary, these methods each have their own limitations and require considerable manual work. This led us to the third question: can Spark automatically handle data skew in joins at runtime?
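To illustrate the manual approach (3) above, here is a minimal salting sketch; the table names, join key, and number of salt buckets are hypothetical.

```scala
// Minimal sketch of the manual "salting" approach: spread a skewed join key
// over N buckets by adding a random prefix on one side and replicating the other side.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("SaltingSketch").getOrCreate()

val N = 10  // number of salt buckets (hypothetical)

val factA = spark.table("factA")   // hypothetical table with skewed join key "k"
val factB = spark.table("factB")

// Add a random salt to the skewed side.
val saltedA = factA.withColumn("salt", (rand() * N).cast("int"))

// Replicate each row of the other side N times, one per salt value.
val saltedB = factB.withColumn("salt", explode(array((0 until N).map(lit): _*)))

// Join on the original key plus the salt; the skewed key is now spread over N partitions.
val joined = saltedA.join(saltedB, Seq("k", "salt")).drop("salt")
```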



Adaptive Execution Background and Introduction



As early as 2015, the Spark community proposed the basic idea of adaptive execution, adding an interface for submitting a single map stage to Spark's DAGScheduler and trying to adjust the number of shuffle partitions at runtime. However, that implementation has certain limitations: in some scenarios it introduces additional shuffles, i.e., additional stages, and it cannot handle the case where three tables are joined in the same stage. For these reasons the feature has remained experimental, and its configuration parameters are not mentioned in the official documentation.



Building on this community work, Intel's big data technology team redesigned adaptive execution and implemented a more flexible adaptive execution framework. Under this framework we can add new rules to implement additional features. The features implemented so far include automatically setting the number of shuffle partitions, dynamically adjusting the execution plan, dynamically handling data skew, and more.
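For reference, community Spark 2.x already exposes the experimental toggle spark.sql.adaptive.enabled and a per-reducer target size spark.sql.adaptive.shuffle.targetPostShuffleInputSize; the extended adaptive execution described in this article adds further rules, whose exact parameter names may differ from this sketch.

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: these two settings exist in community Spark 2.x; the extended
// adaptive execution described in the article may expose additional parameters.
val spark = SparkSession.builder()
  .appName("AdaptiveExecutionSketch")
  .config("spark.sql.adaptive.enabled", "true")
  // Target amount of post-shuffle data each reducer should process, in bytes (here 64MB).
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", (64L * 1024 * 1024).toString)
  .getOrCreate()
```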



Adaptive execution architecture



In Spark SQL, after the final physical plan is determined, Spark generates the RDD DAG according to each operator's transformation into RDDs. Spark then statically divides the stages based on this DAG and submits them for execution, so once the execution plan is determined it cannot be updated while running. The basic idea of adaptive execution is to divide the execution plan into stages in advance, submit execution stage by stage, collect the shuffle statistics of the current stage at runtime, use them to optimize the execution plan of the next stage, and then submit and execute the subsequent stages.






Figure 3 shows how adaptive execution works. First, the execution plan tree is divided into multiple QueryStages using Exchange nodes as boundaries (an Exchange node represents a shuffle in Spark SQL). Each QueryStage is an independent subtree and an independent execution unit. When adding a QueryStage, we also add a QueryStageInput leaf node as the input of the parent QueryStage. For example, for the two-table join plan in the figure, we create 3 QueryStages. The execution plan in the last QueryStage is the join itself; it has 2 QueryStageInputs representing its inputs, pointing to its two child QueryStages. To execute a QueryStage, we first submit its child stages and collect their runtime information. When these child stages have finished, we know their output sizes and other statistics and can judge whether the plan in the QueryStage can be optimized and updated. For example, when we learn that a table is only 5MB, below the broadcast threshold, we can convert the SortMergeJoin into a BroadcastHashJoin to optimize the current execution plan. We can also dynamically adjust the number of reducers of the stage according to the amount of shuffle data produced by the child stages. After this series of optimizations, we finally generate the RDD DAG for the QueryStage and submit it to the DAG Scheduler for execution.
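The per-QueryStage control flow described above can be summarized in the following conceptual sketch; the type and method names are hypothetical simplifications of ours, not the actual Spark classes.

```scala
// Conceptual sketch only: hypothetical simplification of the adaptive execution flow,
// not the real Spark classes or APIs.
object AdaptiveExecutionSketch {
  case class ShuffleStats(bytesPerPartition: Array[Long], recordsPerPartition: Array[Long])

  trait QueryStage {
    def children: Seq[QueryStage]
    // Rewrite this stage's plan using child statistics (e.g. SortMergeJoin -> BroadcastHashJoin,
    // coalescing reducers, splitting skewed partitions).
    def optimize(childStats: Seq[ShuffleStats]): Unit
    // Build the RDD DAG for the (possibly updated) plan, submit it to the DAGScheduler,
    // and return the shuffle statistics it produced.
    def run(): ShuffleStats
  }

  def executeStage(stage: QueryStage): ShuffleStats = {
    val childStats = stage.children.map(executeStage) // 1. execute child stages first, collect stats
    stage.optimize(childStats)                        // 2. adjust this stage's plan with runtime info
    stage.run()                                       // 3. submit the optimized stage
  }
}
```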



Automatically set the number of reducers



Suppose we set the number of shuffle partitions to 5, and after the map stage ends we learn that the sizes of the partitions are 70MB, 30MB, 20MB, 10MB and 50MB. If the target amount of data for each reducer is 64MB, then at runtime we can actually use just 3 reducers: the first reducer handles partition 0 (70MB), the second handles the contiguous partitions 1 to 3 (60MB in total), and the third handles partition 4 (50MB).
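A minimal sketch of this coalescing rule, greedily packing contiguous partitions up to the target size; the function name and structure are illustrative, not the actual Spark implementation.

```scala
// Minimal sketch: pack contiguous map-output partitions into one reducer until
// adding the next partition would exceed the target size.
def coalescePartitions(partitionSizes: Array[Long], targetSize: Long): Seq[Seq[Int]] = {
  val groups = scala.collection.mutable.ArrayBuffer[Seq[Int]]()
  var current = scala.collection.mutable.ArrayBuffer[Int]()
  var currentBytes = 0L
  for ((size, idx) <- partitionSizes.zipWithIndex) {
    if (current.nonEmpty && currentBytes + size > targetSize) {
      groups += current.toSeq
      current = scala.collection.mutable.ArrayBuffer[Int]()
      currentBytes = 0L
    }
    current += idx
    currentBytes += size
  }
  if (current.nonEmpty) groups += current.toSeq
  groups.toSeq
}

// Example from the text: 5 partitions, 64MB target => 3 reducers: [0], [1,2,3], [4]
val mb = 1024L * 1024
coalescePartitions(Array(70 * mb, 30 * mb, 20 * mb, 10 * mb, 50 * mb), 64 * mb)
```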





In the adaptive execution framework, each QueryStage knows all of its child stages, so all of a stage's inputs can be taken into account when adjusting the number of reducers. In addition, we can also use the number of records as a target for each reducer. Because shuffle data is often compressed, a partition may not look large in bytes, yet after decompression its record count can be far higher than that of other partitions, again leading to uneven data. Considering both data size and record count therefore gives a better basis for determining the number of reducers.



Dynamically adjust the execution plan



Currently, we support dynamically adjusting the join strategy at runtime: when the condition is met, i.e., one table is smaller than the broadcast threshold, a SortMergeJoin can be converted into a BroadcastHashJoin. Because the output partitioning of SortMergeJoin and BroadcastHashJoin is not the same, converting arbitrarily may introduce an extra shuffle in the next stage. Therefore, when we dynamically adjust the join strategy, we follow one rule: perform the conversion only if it does not introduce additional shuffle.



What is the benefit of converting a SortMergeJoin into a BroadcastHashJoin, given that the data has already been shuffled and written to disk and still has to be read back? Consider the example in Figure 5. Suppose tables A and B are joined, each table has two map tasks in the map stage, and the number of shuffle partitions is 5. With SortMergeJoin, 5 reducers must be started in the reduce stage, and each reducer reads its own data through network shuffle. However, when we find at runtime that table B can be broadcast and convert the join into a BroadcastHashJoin, we only need to start 2 reducers, each reading the entire shuffle output file of one mapper. When scheduling these two reducer tasks, we can place them preferentially on the Executors that ran the mappers, so the entire shuffle read becomes a local read and no data is transmitted over the network. Moreover, sequentially reading one whole file is more efficient than the random small reads of the original shuffle. In addition, SortMergeJoin often suffers from varying degrees of data skew that slow down the overall running time; after the conversion to BroadcastHashJoin, the data is generally distributed more evenly, avoiding the skew. The experimental results below show this in more detail.





Dynamically handle data skew



Under the adaptive execution framework, we can easily detect partitions with skewed data at runtime. When a stage executes, we collect each mapper's shuffle data size and record count. If the amount of data or the number of records in a partition exceeds N times the median and is also greater than a pre-configured threshold, we consider it a skewed partition that needs special handling.
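A minimal sketch of this detection rule; the function name, multiplier, and threshold values are hypothetical defaults, not the actual configuration.

```scala
// Minimal sketch: a partition is treated as skewed if its size exceeds N times the
// median and a configured minimum threshold. Parameter values are illustrative.
def skewedPartitions(bytesPerPartition: Array[Long],
                     medianMultiplier: Long = 5,
                     minBytes: Long = 64L * 1024 * 1024): Seq[Int] = {
  val sorted = bytesPerPartition.sorted
  val median = sorted(sorted.length / 2)
  bytesPerPartition.zipWithIndex.collect {
    case (bytes, idx) if bytes > median * medianMultiplier && bytes > minBytes => idx
  }.toSeq
}
```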





Suppose we perform an inner join between table A and table B, and partition 0 of table A is skewed. Normally, partition 0 of table A and partition 0 of table B are shuffled to the same reducer for processing; since this reducer has to pull and process a large amount of data over the network, it becomes the slowest task and drags down overall performance. Under the adaptive execution framework, once we find that partition 0 of table A is skewed, we use N tasks to process that partition. Each task reads only the shuffle output files of a few mappers and joins this data with partition 0 of table B; finally, the results of the N joins are combined with a Union. To implement this, we also changed the shuffle read interface so that it can read the data of a single partition from only a subset of the mappers. With this approach, partition 0 of table B is read N times; although this adds some extra cost, the benefit of handling the skewed data with N tasks still outweighs it. If partition 0 of table B is also skewed, then for an inner join we can additionally divide partition 0 of table B into several blocks, join each block with partition 0 of table A, and finally union the results. For other join types such as Left Semi Join, however, we currently do not support splitting partition 0 of table B.
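Conceptually, the rewrite turns the single skewed join task into a union of N smaller joins. In the sketch below, readPartition is a hypothetical stand-in for the modified shuffle-read interface that reads one partition from a subset of mappers; it is not a real Spark API.

```scala
// Conceptual sketch only. readPartition(table, partitionId, mapperRange) is a hypothetical
// stand-in for the modified shuffle-read interface described above.
import org.apache.spark.sql.DataFrame

def handleSkewedInnerJoin(readPartition: (String, Int, Range) => DataFrame,
                          numMappers: Int,
                          splits: Int,
                          joinKey: String): DataFrame = {
  // Partition 0 of table B is read in full by every sub-join (hence read N times).
  val bPart0 = readPartition("B", 0, 0 until numMappers)

  // Split table A's skewed partition 0 into `splits` groups of mapper outputs,
  // join each group with B's partition 0, then union the results.
  val mapperGroups = (0 until numMappers).grouped(math.max(1, numMappers / splits)).toSeq
  mapperGroups
    .map(g => readPartition("A", 0, g.head to g.last).join(bPart0, joinKey))
    .reduce(_ union _)
}
```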



Performance comparison of adaptive execution and Spark SQL on 100TB



We built a cluster of 99 machines and used Spark 2.2 to run experiments on the 100TB TPC-DS dataset, comparing the performance of original Spark and adaptive execution. Here are the details of the cluster:








The experimental results show that under adaptive execution, 92 of the 103 SQL queries showed significant performance improvements: 47 of them improved by more than 10%, the largest improvement reached 3.8x, and no query regressed. In addition, in original Spark, 5 queries could not run to completion due to OOM and other problems; we also fixed these issues in the adaptive mode, so all 103 queries ran successfully on the 100TB TPC-DS dataset. Below are the specific performance improvement ratios and the queries with the most obvious gains.







By carefully analyzing these improved queries, we can see the benefits of adaptive execution. The first is automatically setting the number of reducers. Original Spark uses 10976 as the number of shuffle partitions; under adaptive execution, the number of reducers for the following queries is automatically adjusted to 1064 and 1079 respectively, and the execution time improves considerably, precisely because the scheduling burden and task startup time are reduced and disk IO requests decrease.
