How EMR StarRocks Achieves Extremely Fast Data Lake Analysis
To bring extremely fast analytics to more users and to apply StarRocks' analysis capabilities to a wider range of data sets, Alibaba Cloud's open-source big data OLAP team has worked with the community to enhance StarRocks' data lake analysis capabilities. StarRocks can now analyze not only data stored locally in StarRocks, but also data stored in open-source data lakes or data warehouses such as Apache Hive, Apache Iceberg and Apache Hudi, with the same excellent performance.
This article focuses on the technical internals, performance, and future plans behind StarRocks' fast data lake analysis capability.
1、 Overall architecture
In data lake analysis scenarios, StarRocks is mainly responsible for computing over and analyzing the data, while the data lake is mainly responsible for storing, organizing and maintaining it. The figure above depicts the complete technology stack composed of StarRocks and the data lake.
The architecture of StarRocks is very simple. The core of the whole system consists of only two kinds of processes, FE (Frontend) and BE (Backend). It does not rely on any external components, which makes deployment and maintenance easy. FE is mainly responsible for parsing query statements (SQL), optimizing queries and scheduling them, while BE is mainly responsible for reading data from the data lake and performing operations such as Filter and Aggregate.
The data lake itself is a collection of technical concepts. A typical data lake contains three modules: Table Format, File Format and Storage. Table Format is the "UI" of the data lake. Its main role is to organize structured, semi-structured, or even unstructured data so that it can be stored in distributed file systems such as HDFS or object storage such as OSS and S3, and to expose table semantics on top of it. There are two main approaches to Table Format. One organizes metadata into a series of files and stores them in the distributed file system or object storage together with the actual data, as in Apache Iceberg, Apache Hudi and Delta Lake. The other stores metadata separately in a dedicated metadata service, as in StarRocks local tables, Snowflake and Apache Hive.
The main function of File Format is to provide a representation of data units that is convenient to retrieve and efficient to compress. Common open-source file formats currently include the columnar Apache Parquet and Apache ORC, and the row-based Apache Avro.
Storage is the module that stores data in the data lake. At present, the most commonly used storage in the data lake is the distributed file system HDFS, object storage OSS, S3, etc.
The main function of FE is to convert SQL statements into fragments that BE can recognize. If the BE cluster is regarded as a distributed thread pool, then a fragment is a task in that thread pool. To get from SQL text to a distributed physical execution plan, FE goes through the following steps:
• SQL Parse: convert the SQL text into an AST (Abstract Syntax Tree)
• SQL Analyze: perform syntax and semantic analysis on the AST
• SQL Logical Plan: convert the AST into a logical plan
• SQL Optimize: rewrite and transform the logical plan based on relational algebra, statistics and the cost model, and select the physical execution plan with the lowest cost
• Generate Plan Fragment: convert the physical execution plan selected by the optimizer into Plan Fragments that BE can execute directly
• Schedule the execution plan
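The steps above can be sketched as a chain of small functions. This is only a toy illustration, not StarRocks' FE code: the one-table SQL grammar, the `CATALOG` dictionary, the single rewrite rule and every function name are assumptions made for the example.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Hypothetical catalog used only for this sketch.
CATALOG = {"lineitem": {"l_quantity", "l_discount"}}

@dataclass
class Ast:
    column: str
    table: str
    predicate: Optional[tuple]  # (column, operator, integer literal) or None

def parse(sql: str) -> Ast:
    """SQL Parse: turn SQL text into a very small AST."""
    m = re.fullmatch(
        r"\s*select\s+(\w+)\s+from\s+(\w+)"
        r"(?:\s+where\s+(\w+)\s*(=|<|>)\s*(\d+))?\s*;?\s*",
        sql, flags=re.IGNORECASE)
    if m is None:
        raise ValueError("unsupported SQL in this sketch")
    col, table, pcol, op, lit = m.groups()
    return Ast(col, table, (pcol, op, int(lit)) if pcol else None)

def analyze(ast: Ast) -> Ast:
    """SQL Analyze: check that the table and columns exist."""
    columns = CATALOG.get(ast.table)
    if columns is None:
        raise ValueError(f"unknown table {ast.table}")
    used = {ast.column} | ({ast.predicate[0]} if ast.predicate else set())
    if used - columns:
        raise ValueError(f"unknown columns {sorted(used - columns)}")
    return ast

def to_logical_plan(ast: Ast) -> list:
    """SQL Logical Plan: a bottom-up list of logical operators."""
    plan = [("Scan", ast.table)]
    if ast.predicate:
        plan.append(("Filter", ast.predicate))
    plan.append(("Project", ast.column))
    return plan

def optimize(plan: list) -> list:
    """SQL Optimize: a single rewrite rule that merges Filter into Scan."""
    if len(plan) >= 2 and plan[0][0] == "Scan" and plan[1][0] == "Filter":
        return [("Scan", plan[0][1], plan[1][1])] + plan[2:]
    return plan

def to_fragment(plan: list) -> dict:
    """Generate Plan Fragment: package the operators for a BE to execute."""
    return {"fragment_id": 0, "operators": plan}

fragment = to_fragment(optimize(to_logical_plan(analyze(parse(
    "select l_quantity from lineitem where l_discount = 1")))))
print(fragment)
```

A real optimizer explores many alternative plans and compares their estimated costs; the sketch applies one fixed rule only to show where each stage sits in the chain.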
Backend (BE) is the back-end node of StarRocks, responsible for data storage and SQL execution.
All BE nodes in StarRocks are peers. FE distributes data to the BE nodes according to certain policies. During data import, data is written directly to the BE nodes without passing through FE. BE is responsible for writing the imported data in the corresponding format and generating the related indexes. When executing SQL, a statement is first planned into logical execution units according to its semantics, and then split into physical execution units according to the data distribution. The physical execution units run on the nodes where the data is stored, which avoids transferring and copying data and thus delivers very high query performance.
2、 Technical details
Why is StarRocks so fast
Generally, the more complex the SQL, the more tables are joined, and the larger the data volume, the more the query optimizer matters, because the performance of different execution plans can differ by hundreds or thousands of times. The StarRocks optimizer is based mainly on the Cascades and ORCA papers, and has been deeply customized, optimized and extended to fit the StarRocks executor and scheduler. It fully supports all 99 TPC-DS queries and implements important features and optimizations such as common expression reuse, correlated subquery rewriting, Lateral Join, CTE reuse, Join Reorder, distributed join strategy selection, Runtime Filter pushdown, and low-cardinality dictionary optimization.
One key to a cost-based optimizer (CBO) is whether its cost estimates are accurate, and one key to accurate cost estimates is whether statistics are collected promptly and accurately. StarRocks currently supports table-level and column-level statistics, with both automatic and manual collection. Either mode supports full collection and sampled collection.
MPP stands for massively parallel processing. The core idea is to split the query plan into many computing instances that can each run on a single node, and then execute them in parallel across multiple nodes. Nodes do not share CPU, memory, or disk resources, so the query performance of an MPP database can keep improving as the cluster scales out horizontally.
As shown in the figure above, StarRocks logically divides a query into multiple query fragments. Each fragment can have one or more execution instances, and each instance is scheduled to run on one BE in the cluster. A fragment can include one or more operators; the fragment in the figure includes Scan, Filter and Aggregate. Each fragment can also have a different degree of parallelism.
As shown in the figure above, multiple fragments are executed in parallel in memory in a pipelined manner, rather than stage by stage as in a batch engine.
As shown in the figure above, the Shuffle operation is what allows the query performance of an MPP database to keep improving as the cluster scales out horizontally, and it is also the key to high-cardinality aggregation and large-table joins.
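A minimal sketch of why Shuffle enables distributed aggregation: rows are routed by a hash of the grouping key, so each instance can aggregate its own partition without seeing any other instance's data. The function names and the CRC32-based routing are assumptions for illustration and do not describe StarRocks' actual exchange operator.

```python
import zlib
from collections import Counter

def shuffle(rows, num_instances):
    """Route each (key, value) row to the instance chosen by hashing its key."""
    partitions = [[] for _ in range(num_instances)]
    for key, value in rows:
        target = zlib.crc32(key.encode()) % num_instances
        partitions[target].append((key, value))
    return partitions

def aggregate(partition):
    """Per-instance SUM(value) GROUP BY key, using only local rows."""
    sums = Counter()
    for key, value in partition:
        sums[key] += value
    return dict(sums)

rows = [("cn", 3), ("us", 5), ("cn", 4), ("de", 1), ("us", 2)]
partials = [aggregate(p) for p in shuffle(rows, num_instances=3)]

# Every key lands on exactly one instance, so partial results never overlap
# and the final result is simply their union.
result = {key: total for partial in partials for key, total in partial.items()}
print(result)
```

Because all rows with the same key meet on the same instance, adding instances spreads both the data and the hash tables, which is what lets high-cardinality aggregation scale out.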
Vectorized execution engine
As the bottleneck of database execution has gradually shifted from I/O to CPU, StarRocks re-implemented its entire execution engine on vectorization technology to make full use of the CPU. The core of vectorizing operators and expressions is executing in batches, column by column. Compared with row-at-a-time execution, batch execution incurs fewer virtual function calls and fewer branches; compared with row-wise execution, column-wise execution is friendlier to the CPU cache and easier to optimize with SIMD.
Vectorization is more than vectorizing every operator and expression in the database; it is a large and complex performance engineering effort that includes organizing data by column on disk, in memory and on the network, redesigning data structures and algorithms, redesigning memory management, SIMD instruction optimization, CPU cache optimization, C++ optimization, and more. Compared with the previous row-by-row execution, vectorization improves overall performance by 5 to 10 times.
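The difference between row-at-a-time and column-batch execution can be sketched as follows. The table, column names and functions are made up for the example, and pure Python cannot show the cache or SIMD benefits; the sketch only illustrates the data layout and the loop structure that a vectorized engine relies on.

```python
from array import array

# Row layout: one record per row, processed one row at a time.
rows = [
    {"price": 10.0, "qty": 2},
    {"price": 5.0, "qty": 1},
    {"price": 5.0, "qty": 3},
    {"price": 8.0, "qty": 4},
]

def revenue_row_at_a_time(rows):
    total = 0.0
    for row in rows:  # every row pays for field lookup and a branch
        if row["qty"] > 1:
            total += row["price"] * row["qty"]
    return total

# Column layout: each column is one contiguous typed array (a batch).
price = array("d", (r["price"] for r in rows))
qty = array("q", (r["qty"] for r in rows))

def revenue_column_batch(price, qty):
    # Filter: one tight loop over a single column yields a selection vector.
    selected = [i for i, q in enumerate(qty) if q > 1]
    # Project and aggregate: another tight loop over the selected positions.
    return sum(price[i] * qty[i] for i in selected)

print(revenue_row_at_a_time(rows), revenue_column_batch(price, qty))
```

In a native engine, each of the tight per-column loops maps naturally onto SIMD instructions and touches only the columns the operator needs.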
How StarRocks optimizes data lake analysis
In big data analytics, data is stored not only in data warehouses but also in data lakes. Traditional data lake implementations include Hive/HDFS. In recent years the LakeHouse concept has become popular, with common implementations including Iceberg, Hudi and Delta. Can StarRocks help users better explore the value of the data in their data lakes? The answer is yes.
The previous sections introduced how StarRocks achieves extremely fast analysis. Applying these capabilities to the data lake brings a better data lake analysis experience. This section describes how StarRocks achieves extremely fast data lake analysis.
Let's start with the overall architecture. The main modules involved in StarRocks data lake analysis are shown in the figure below. Data management is provided by the data lake, and data storage is provided by object storage such as OSS/S3 or by the distributed file system HDFS.
At present, the data lake analysis capabilities that StarRocks has supported can be summarized into the following parts:
• Support Iceberg v1 table query https://github.com/StarRocks/starrocks/issues/1030
• Support Hive external table queries: External table @ StarRocks Docs (dorisdb.com)
• Support Hudi COW table query https://github.com/StarRocks/starrocks/issues/2772
Next, let's look at how StarRocks brings fast analysis to the data lake, from the perspectives of query optimization and query execution.
Query optimization relies mainly on the CBO described earlier. The data lake module needs to supply statistics to the optimizer, which then applies a series of strategies to optimize the query execution plan. Let's look at several common strategies through examples.
Consider the following example. In the generated execution plan, HdfsScanNode displays cardinality, avgRowSize and other statistics.
MySQL [hive_test]> explain select l_quantity from lineitem;
These statistics are computed before the query enters the CBO. For Hive, for example, a MetaData Cache caches this information; for Iceberg, the statistics are computed from Iceberg's manifest information. With these statistics available, subsequent optimization strategies become much more effective.
Partition pruning is an optimization that applies only when the target table is partitioned. By analyzing the filter conditions in the query, it selects only the partitions that may satisfy the conditions and skips the rest, which can significantly reduce the amount of data processed. In the example for this section, an external table is created whose partition column is a sold-date key (a column name ending in _sold_date_sk).
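Partition pruning itself can be sketched in a few lines. This is an illustrative sketch rather than StarRocks' implementation; the partition layout, the `sold_date_sk` column name, the paths and the `prune_partitions` helper are all hypothetical.

```python
import operator

OPS = {"=": operator.eq, "<": operator.lt, "<=": operator.le,
       ">": operator.gt, ">=": operator.ge}

def prune_partitions(partitions, column, op, literal):
    """Keep only the partitions whose partition value satisfies `column op literal`."""
    compare = OPS[op]
    return [p for p in partitions if compare(p["values"][column], literal)]

# Hypothetical partition metadata, one entry per partition directory.
partitions = [
    {"path": "sold_date_sk=2451180/", "values": {"sold_date_sk": 2451180}},
    {"path": "sold_date_sk=2451181/", "values": {"sold_date_sk": 2451181}},
    {"path": "sold_date_sk=2451182/", "values": {"sold_date_sk": 2451182}},
]

# WHERE sold_date_sk >= 2451181: the first partition is never scanned.
to_scan = prune_partitions(partitions, "sold_date_sk", ">=", 2451181)
print([p["path"] for p in to_scan])
```

The decision uses only partition metadata, so the pruned partitions cost no file listing or data reads at all.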
The efficiency of a multi-table join depends heavily on the order in which the tables are joined. For example, for select * from T0, T1, T2 where T0.a=T1.a and T2.a=T1.a, there are two possible execution orders:
• Join T0 and T1 first, and then join the result with T2
• Join T1 and T2 first, and then join the result with T0
Depending on the data volume and data distribution of T0 and T2, the two orders perform differently. To handle this, StarRocks implements a Join Reorder mechanism in the optimizer based on dynamic programming and greedy algorithms. Join Reorder is currently supported for Hive data analysis, and support for other data sources is under development.
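A simplified greedy variant of join reordering is sketched below to show how statistics drive the choice. It is not StarRocks' algorithm: the row counts, the distinct-value counts and the function names are invented, and the size estimate is the textbook formula |R| × |S| / NDV(join key).

```python
def estimate(left_rows, joined, table, row_counts, join_ndv):
    """Estimated output rows when `table` joins the already-joined tables.
    Returns None if no join predicate connects them (a cross join)."""
    ndvs = [join_ndv[frozenset((table, other))]
            for other in joined if frozenset((table, other)) in join_ndv]
    if not ndvs:
        return None
    rows = left_rows * row_counts[table]
    for ndv in ndvs:
        rows //= ndv
    return rows

def greedy_join_order(row_counts, join_ndv):
    # Seed with the joinable pair that has the smallest estimated output.
    a, b = min(
        (tuple(sorted(pair)) for pair in join_ndv),
        key=lambda p: (row_counts[p[0]] * row_counts[p[1]] // join_ndv[frozenset(p)], p))
    order = [a, b]
    rows = row_counts[a] * row_counts[b] // join_ndv[frozenset((a, b))]
    # Repeatedly add the connected table that keeps the intermediate result smallest.
    while len(order) < len(row_counts):
        candidates = []
        for table in sorted(set(row_counts) - set(order)):
            est = estimate(rows, order, table, row_counts, join_ndv)
            if est is not None:
                candidates.append((est, table))
        if not candidates:
            raise ValueError("join graph is not connected")
        rows, table = min(candidates)
        order.append(table)
    return order, rows

# Hypothetical statistics for T0.a = T1.a and T2.a = T1.a.
row_counts = {"T0": 1_000_000, "T1": 1_000, "T2": 10}
join_ndv = {frozenset(("T0", "T1")): 1_000, frozenset(("T1", "T2")): 10}

order, final_rows = greedy_join_order(row_counts, join_ndv)
print(order, final_rows)
```

With these numbers, joining T1 and T2 first produces an intermediate result of about 1,000 rows, whereas joining T0 and T1 first would produce about 1,000,000, so the greedy choice defers the large table to the end.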
Predicate pushdown moves the evaluation of filter expressions in a query as close to the data source as possible, reducing the cost of data transfer and computation. For the data lake scenario, we push Min/Max and other filter conditions down to Parquet, so that when reading Parquet files we can quickly skip Row Groups that cannot match.
For example, for the following query, the condition l_discount=1 is pushed down to the Parquet side.
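Min/Max filtering on Row Groups can be sketched as below. The statistics are invented and `row_group_may_match` is a hypothetical helper; a real reader would take the min and max values from the column chunk statistics in the Parquet footer.

```python
def row_group_may_match(stats, op, literal):
    """Return False only when the min/max range proves no row can match."""
    lo, hi = stats["min"], stats["max"]
    if op == "=":
        return lo <= literal <= hi
    if op in ("<", "<="):
        return lo < literal if op == "<" else lo <= literal
    if op in (">", ">="):
        return hi > literal if op == ">" else hi >= literal
    raise ValueError(f"unsupported operator {op!r}")

# Hypothetical per-row-group statistics for the filtered column.
row_groups = [
    {"id": 0, "min": 0, "max": 0},
    {"id": 1, "min": 0, "max": 3},
    {"id": 2, "min": 2, "max": 9},
]

# Equality with 1: only row group 1 can contain a matching row.
to_read = [g["id"] for g in row_groups if row_group_may_match(g, "=", 1)]
print(to_read)
```

The check is conservative: a Row Group that passes may still contain no matching rows, so the filter is applied again to the rows that are actually read.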
In addition to the strategies described above, we have also adapted strategies such as Limit pushdown, TopN pushdown, and subquery optimization for data lake analysis, which further improve query performance.
As mentioned earlier, the StarRocks execution engine is fully vectorized and built on an MPP architecture, which greatly benefits the analysis of data in the data lake. Next, let's see how StarRocks schedules and executes data lake analysis queries.
Data in the data lake is generally stored on HDFS or OSS, and BE nodes may be either co-located with the storage nodes or deployed separately. To handle both cases, we have implemented a load-balancing algorithm for fragment scheduling:
• After partition pruning, get all HDFS file blocks to be queried
• Construct a THdfsScanRange for each block, where hosts contains the addresses of the datanodes holding all replicas of the block, which yields the final list of scan ranges
• The coordinator maintains a map of the number of scan ranges currently allocated to each BE, a map of the number of blocks to be read that have been allocated to each disk on each datanode, and the average number of scan ranges allocated per BE, numScanRangePerBe
• If a datanode holding a replica of the block also hosts a BE (co-located deployment)
  • Assign each scan range to the BE with the fewest scan ranges among the BEs co-located with a replica. If the number of scan ranges allocated to that BE is greater than numScanRangePerBe, select the remote BE with the fewest scan ranges instead
  • If more than one BE has the same number of scan ranges, consider the disks on those BEs and select the BE where the disk holding the replica has the fewest blocks allocated for reading
• If the datanodes holding the block's replicas do not host a BE (separate deployment, where data is read remotely)
  • Select the BE with the fewest scan ranges
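The scheduling rules above can be sketched as follows. This is a simplified illustration under stated assumptions, not the coordinator's code: it ignores the per-disk tie-breaking, it treats a BE as full once it already holds its average share (one possible reading of the threshold), and the host names and the `assign_scan_ranges` function are hypothetical.

```python
import math

def assign_scan_ranges(scan_ranges, backends):
    """scan_ranges: list of (range_id, set of replica hosts).
    backends: list of BE hosts. Returns {be_host: [range_id, ...]}."""
    assigned = {be: [] for be in backends}
    # Average number of scan ranges per BE (numScanRangePerBe).
    avg_per_be = math.ceil(len(scan_ranges) / len(backends))

    def least_loaded(candidates):
        return min(candidates, key=lambda be: (len(assigned[be]), be))

    for range_id, replica_hosts in scan_ranges:
        local = [be for be in backends if be in replica_hosts]
        if local:
            # Co-located case: prefer the least-loaded BE that holds a replica.
            target = least_loaded(local)
            if len(assigned[target]) >= avg_per_be:
                # Assumption: a BE at its average share hands over to a remote BE.
                remote = [be for be in backends if be not in replica_hosts]
                if remote:
                    target = least_loaded(remote)
        else:
            # Separate deployment: no locality, pick the least-loaded BE overall.
            target = least_loaded(backends)
        assigned[target].append(range_id)
    return assigned

backends = ["be1", "be2", "be3"]
scan_ranges = [
    ("r0", {"be1", "dn9"}), ("r1", {"be1"}), ("r2", {"be1"}),
    ("r3", {"be1", "be2"}), ("r4", {"dn9"}), ("r5", {"dn8"}),
]
plan = assign_scan_ranges(scan_ranges, backends)
print(plan)
```

In this example most replicas sit on be1, yet each BE ends up with two scan ranges: locality is honored until be1 reaches its share, after which the remaining ranges are read remotely.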
Once a fragment is scheduled to a BE, the entire execution process is vectorized. Take Iceberg as an example. On the BE side, IcebergScanNode currently maps to the vectorized implementation of HdfsScanNode, and the other operators likewise use their vectorized BE implementations.
TPC-H is a test set developed by the TPC (Transaction Processing Performance Council) to simulate decision support applications. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications.
TPC-H simulates the data warehouse of a sales system based on the real production and operation environment. The test includes 8 tables, and the data volume can be set from 1 GB to 3 TB. The benchmark test includes 22 queries, and the main evaluation index is the response time of each query, that is, the time from submitting the query to returning the results.
StarRocks was tested with both local storage queries and Hive external table queries. StarRocks On Hive and Trino On Hive query the same data, which is stored in ORC format with zlib compression. The test environment was built on Alibaba Cloud EMR.
In the end, the StarRocks local storage queries took 21s in total, the StarRocks Hive external table queries took 92s, and the Trino queries took 307s. StarRocks On Hive is thus far faster than Trino, but still well behind local storage queries. The main reason is that accessing remote storage adds network overhead, and remote storage generally has higher latency and lower IOPS than local storage. The plan is to close this gap with mechanisms such as caching, further narrowing the difference between StarRocks local tables and StarRocks On Hive.
For the specific test process, please refer to the StarRocks vs Trino TPCH performance test comparison report
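For a quick sense of scale, the ratios implied by the totals reported above can be worked out directly. Only the three totals come from the article; the variable names are illustrative.

```python
# Total TPC-H times in seconds, as reported in this article.
totals_s = {"StarRocks local": 21, "StarRocks on Hive": 92, "Trino on Hive": 307}

# How many times longer Trino takes than StarRocks on the same Hive data.
trino_vs_sr_hive = totals_s["Trino on Hive"] / totals_s["StarRocks on Hive"]
# How many times longer StarRocks takes on Hive than on local storage.
sr_hive_vs_local = totals_s["StarRocks on Hive"] / totals_s["StarRocks local"]

print(f"Trino / StarRocks on Hive: {trino_vs_sr_hive:.1f}x")
print(f"StarRocks on Hive / local: {sr_hive_vs_local:.1f}x")
```

On these totals, Trino takes roughly 3.3 times as long as StarRocks On Hive, while StarRocks On Hive takes roughly 4.4 times as long as StarRocks on local storage.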
4、 Future planning
Thanks to core technologies such as the fully vectorized execution engine, the CBO and the MPP execution framework, StarRocks has achieved extremely fast data lake analysis that is far superior to other similar products. In the long run, StarRocks' vision for data lake analysis is to provide users with extremely simple, easy-to-use and high-speed data lake analysis capabilities. To achieve this goal, StarRocks still has a lot of work to complete, including:
• Integrate the pipeline execution engine to further reduce query response time through push-based pipeline execution
• Automatic hierarchical storage of hot and cold data. Users can store frequently updated hot data on the StarRocks local table. StarRocks will automatically migrate cold data from the local table to the data lake on a regular basis
• Remove the step of explicitly creating external tables; users will only need to create the resource corresponding to the data lake to have its databases and tables synchronized fully automatically
• Further improve StarRocks' support for data lake product features, including Apache Hudi's MOR table and Apache Iceberg's v2 table; Support direct writing of data lake; Support Time Travel query, improve the support of the catalog, etc
• Further improve the performance of data lake analysis through hierarchical cache
Knowledge Base Team