EMR Spark-SQL performance optimization: the Native Codegen Framework

Background and motivation

Over the years, SparkSQL's performance optimization has focused on two areas: the Optimizer and the Runtime. The former aims to produce the optimal execution plan; the latter aims to execute the chosen plan as quickly as possible.

Compared with the Runtime, the Optimizer is the more general, implementation-independent layer. Whether in the Java world (Spark, Hive) or the C++ world (Impala, MaxCompute), whether batch-based (Spark, Hive) or MPP-based (Impala, Presto), and whether in big data, traditional databases, or HTAP systems (HyPer, ADB), the Optimizer considers very similar issues: statistics collection, cost estimation, and plan selection. The optimization techniques used are also similar, such as join reorder, CTE, and group-key elimination. Although cost models differ with context (for example, whether an index exists), and different plan-space search strategies are used in specific scenarios (for example, genetic algorithms vs. dynamic programming), the methods are broadly the same.

For a long time, Runtime optimization focused on whichever hardware resource was the bottleneck at the time. When MapReduce first came out, network bandwidth was the bottleneck, so Google invested heavily in locality optimizations; when Spark first came out, the problem it solved was disk IO. As IO performance improved, the CPU became the new bottleneck [1], so improving CPU performance has become an important direction of Runtime optimization in recent years.

The two mainstream techniques for improving CPU performance are vectorized processing, represented by MonetDB/X100 [2] (which has since evolved into VectorWise [3]), and code generation (CodeGen), represented by HyPer [5][6] (Spark's CodeGen follows HyPer [9]). Put simply, vectorization keeps the volcano model but, instead of having each SQL operator compute one record at a time, accumulates a batch of data before executing. Batch-at-a-time computation leaves more room for optimization than record-at-a-time computation, such as amortizing virtual function call overhead, enabling SIMD, and being more cache-friendly. The downside of this technique is that the data passed between operators changes from single records to batches, which increases the materialization overhead of intermediate data. CodeGen attacks virtual function overhead and intermediate-data materialization from another angle: operator fusion. In short, a CodeGen framework "flattens" the volcano model by breaking the boundaries between operators: it compresses the original iterator chain into one big for loop, generates code with the same semantics (Java/C++/LLVM), compiles the generated code with the corresponding toolchain, and finally executes the compiled class (Java) or shared object (C++, LLVM), thus turning interpreted execution into compiled execution. In addition, although execution is still record-at-a-time, because function calls are eliminated, a record basically stays in registers from the first operator of a stage to the last and is never materialized into memory. The downside of CodeGen is that optimizations such as SIMD are difficult to apply.
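To make the contrast concrete, here is a minimal, purely illustrative Java sketch of what operator fusion conceptually produces for a query like SELECT a FROM t WHERE b > 10. The class and method names are hypothetical; this is not actual Spark-generated code.

```java
// Illustrative sketch only: what a codegen framework conceptually emits
// after fusing Filter and Project into one loop. In the volcano model,
// each operator would instead pull one record at a time via a chain of
// virtual next() calls.
public class FusedStage {
    // One big loop for the whole stage: each record stays in registers
    // from scan to output, with no intermediate materialization.
    public static int[] run(int[] colA, int[] colB) {
        int[] out = new int[colA.length];
        int n = 0;
        for (int i = 0; i < colA.length; i++) {
            if (colB[i] > 10) {        // Filter, inlined
                out[n++] = colA[i];    // Project, inlined
            }
        }
        return java.util.Arrays.copyOf(out, n);
    }
}
```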

The two camps learned from each other. After publishing papers each claiming superiority over the other [4][8], they moved toward cooperation, which produced a series of projects and papers. The current mainstream view in academia is that fusing the two techniques is the optimal solution, and projects adopting the fused approach have emerged as a result, such as the evolved version of HyPer [6] and Peloton [7].

Although academia has converged on fusion, mainstream industry has had little motivation to follow, for three main reasons: first, the current fusion approaches show no qualitative improvement over either technique alone; second, there is no widely accepted best practice, and fusion is still exploratory; third, industry has not yet pushed either single technique to its full potential. Take SparkSQL as an example: from the first appearance of expression-level Codegen in 2015, to the whole-stage Codegen implemented with reference to HyPer, after years of polishing, SparkSQL's Codegen technology has matured and its performance has jumped by two orders of magnitude. However, perhaps for reasons of maintainability or developer acceptance, SparkSQL's Codegen has been limited to generating Java code and has not tried native code (C/C++, LLVM). Although Java performance is already excellent, it still carries some overhead compared to native code and lacks semantics such as SIMD (Java is working on this feature) and prefetch. More importantly, native code runs directly on bare metal, which makes it easier to squeeze out the last bit of hardware performance and more convenient to support accelerators (such as GPUs) or new hardware (such as AEP).

Based on the above motivations, the EMR team explored and developed a Native Codegen framework for SparkSQL, replacing its execution engine. The new engine brought roughly 20% performance improvement and contributed greatly to EMR again taking first place in the world. This article introduces the Native Codegen framework in detail.


Key problems
When doing Native Codegen, there are three core issues:
1. What is generated?
2. How to generate?
3. How to integrate into Spark?

What to generate
Regarding what code to generate, combining our research results with the team's technology stack, there were three candidates: C/C++, LLVM, and Weld IR. The advantage of C/C++ is that it is relatively simple to implement: we would only need to rewrite in C++ the logic of the Java code Spark currently generates. The disadvantage is that compilation takes too long. The figure below, from HyPer's evaluation, shows that C++ compilation time is an order of magnitude higher than LLVM's.
Excessive compilation time is unfriendly to small queries; in extreme cases compilation takes longer than the query itself. Based on this consideration, we ruled out C/C++. From the figure above, LLVM's compilation time looks very friendly, and many native CodeGen engines, such as HyPer, Impala, and Alibaba Cloud's self-developed big data engines MaxCompute and ADB, use LLVM as the target code. For us (not necessarily for you :D), LLVM's biggest disadvantage is that it is too low-level, with syntax close to assembly. Just imagine how much work it would be to rewrite the SparkSQL operators in assembly. Most engines do not write all of their code in LLVM; for example, HyPer uses LLVM only for the core operator logic, while general-purpose functionality (such as spilling and complex data structure management) is written in C++ and compiled ahead of time. Even though LLVM+C++ saves a lot of work, it was still unacceptable to us, so we turned to the third option: Weld IR (Intermediate Representation).

First, a brief introduction to Weld. Weld's author, Shoumik Palkar, is a student of Matei Zaharia, the author of Spark, who needs no introduction. The problem Weld originally set out to solve was the data transfer overhead when different libraries call each other. For example, to call a numpy interface from pandas, pandas first writes the data into memory and numpy then reads it back for computation; for a heavily optimized library, the memory write and read can take far longer than the computation itself. To solve this problem, Weld developed a common runtime and provided a set of IR. Combined with lazy evaluation, a library needs only (relatively) simple modifications to conform to the Weld specification, after which different libraries can share the Weld runtime; the runtime uses lazy evaluation to build cross-library pipelines, thereby eliminating the overhead of data materialization. The Weld runtime also implements several optimizations, such as loop fusion, loop unrolling, vectorization, and adaptive execution. In addition, Weld supports calling C code, so third-party libraries can be invoked conveniently.

What interests us is the IR that Weld provides and its corresponding runtime. Weld IR is designed for data analytics, so its semantics are very close to SQL and it expresses operators well. At the data-structure level, the core structures of Weld IR are vec and struct, which map well to SparkSQL's UnsafeRow batches; on top of struct and vec, a dict can be built, which expresses the hash structures heavily used in SQL. At the operation level, Weld IR provides functional-language semantics such as map, filter, and iterator, and together with its builder semantics it conveniently expresses operators such as Project, Filter, Agg, and BroadcastJoin. For example, the following IR expresses Filter + Project semantics, meaning: if the second column is greater than 10, return the first column:

|v: vec[{i32,i32}]| for(v, appender, |b,i,n| if(n.$1 > 10, merge(b, n.$0), b))

From this, the advantages of Weld IR are obvious: it balances performance (it ultimately generates LLVM code) and ease of use (generating Weld IR is much more convenient than generating LLVM or C++). Based on these considerations, we finally chose Weld IR as the target code.

Having settled on Weld IR, two key issues remain in the code generation process:

1. What is the transmission medium between operators?
2. How to deal with operators not supported by Weld?

Transmission medium
Unlike Java, Weld IR does not provide a loop structure; instead it offers the vec structure and generic iteration operations over it. Therefore Weld IR cannot copy Java Codegen's approach of placing one big loop around the stage and having each operator process one record at a time. Instead, each operator processes a batch of data and performs a materialization in form only at the IR level, relying on Weld's loop-fusion optimization to eliminate the materialization. For example, in the aforementioned Filter followed by Project, the Filter operator generates the following IR, which filters out rows whose second column is <= 10:

|v: vec[{i32,i32}]| let res_fil = for(v, appender, |b,i,n| if(n.$1 > 10, merge(b,n), b))
The Project operator generates the following IR, returning the first column of data:

let res_proj = for(res_fil,appender,|b,i,n| merge(b,n.$0))
On the surface it looks as if the Filter operator materializes an intermediate result; in fact, Weld's loop-fusion optimizer eliminates this materialization. The optimized code is as follows:

|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))
Although relying on Weld's loop fusion greatly simplifies CodeGen's logic, during development we found the loop-fusion pass to be very time-consuming; for somewhat complex SQL (more than three levels of nesting) it could not even produce a result within a reasonable time. We faced two choices at the time: modify Weld's implementation, or modify our CodeGen to directly generate code as it would look after loop fusion. We chose the latter. In the code generated after refactoring, lines 1, 2, and 11 are generated by the Scan operator, lines 3 to 6 and 8 to 10 by the Filter operator, and line 7 by the Project operator.

This optimization brought compilation time back down to the sub-second level.


Fallback mechanism

Limited by Weld's current expressiveness, some operators cannot be implemented in Weld, such as SortMergeJoin and Rollup. Even the original Java CodeGen leaves some operators, such as Outer Join, uncovered. A sound fallback mechanism is therefore a prerequisite for correctness. Our strategy is intuitive: if an operator does not support Native CodeGen, Java CodeGen takes over. The key question here is the granularity of fallback: operator level or stage level?

Implementation difficulty aside, although operator-granularity fallback seems more reasonable intuitively, it actually causes a more serious problem: it breaks the pipeline inside the stage. As mentioned above, one advantage of CodeGen is pipelining the logic of the whole stage and erasing operator boundaries: a record runs from the first operator to the last with no materialization along the way. Operator-granularity fallback, however, would make part of the stage run on the Native Runtime and the rest on the Java Runtime, inevitably materializing intermediate data at the junction between the two, and this overhead is usually greater than the benefit the Native Runtime brings.

Based on the above considerations, we chose stage-level fallback: once an unsupported operator is encountered while generating code for a stage, the entire stage falls back to Java CodeGen. Statistics show that across the whole TPC-DS benchmark, 80% of the stages hit Native CodeGen.
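A minimal sketch of what a stage-level fallback decision could look like. All names here are hypothetical, and the real framework would operate on Spark's physical plan nodes rather than strings:

```java
import java.util.List;

// Hypothetical sketch of stage-level fallback: if any operator in the
// stage lacks a native implementation, the whole stage keeps Java codegen,
// avoiding a Java/Native boundary (and materialization) inside the stage.
public class FallbackPlanner {
    static final List<String> NATIVE_SUPPORTED =
        List.of("Scan", "Filter", "Project", "HashAggregate", "BroadcastHashJoin");

    // Returns true only if every operator in the stage can run natively.
    public static boolean useNativeRuntime(List<String> stageOperators) {
        for (String op : stageOperators) {
            if (!NATIVE_SUPPORTED.contains(op)) {
                return false; // e.g. SortMergeJoin, Rollup -> whole-stage fallback
            }
        }
        return true;
    }
}
```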

Spark integration

With code generation and the fallback mechanism complete, the final question is how to integrate with Spark. The execution of Spark's WholeStageCodegenExec can be understood as a black box. Whether the upstream is a Table Scan, a Shuffle Read, or a Broadcast, the black box has only two kinds of input: RowBatch (when the upstream is a Table Scan) or Row Iterator (otherwise), and its output is always a Row Iterator, as shown in the following figure:


As mentioned above, we chose stage-level fallback, which guarantees that a black box runs either entirely on the Java Runtime or entirely on the Native Runtime, never a mix. Therefore we only need to care about how to convert a Row Batch / Row Iterator into Weld's memory layout, and how to convert Weld's output back into a Row Iterator. To simplify the problem further, we noticed that although a Shuffle Reader / Broadcast delivers a Row Iterator, the remotely serialized data structure is essentially a Row Batch; Spark merely deserializes it into a Row Iterator before feeding it to the codegen module, and wrapping a RowBatch back into a Row Iterator is trivial. Therefore the input and output of the Native Runtime can be unified as RowBatch.

The solution suggests itself: convert each RowBatch into a Weld vec! But we went a step further: why not feed the Row Batch to Weld directly and save the memory conversion? In essence, a Row Batch is just a byte stream that follows a certain specification. Spark also provides an OffHeap mode that stores the data directly outside the heap (this applies only to the Scan stage; shuffle data and broadcast data must first be read out to off-heap memory), where Weld can access it directly. The memory layout of Spark's UnsafeRow is roughly as follows:

For a given schema, the structure of the null bitmap and the fixed-length data is fixed and can be mapped directly to a struct. For variable-length data, our approach is to copy it to contiguous memory addresses. Thus, for a RowBatch without variable-length data, we feed the memory block to Weld as-is; for one with variable-length data, we only need a coarse-grained memory copy (copying the fixed-length part and the variable-length part out separately) rather than fine-grained column-by-column conversion.

Continuing the earlier Filter + Project example, a record contains two int columns; its UnsafeRow memory layout is as follows (for alignment, each fixed-length field in Spark occupies at least 8 bytes):
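Since the layout figure is not reproduced here, the following sketch illustrates the described layout with plain java.nio. It is a simplified stand-in for Spark's UnsafeRow, not the real class: an 8-byte null bitmap followed by one 8-byte slot per fixed-length field, giving 24 bytes per record for a two-int schema.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Simplified stand-in for the UnsafeRow fixed-length layout described in
// the text (not Spark's actual class): a whole RowBatch becomes one
// contiguous byte block that Weld could map to vec[{i32,i32}].
public class RowLayout {
    static final int RECORD_BYTES = 8 + 8 + 8; // null bitmap + 2 fixed-length slots

    public static byte[] writeBatch(int[][] rows) {
        ByteBuffer buf = ByteBuffer.allocate(rows.length * RECORD_BYTES)
                                   .order(ByteOrder.LITTLE_ENDIAN);
        for (int[] row : rows) {
            buf.putLong(0L);     // null bitmap: no nulls in this sketch
            buf.putLong(row[0]); // col 0, int padded to 8 bytes
            buf.putLong(row[1]); // col 1, int padded to 8 bytes
        }
        return buf.array();
    }

    public static int readInt(byte[] batch, int row, int col) {
        ByteBuffer buf = ByteBuffer.wrap(batch).order(ByteOrder.LITTLE_ENDIAN);
        return (int) buf.getLong(row * RECORD_BYTES + 8 + col * 8);
    }
}
```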

This solves the input problem. Converting Weld's output back into a RowBatch is essentially the reverse of the above process, so we will not repeat it here.

With Java/Native data conversion solved, what remains is execution. First, we decide whether to use the Java Runtime or the Native Runtime according to the mode of the current stage. On the Native branch, StageInit runs first to do stage-level initialization, including initializing Weld, loading the compiled Weld module, and pulling broadcast data (if any). Then comes a loop: each iteration reads one RowBatch (from a Scan or a Shuffle Reader), feeds it to the Native Runtime for execution, converts the output, and feeds it to the Shuffle Writer, as shown below:
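The flow above can be sketched roughly as follows. All names here (NativeStage, stageInit, process, ...) are illustrative stand-ins, not actual EMR or Spark APIs:

```java
import java.util.ArrayList;
import java.util.Iterator;

// Hypothetical sketch of the per-stage driver loop on the Native branch.
public class NativeStageDriver {
    interface NativeStage {
        void stageInit();              // init Weld, load module, pull broadcast data
        int[] process(int[] rowBatch); // run one RowBatch through the Weld module
    }

    // Read batches from upstream (Scan / Shuffle Reader), run each through
    // the native runtime, and collect the converted output for the writer.
    public static ArrayList<int[]> run(NativeStage stage, Iterator<int[]> batches) {
        stage.stageInit();
        ArrayList<int[]> shuffleWriterInput = new ArrayList<>();
        while (batches.hasNext()) {
            shuffleWriterInput.add(stage.process(batches.next()));
        }
        return shuffleWriterInput;
    }
}
```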

Summary
This article introduced the EMR team's exploration and practice in the direction of Spark Native Codegen. Due to space limitations, some technical points and optimizations were not expanded here; each could fill a separate article, for example:

1. Extreme native operator optimization
2. Detailed explanation of data conversion
3. Weld Dict optimization
