EMR Spark-SQL performance optimization reveals the secret

Primer

Recently, the Alibaba Cloud E-MapReduce team submitted the latest results in the TPCDS-Perf list. Compared with the second place (in fact, it is also the record submitted by the EMR team in 2019), it has achieved more than 2 times in terms of performance and cost performance. Excellent result! See TPCDS Perf for details

tpcds

The Alibaba Cloud E-MapReduce team, in addition to investing a lot of R&D resources and energy in product, usability, security and other dimensions, has created a widely acclaimed big data product such as EMR; it has also invested in the engine level for a long time, The purpose of continuous deep cultivation is to maintain 100% compatibility of open source software while using the technical depth of the team to build technical barriers to products, so that customers can get more cost-effective when using open source software stacks, really The cost on the cloud is reduced to the extreme, so that customers can have no doubts and worries in the process of going to the cloud.

The achievements of the Alibaba Cloud E-MapReduce team in TPCDS Perf are also sufficient to verify the technical depth and technical strength of the team in the SPARK engine. Next, there will be a series of articles to introduce some optimization points in our 2020 ranking process. Thinking, spark engine developers or spark application developers in the community are welcome to pay attention to our series of articles, and welcome to communicate with us. The most important thing is, welcome to submit more resumes and join the Alibaba Cloud E-MapReduce team. We hope Virtue is thirsty! ! !

Flag for the third time to brush the list
From the above TPCDS Perf link, we can see that the EMR team actually submitted three results at the 10TB scale. The third time and this time to hit the list, there is a little story behind it. Because in the Perf page, TPCDS finally focuses on two indicators, one is the performance indicator and the other is the cost performance indicator. When this project was approved, we set a difficult flag for ourselves. Under the condition of keeping the physical hardware unchanged, we should improve the software optimization by 2 times+, so that the performance indicators and cost-effective indicators can be achieved. doubled.

Some comparative data with the open source Spark version
After submitting the scores, we used the open source Spark V2.4.3 version to conduct the TPCDS 99 Query test. The following is a comparison of performance data

About 3X performance improvement in Load phase
load

About 6X performance improvement in PT stage
pt

PS. Among them, Query 14 and Query 95 in the community Spark V2.4.3 version cannot run out due to OOM, and are not included in the calculation

The Query whose running time is greater than 200S in the community Spark version is compared separately
_200

PS. Query 78, the lowest among these queries, has a 3X performance improvement, and Query 57 has a performance improvement of nearly 100 times.

Overview of Optimization Points

optimizer
CTE materialization based on InMemoryTable Cache
To put it simply, it is to use the InMemoryTable Cache as far as possible to reduce unnecessary repeated calculations. For example, the scalar calculation in Query 23A/B is a very heavy operation, and must be repeated calculations, through the CTE optimization mode Match, identify time-consuming operations that require repeated calculations, and use InMemoryTable cache to reduce E2E time as a whole

More effective Filter-related optimization

Dynamic Partition Pruning This function is only available in the latest version 3.0 of the community
Small table broadcast multiplexing A small table with filtering, if you can filter two or more table data, you can reuse the filtering effect of this small table. Query 64 is a good example
BloomFilter before SMJ Before SMJ is actually implemented, by pre-BloomFilter, the data in the Join process is further reduced, and the problem of SpillDisk is eliminated to the greatest extent
PK/FK Constraint optimization provides more optimization suggestions to the optimizer through primary key and foreign key information

RI-Join removes the join between the fact table and the dimension table on the primary key and foreign key, but if the columns of the dimension table are not projected, there is no need to execute this join at all
GroupBy Keys removes non-primary key columns When GroupBy Keys includes both primary key columns and non-primary key columns, in fact, the non-primary key columns have no effect on the GroupBy result, because the primary key columns already imply Unique information
GroupBy Push Down before Join
Fast Decimal
Based on Table Analyze and Stat information at runtime, the optimizer can decide to optimize certain Decimal to Long or Int calculations, which will greatly improve, and there are a large number of Decimal calculations in TPCDS 99 Query

Runtime

In this optimization, there is another very interesting optimization, which is the Native Runtime we introduced. If the above-mentioned optimizer optimization is the killer feature of some special cases, Native Runtime is a broad-spectrum killer. According to our later statistics , the introduction of Native Runtime can universally improve the E2E time consumption of SQL Query by 15-20%, which is also a great performance improvement point in TPCDS Perf.

A brief introduction to Native Runtime
Based on the open source version of the WholeStageCodeGeneration framework, the original generated Java code is replaced with Weld IR to actually run. Weld detailed reference http://weld.stanford.edu/. In the whole project, the replacement of Weld IR is actually a very small part of the work. In order for Weld IR to run, we still need to do the following work

Expression Weld IR CodeGen (full support within TPCDS)

Operators Weld IR CodeGen (except SortMergeJoin implemented in C++, others can be replaced by Weld IR)

Unified memory layout (OffHeap UnsafeRow => C++ & Weld Runtime)

Batch-based execution framework (because if you run according to Java, each time a record flows in the generated code, the cost is too high in the time of NativeRuntime, JNI and WeldRuntime obviously cannot play like this)

Other high-performance native operators SortMergeJoin, PartitionBy, CSV Parsing, these operators cannot be directly implemented with the interface provided by Weld IR at present, we use C++ to realize the native execution of these operators

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us