Alibaba Cloud EMR StarRocks Extreme Data Lake Analysis

To meet users' needs for fast data analysis and to bring StarRocks' powerful analytical capabilities to a wider range of data sets, the Alibaba Cloud EMR OLAP team and the StarRocks community began collaborating in 2021.

Together, the two parties have enhanced StarRocks' data lake analysis capability so that it can analyze not only data stored locally in StarRocks, but also data stored in open source data lakes and warehouses such as Apache Hive (hereinafter referred to as Hive), Apache Iceberg (hereinafter referred to as Iceberg), and Apache Hudi (hereinafter referred to as Hudi), with the same excellent performance.

Alibaba Cloud EMR StarRocks is an open source OLAP product licensed by StarRocks to Alibaba Cloud, committed to building a fast, unified analysis experience that covers enterprise users' diverse data analysis scenarios. This article explains the work Alibaba Cloud EMR StarRocks has done on data lake analysis, the results achieved, and the roadmap for StarRocks in this direction.

Alibaba Cloud EMR StarRocks overall architecture

At the storage layer, Alibaba Cloud's object storage service OSS serves as the unified data lake storage, holding common file formats such as Parquet, ORC, and CSV.

At the lake management and optimization layer, EMR performs metadata management and unified construction of the overall data lake through Data Lake Formation (DLF). Meanwhile, in data lake analysis practice, access to data lake storage can suffer performance problems compared with HDFS in a traditional Apache Hadoop (hereinafter referred to as Hadoop) cluster. To solve this, Alibaba Cloud EMR developed the JindoFS system to accelerate and optimize access to the data lake storage layer.

Also at this layer, optimizations and enhancements target common data lake storage formats. For example, for Hudi and Iceberg tables (over file formats such as Parquet and ORC), there are optimizations around index statistics, version information and maintenance, small file compaction, and lifecycle management. With storage and lake management in place, the analysis layer, that is, the data development and governance layer, can be built on top.

At the data development and governance level, StarRocks nodes in Alibaba Cloud EMR play two roles: fixed nodes and elastic nodes. With the StarRocks data lake analysis engine, you can connect Apache Airflow (hereinafter referred to as Airflow) and Jupyter, both available as open source on EMR, as well as Alibaba Cloud DataWorks, for data development and scheduling.

Implementation of Iceberg support in StarRocks

StarRocks consists mainly of FE (frontend) and BE (backend) components, which communicate with each other over RPC to handle query scheduling and distribution, result aggregation, and similar tasks.

To support data lake analysis on Iceberg, we made many modifications on both the FE and BE sides. On the FE side, we added a new external table type, IcebergTable. After an execution plan is generated, the relevant plan information is sent to the BEs through the modified RPC protocol (Thrift). On the BE side, an HDFS scanner is used to perform the actual data scan.
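As a hedged illustration of this early external-table model (the resource, database, and table names below are hypothetical, and the exact DDL may differ between releases), an Iceberg table might be mapped into StarRocks roughly like this:

```sql
-- Hypothetical names; a sketch of the external-table model, not the
-- exact DDL of any particular EMR StarRocks release.
-- Register the Iceberg metadata source as a resource.
CREATE EXTERNAL RESOURCE "iceberg0"
PROPERTIES (
    "type" = "iceberg",
    "iceberg.catalog.type" = "HIVE",
    "iceberg.catalog.hive.metastore.uris" = "thrift://192.168.0.1:9083"
);

-- Map one lake table to a StarRocks external table of type ICEBERG.
CREATE EXTERNAL TABLE iceberg_orders (
    order_id BIGINT,
    amount   DOUBLE
)
ENGINE = ICEBERG
PROPERTIES (
    "resource" = "iceberg0",
    "database" = "lake_db",
    "table"    = "orders"
);
```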

After completing this research and development work, we ran a TPC-H based performance comparison against Trino. The results show that StarRocks performs very well compared with Trino.

So why is StarRocks so much better than Trino?

Performance analysis of StarRocks

StarRocks already ships a fully vectorized execution engine and a new cost-based optimizer (CBO), which together deliver strong performance for both single-table and multi-table queries. On that basis, we added new optimization rules for the data lake analysis scenario.

First, on optimization rules, take a simple and common example: predicate pushdown. By pushing predicates such as col_a > x down to the scan, the amount of data read during scanning can be reduced.

If the predicate is not pushed down (as shown in the upper left corner of the figure above), all the data is scanned first and then filtered by filter operators upstream in the engine, which causes a lot of IO overhead.
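As a hedged illustration (the table and column names are hypothetical), predicate pushdown means a filter like the following is evaluated inside the scan node rather than in a separate upstream operator:

```sql
-- Hypothetical table; with pushdown, the condition col_a > 100 is
-- evaluated during the scan, so rows that fail the filter are never
-- handed upstream, and file or row group statistics can be used to
-- skip whole chunks of data.
SELECT col_a, col_b
FROM lake_db.events
WHERE col_a > 100;
```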

To further reduce the amount of data scanned, we also support partition pruning, as shown in the middle of the figure above (see the sketch below). Before optimization, three partitions would need to be scanned; with partition pruning, the FE can prune the two unneeded partitions and tell the BE to scan only the data of the remaining partition.

On the BE side, we also support a global runtime filter, which can greatly improve performance in join scenarios. With StarRocks' excellent execution engine, it performs well in CPU-intensive data lake analysis scenarios. However, in some real-world deployments, the FE-side optimization rules and the global runtime filter mentioned above cannot eliminate IO overhead completely.
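To make partition pruning concrete, a hedged sketch (the table name and partition column dt are hypothetical):

```sql
-- Hypothetical partitioned lake table. Only the dt = '2022-01-01'
-- partition is scanned; the FE prunes the other partitions at plan
-- time and never schedules them on the BEs.
SELECT COUNT(*)
FROM lake_db.orders_partitioned
WHERE dt = '2022-01-01';
```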

Reducing IO overhead is therefore critical. In most cases, the data to be analyzed in the lake and the compute nodes are not on the same physical machines, so analysis faces significant network IO challenges. For this reason, the StarRocks community has invested heavily in IO optimization, including late materialization, IO coalescing, native Parquet/ORC readers, and SDK optimizations for object storage.

Next, I will introduce the actual optimization details through two examples.

IO Coalescing

Before IO coalescing, to read data from a Parquet file, the BE first builds a file-level reader based on the scan paths sent down from the FE; the FE-side plan also specifies which columns actually need to be scanned. In real customer deployments, large numbers of small files led to high IO consumption.

For the ColumnReader, suppose a SQL statement needs to read three columns and two of them hold relatively little data. The IO for those two columns can then be coalesced: instead of two network IO round trips, their data is fetched in a single read. For row groups, small row groups can likewise be coalesced to reduce the number of IO operations.

For the file itself, if a file is very small, we also support loading the entire file into memory in one read. In testing, this brings a very noticeable improvement in scenarios with many small IO requests.

Late Materialization

What is late materialization, and what problems does it solve?

To understand late materialization, first recall how Parquet reading works without it. To read three columns, all three are read at the same time, predicates are then applied, and the result is returned to the upstream operator. The obvious problem: if no predicate touches the third column, there is no need to read all of that column's data.

Look at the left part of the figure: because the SQL has predicates on the first two columns, c0 and c1, those two columns are read into memory first. A selection mask is then built from each of them; these two masks are called tag arrays. With the two tag arrays in hand, the third column is treated as a lazy column.

After obtaining the tag arrays of the first two columns, a new combined filter tag array is built from them, and the lazy column is then read based on that filter array, so only the surviving rows are fetched. In real workloads there may be several lazy columns, which eliminates a great deal of unnecessary IO. Thanks to the engine capabilities described earlier, including full vectorization, the CBO optimizer, and the IO optimizations tailored to data lake analysis, performance has been strong both in testing and in production rollouts.
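A hedged sketch of the query shape this optimization targets (the table and column names are hypothetical):

```sql
-- Hypothetical table. c0 and c1 carry predicates, so they are read
-- first and used to build the filter tag array; c2 has no predicate,
-- so it is a lazy column and only the rows passing both filters are
-- fetched for it.
SELECT c0, c1, c2
FROM lake_db.events
WHERE c0 > 10
  AND c1 < 100;
```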

In practice, another problem is metadata access. In data lake scenarios, file list operations can become the bottleneck of overall network access. To solve this, a complete fine-grained, intelligent caching scheme was designed on the StarRocks FE side, caching Hive partition information and file information.

In cache design, keeping the cache up to date is the big challenge. An event-driven update model solves this: it preserves query performance while offering a good user experience with no need to refresh the cache manually. To further accelerate query planning and scheduling, statistics are cached as well.
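When an immediate refresh is still wanted, StarRocks also exposes a manual refresh statement; a hedged sketch (the database and table names are hypothetical):

```sql
-- Hypothetical names; forces the FE to refresh its cached metadata
-- for this external table instead of waiting for the event-driven
-- update to arrive.
REFRESH EXTERNAL TABLE lake_db.orders;
```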

Ecosystem integration of StarRocks

In early versions, supporting a new data source required a lot of redundant development: developers needed a deep understanding of many other modules, and users had to create external tables before querying. How do we solve this problem? Our solution was to design a new Connector framework.

In previous versions, suppose a user had a database with one or two hundred tables to analyze in StarRocks. They would have to manually create a hundred-plus external tables, have the FE manage the metadata, and only then let users query. If the source schema changed, the external tables might have to be rebuilt, which greatly increased the usage burden.

In the design of the Connector framework, we introduced the concept of a catalog. Users no longer need to create external tables manually. With a Hive catalog or an Iceberg catalog, for example, creating a single catalog is enough to obtain the tables' metadata in real time. We provide complete support for Hive, Iceberg, and Hudi. Within the EMR product ecosystem, DLF for metadata management, OSS, MaxCompute, and the other products mentioned above are also integrated.
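A hedged sketch of the catalog model (the catalog name and metastore address are hypothetical):

```sql
-- Hypothetical catalog name and metastore URI. One statement replaces
-- hundreds of hand-written external tables: every table under the
-- metastore becomes queryable, with metadata fetched in real time.
CREATE EXTERNAL CATALOG hive_catalog
PROPERTIES (
    "type" = "hive",
    "hive.metastore.uris" = "thrift://192.168.0.1:9083"
);

-- Query a lake table directly through the catalog.
SELECT COUNT(*) FROM hive_catalog.lake_db.orders;
```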

Elasticity of StarRocks

When introducing the product as a whole, we mentioned that one of its key features is elasticity. How is elasticity achieved? The core of the solution is support for Compute Nodes (hereinafter referred to as CN) in StarRocks. The left part of the figure below shows a fixed StarRocks cluster; these fixed BE nodes carry actual SSD storage.

The green part is the CNs. CNs share the same execution engine code as BEs but are stateless nodes. CNs can be deployed on K8s, with data stored on object storage or HDFS. Through K8s HPA, CNs are dynamically scaled out when cluster load is high and scaled in when load is low.

Through the above changes, EMR StarRocks supports elastic scaling and thus maximal cost reduction.

With elasticity in place, another problem has to be solved: resource isolation. Query workloads on the data lake vary widely, from queries that feed BI reports directly to ad-hoc detail queries from analysts. Users generally want elastic, soft isolation of small-tenant resources rather than physical isolation: when cluster resources are idle, a query may use the full cluster, but when resources are tight, each tenant is held to its own resource limits. StarRocks therefore implements resource isolation based on resource groups, letting users limit CPU, memory, IO, and other resources at the user, query, and IP levels, as sketched below.
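A hedged sketch of a resource group (the group name, classifier, and limits are hypothetical):

```sql
-- Hypothetical group, classifier, and limits. SELECT queries from user
-- 'analyst' are soft-limited: they may use idle cluster resources, but
-- under contention they are held to 10 cores and 20% of memory.
CREATE RESOURCE GROUP rg_adhoc
TO (
    user = 'analyst',
    query_type IN ('select')
)
WITH (
    "cpu_core_limit" = "10",
    "mem_limit" = "20%",
    "type" = "normal"
);
```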

The discussion of performance optimization, ecosystem integration, elasticity, and other aspects above shows what Alibaba Cloud EMR StarRocks has done in the data lake analysis scenario and how far it has come. To sum up, the core of Alibaba Cloud EMR StarRocks data lake analysis lies in two keywords: "speed" and "unity".

Extreme speed: compared with Trino, performance is several times better. The test data above is for Hudi.

Unified: a variety of data sources are supported, including JDBC data sources not covered above. There have already been many production migrations from Trino to StarRocks, and these can generally be completed painlessly.

Alibaba Cloud EMR StarRocks data lake roadmap

Through continuous communication and discussion with users, we have concluded that data lake analysis can become a mainstream data analysis technology only if it meets at least the following four requirements:

• Single source of truth. Only one copy of the data exists, and users do not need to move data around explicitly.

• High performance. Query latency close to seconds, or even sub-second.

• Elasticity. A disaggregated storage and compute architecture.

• Cost effectiveness. Scale out and scale in on demand.

At present, three obstacles prevent data lake analysis from meeting the above four requirements:

• Data lake storage systems generally have poor IO performance and cannot meet users' requirements for low-latency queries.

• Data lakes and data warehouses are kept strictly separate. To speed up data lake queries, an extra data warehouse layer is usually built on top, which breaks the single-source-of-truth principle.

• The resulting complex data stack makes it impossible to guarantee elasticity, cost effectiveness, and ease of use.

After repeated reflection, open discussion, and careful validation, we propose a new approach to data lake analysis that we hope overcomes the problems above and reaches the ideal state of data lake analysis.

We believe the new approach to data lake analysis equals cache + materialized views.

Because data lake storage systems, OSS included, often have poor IO performance, the bottleneck of data lake analysis usually falls on the data scan.

To further improve data lake analysis performance, we want to use local disk or memory caches to accelerate IO so that remote storage is no longer the performance bottleneck. The cache is transparent to users: they enjoy the acceleration it brings without any additional operations work.

Local disk and memory are generally more expensive than remote storage, so we want every bit of cache to count: only the column data users actually analyze enters the cache, and data that grows cold is automatically evicted, improving the cache's space utilization.

Similar to the CPU cache hierarchy, we adopt a tiered caching strategy: the first tier is memory, the second is local disk. For extremely hot data cached in memory, all reads can reference the cache's memory directly, with no memory copy. In scenarios where data is constantly updated, new data typically causes cache misses and hence query latency jitter.

We have already run some POCs. They show that in the SSB multi-table performance test, the cached configuration is more than three times faster than the non-cached one and essentially approaches StarRocks local tables. Caching helps us keep the single source of truth while achieving high performance. And because of how the cache works, users get true elastic scaling and cost effectiveness: for latency-sensitive scenarios, increase the cache space to cut query latency; for latency-insensitive scenarios, shrink or skip the cache to save cost.

Users usually want to further process, pre-aggregate, or model data, both to meet the business's performance and quality requirements for analysis and to save the cost of repeated computation. Yet with either a Lambda or a Kappa architecture, users must build complex data stacks to process data on the lake further, and must separately maintain metadata and multiple copies of processed data to keep everything consistent.

To meet users' needs for data processing and modeling, and to further unify lake and warehouse, we will bring users more powerful materialized view capabilities to solve the problems above.

First, materialized views are defined in SQL, which makes data processing and modeling extremely simple. Second, a materialized view can consolidate the metadata of different sources and present a unified view externally; queries are transparently accelerated through automatic routing, without rewriting the query SQL. StarRocks materialized views support real-time incremental updates, giving users more real-time analysis capability. Finally, as a native StarRocks capability, materialized views greatly reduce operations and maintenance cost. Through materialized views, the data lake can truly achieve a single source of truth, users can more easily process and model data on the lake, the wall between lake and warehouse comes down, and the architecture of the entire data stack is simplified.
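As a hedged sketch of this direction (the catalog, table, and column names are hypothetical), an asynchronously refreshed materialized view built directly over a lake table might look like:

```sql
-- Hypothetical names; pre-aggregates a lake table inside StarRocks and
-- refreshes asynchronously, so matching queries can be transparently
-- routed to the view instead of rescanning the lake.
CREATE MATERIALIZED VIEW daily_order_stats
DISTRIBUTED BY HASH(dt)
REFRESH ASYNC
AS
SELECT dt, SUM(amount) AS total_amount, COUNT(*) AS order_cnt
FROM hive_catalog.lake_db.orders
GROUP BY dt;
```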

Summary and outlook

The core of StarRocks data lake analysis is: fast, unified, simple and easy to use.

Through connectors and data catalogs, data source access becomes extremely simple. Through caching, the IO performance of the data lake storage system no longer becomes a bottleneck. Through materialized views, data flows between lake and warehouse more naturally, lake and warehouse views stay consistent, queries are transparently accelerated, and the data stack architecture becomes simpler. Finally, with the elasticity of the cloud and K8s, StarRocks data lake analysis achieves true elasticity and cost effectiveness.
