EMR Creates an Efficient Cloud-Native Data Analysis Engine

Build a data analysis platform on the cloud based on an open source system

The main reasons why customers choose open source solutions are as follows:

• Flexible and diverse business scenarios: Today even a small enterprise may store many kinds of data, such as business data, log data, and graph data. A highly customizable system is needed to connect these different business scenarios;
• Professional operation and maintenance capabilities: Open source systems have a deep talent pool, abundant online documentation, and the strong backing of the open source community, which helps keep the company's business running smoothly;
• Multiple business needs vs. cost pressure: Each cloud product has its own usage scenarios. For small and medium-sized enterprises, purchasing multiple cloud products creates considerable cost pressure, whereas a single open source system that covers the components the business needs can reduce the user's costs.

The figure below is the product architecture diagram of the Alibaba EMR system. There are two main ways for users to move to the cloud. One is to purchase ECS resources and build an open source system themselves; the other is to choose Alibaba's EMR system directly. The first approach involves many open source components, including Spark, Hive, Flink, and TensorFlow. Building a complete big data system from scratch is very complicated for users, and operating a cluster of hundreds or thousands of nodes is also a great challenge for operation and maintenance staff. Using the EMR system has the following advantages:

1) The Alibaba Cloud EMR system can help users to automatically deploy and configure related components with one click, and it can be used out of the box. At the same time, it can automatically recommend and adjust parameters according to the user's machine type.

2) The Alibaba Cloud EMR system is connected with other Alibaba Cloud products. For example, the data is stored in OSS, and the EMR system can easily read the data on OSS without additional authentication configuration;

3) Alibaba Cloud EMR system integrates many self-developed plug-ins, which are not available in other products;

4) All components of the Alibaba Cloud EMR system are compatible with open source but better than open source. For example, Flink integrates Alibaba Cloud's self-developed Blink, and TensorFlow integrates PAI, so users can benefit from Alibaba Cloud's internal technology;

5) Alibaba Cloud EMR system provides a platform-wide job diagnosis and alarm component APM to realize automatic operation and maintenance, which greatly reduces the complexity of cluster operation and maintenance;

6) The Alibaba Cloud EMR system is also connected to DataWorks, so users can use DataWorks as an entry point and work with the EMR system with minimal effort.

There are three main goals of the EMR system:

• Platformization: Make EMR a unified data analysis platform on the cloud, help users create full-stack big data solutions, support full series of VM containerization, and provide enterprise-level HAS and big data APM;

• Technical community & depth: Continue to deepen the technical community, create a big data-friendly cloud native storage, and at the same time give back the technology to the community and make contributions to the community;

• Ecosystem: The EMR system will combine with other Alibaba Cloud products to build an ecosystem, connect to Blink and PAI, and integrate OSS and OTS solutions.

EMR-Jindo: Cloud Native Efficient Data Analysis Engine

The figure below shows the TPC-DS benchmark report. In the 10TB test in March 2019, the performance score was about 1.82 million and the cost was 0.31 USD; in the same test in October 2019, the performance score had risen to 5.26 million and the cost had dropped to 0.53 CNY. In other words, in about half a year performance increased to roughly 2.9 times the original, and the cost fell to about a quarter of the original. Alibaba also became the first vendor to submit a TPC-DS report for the 100TB test. Behind these achievements is the EMR-Jindo engine.
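The performance ratio quoted above can be checked with one line of arithmetic. The snippet below only restates the two scores from the text, and the variable names are illustrative. The cost ratio is deliberately not computed, because the two costs are quoted in different currencies and the text gives no exchange rate.

```python
# Performance scores quoted in the text for the 10TB TPC-DS test.
score_march_2019 = 1.82e6
score_october_2019 = 5.26e6

# Ratio of the later score to the earlier one.
speedup = score_october_2019 / score_march_2019
print(f"October 2019 score is {speedup:.2f}x the March 2019 score")
```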

The EMR-Jindo engine architecture is mainly divided into two parts:

• Jindo-Spark: The fully optimized Spark efficient computing engine inside EMR can handle a variety of computing tasks;

• Jindo-FS: Self-developed cloud-native storage engine, compatible with the open source HDFS interface, taking into account both performance and price.

1) Jindo-Spark
The Jindo-Spark high-efficiency computing engine applies a series of optimizations to Spark: Runtime Filter supports adaptive data pruning at runtime; Enhanced Join Reorder solves problems such as outer join reordering; TopK inference and pushdown help filter data as early as possible; File Index supports file-level filtering with min/max, Bloom filter, and inverted indexes; the self-developed Relational Cache lets a single engine accelerate queries from minutes to sub-second; Spark Transaction introduces full ACID support to Spark for specific scenarios; and Smart Shuffle reduces the number of sort-merge operations at the bottom layer to improve shuffle efficiency.

• Runtime Filter:
Runtime Filter is similar to Dynamic Partition Pruning (DPP) in Spark but more powerful: in addition to the partitioned tables that DPP can handle, Runtime Filter can also handle non-partitioned tables. The basic principle is to prune data dynamically at runtime to avoid unnecessary computation. For example, in a join query the data may not be filterable by pushing predicates down to the storage layer, and the final data volume cannot be predicted during logical planning. In this case, for a partitioned table, Runtime Filter first estimates the amount of data that one of the tables contributes to the join. If that amount is small, it filters this side in advance and pushes the result to the other side to filter data there. For a non-partitioned table, it introduces a filter, such as a Bloom filter or min/max statistics, and uses this information to extract the side with fewer candidate rows and push it to the other side for filtering. The cost of Runtime Filter is very small, since it only requires an evaluation in the optimizer, but it can bring significant performance gains. As shown in the figure below, Runtime Filter achieves an overall performance improvement of about 35%. This feature has been submitted as a PR to Spark (SPARK-27227).
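The idea of building a filter from the smaller join side and applying it to the larger side before the join can be sketched in a few lines of Python. This is a minimal illustration, not the EMR or Spark implementation: the function names are hypothetical, the tables are plain lists of dicts, and a Python set stands in for a Bloom filter (it plays the same role but has no false positives).

```python
def build_runtime_filter(small_side, key):
    """Collect min/max statistics and a membership set from the smaller side.

    Assumption: a set stands in for the Bloom filter a real engine would use.
    """
    keys = {row[key] for row in small_side}
    if not keys:
        return (0, -1, keys)  # empty range: nothing on the other side can match
    return (min(keys), max(keys), keys)


def apply_runtime_filter(large_side, key, runtime_filter):
    """Drop rows of the larger side that cannot find a join partner."""
    lo, hi, members = runtime_filter
    return [r for r in large_side if lo <= r[key] <= hi and r[key] in members]


def hash_join(left, right, key):
    """Plain inner hash join, used to check that pruning keeps the result."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]


# Hypothetical data: a small dimension table and a larger fact table.
dim = [{"item_id": 3, "price": 10}, {"item_id": 7, "price": 20}]
fact = [{"item_id": i % 10, "qty": i} for i in range(100)]

runtime_filter = build_runtime_filter(dim, "item_id")
pruned = apply_runtime_filter(fact, "item_id", runtime_filter)
print(len(fact), "rows before pruning,", len(pruned), "rows after")
```

The join over the pruned rows returns the same result as the join over the full fact table, while the join operator sees far fewer rows.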

• Enhanced Join Reorder:
The execution order of operators can greatly affect the execution efficiency of SQL. The core principle of this optimization is to change the execution order of operators so that data is filtered as early as possible.

For example, in the example in the upper left corner of the figure below, if the two bottom tables are very large, joining them is very expensive. The large join result is then joined to the small table, so the large data volume is passed up layer by layer and slows the entire execution. The optimization idea is to first filter out irrelevant data in the large tables to reduce the amount of data sent downstream. To solve this problem, Spark uses a dynamic programming algorithm, but it is only suitable when the number of tables is relatively small; when the number of tables exceeds 12, the algorithm is no longer practical. For a large number of tables, EMR provides a genetic algorithm for multi-table joins, which reduces the O(2^n) complexity of the original dynamic programming algorithm to a linear level and can handle joins of hundreds or thousands of tables.
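To give a feel for how a genetic search over join orders differs from enumerating all table subsets, here is a minimal sketch. It is not EMR's algorithm: the cost model (sum of intermediate result sizes with one uniform selectivity), the table sizes, and all function names are assumptions made for illustration. With a fixed population size and generation count, the work grows roughly linearly with the number of tables.

```python
import random


def plan_cost(order, sizes, selectivity):
    """Toy cost of a left-deep plan: sum of intermediate result sizes (assumed model)."""
    rows = sizes[order[0]]
    cost = 0.0
    for table in order[1:]:
        rows = rows * sizes[table] * selectivity
        cost += rows
    return cost


def order_crossover(parent_a, parent_b, rng):
    """Keep a slice of parent_a in place, fill the rest in parent_b's order."""
    i, j = sorted(rng.sample(range(len(parent_a)), 2))
    middle = parent_a[i:j + 1]
    kept = set(middle)
    rest = [t for t in parent_b if t not in kept]
    return rest[:i] + middle + rest[i:]


def genetic_join_order(sizes, selectivity=0.001, population=30,
                       generations=40, seed=0):
    """Search join orders with selection, crossover, and swap mutation."""
    rng = random.Random(seed)
    tables = list(sizes)
    pop = [rng.sample(tables, len(tables)) for _ in range(population)]
    for _ in range(generations):
        pop.sort(key=lambda o: plan_cost(o, sizes, selectivity))
        survivors = pop[: population // 2]  # the best half survives unchanged
        children = []
        while len(survivors) + len(children) < population:
            a, b = rng.sample(survivors, 2)
            child = order_crossover(a, b, rng)
            if rng.random() < 0.3:  # occasional swap mutation
                i, j = rng.sample(range(len(child)), 2)
                child[i], child[j] = child[j], child[i]
            children.append(child)
        pop = survivors + children
    return min(pop, key=lambda o: plan_cost(o, sizes, selectivity))


# Hypothetical 18-table join; dynamic programming would face 2**18 table subsets.
sizes = {f"t{i}": 1000 * (i + 1) for i in range(18)}
best = genetic_join_order(sizes)
print(best)
```

The search gives up the optimality guarantee of dynamic programming in exchange for a bounded optimization time, which is the trade-off the text describes.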

In the upper right corner of the figure below, Query64 has 18 tables participating in the join. Optimization with the dynamic programming algorithm takes 1400 seconds, while the multi-table join genetic algorithm takes only about 20 seconds. Another important function of Join Reorder is the outer join reordering algorithm. The order of outer joins in SQL cannot be exchanged arbitrarily, but that does not mean it can never be exchanged. For example, when A is left joined with B and then left joined with C, the order is interchangeable under certain conditions. Spark gives up on optimizing outer joins entirely, while EMR has derived the necessary and sufficient conditions for exchanging the order based on existing research and implemented an outer join reordering algorithm (shown in the lower left corner of the figure below), which substantially improves the execution efficiency of outer joins (lower right corner of the figure below).
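One well-known case in which the two left joins can be swapped is when both join conditions refer only to columns of A. The sketch below checks this on small in-memory tables. It illustrates that single case only, not the full set of conditions EMR uses; the helper names and data are hypothetical.

```python
def left_join(left, right, left_key, right_key, right_columns):
    """Left outer join on equality; unmatched left rows get None for right columns."""
    out = []
    for l in left:
        matches = [r for r in right if r[right_key] == l[left_key]]
        if matches:
            out.extend({**l, **r} for r in matches)
        else:
            out.append({**l, **{c: None for c in right_columns}})
    return out


def canonical(rows):
    """Order rows deterministically so two results can be compared as multisets."""
    return sorted(rows, key=lambda r: repr(sorted(r.items())))


A = [
    {"a_id": 1, "a_b": 10, "a_c": 100},
    {"a_id": 2, "a_b": 20, "a_c": 200},
    {"a_id": 3, "a_b": 30, "a_c": 300},
]
B = [{"b_id": 10, "b_val": "x"}, {"b_id": 30, "b_val": "y"}]
C = [
    {"c_id": 100, "c_val": "p"},
    {"c_id": 200, "c_val": "q"},
    {"c_id": 200, "c_val": "r"},
]

# (A left join B) left join C, both conditions on columns of A.
ab_then_c = left_join(
    left_join(A, B, "a_b", "b_id", ["b_id", "b_val"]),
    C, "a_c", "c_id", ["c_id", "c_val"],
)
# (A left join C) left join B.
ac_then_b = left_join(
    left_join(A, C, "a_c", "c_id", ["c_id", "c_val"]),
    B, "a_b", "b_id", ["b_id", "b_val"],
)
print(canonical(ab_then_c) == canonical(ac_then_b))
```

If the second condition referred to a column of B instead, rows of A without a match in B would carry NULL in that column, and the swap would no longer be safe in general.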

• Relational Cache:
Spark's original Cache has several limitations. First, Spark's Cache is session-level: if a query fragment is found to be used frequently, a cache can be created for the session, but the cache disappears when the session ends. Second, Spark's Cache is stored locally rather than distributed, so it cannot be shared universally. To address this, the EMR platform implements Relational Cache, which creates a cache for any abstract relational data entity such as a Spark table, view, or Dataset. It is similar to a materialized view but offers more functionality. The usage scenarios of Relational Cache include a) sub-second response MOLAP engines; b) interactive BI and dashboards; c) data synchronization; d) data pre-organization.

Relational Cache is created as follows, with syntax similar to common Spark SQL DDL. First, cache a table or view, then specify the Relational Cache update strategy (DEMAND or COMMIT), whether the cache is used for subsequent query optimization, how the cached data is stored, and the view logic of the cache. Relational Cache supports caching any table or view, caching to any data source such as memory, HDFS, or OSS, and any data format such as JSON, ORC, or Parquet.

Relational Cache also supports optimizing the SQL entered by the user. The original Spark SQL Cache is very rigid here: the user's SQL must match the cache exactly before the cache can be used. Relational Cache is different. If a cache exists for the join of four tables a, b, c, and d, then a query that joins three tables a, b, and e can read the result of joining a and b from the cache data generated for the four-table join. The right side of the figure below shows benchmark results with and without the cache. Relational Cache keeps the response time of the test at the sub-second level.
For details, refer to the article on using Spark Relational Cache to achieve interactive analysis with sub-second response.

• Spark Transaction: Some users use Hive tables, which have transaction support, but Spark is not compatible with Hive's transactions. To support scenarios such as data correction and deletion and streaming data import, the EMR platform provides Spark Transaction, which adds ACID transaction support to Spark.

Traditional data import is done in batches, for example once a day. In streaming import scenarios, the data is raw data written in real time without any processing, so delete and update requirements arise. Broadly speaking, Spark Transaction is an implementation of locking plus MVCC, and MVCC is closely tied to the underlying storage. For compatibility between Hive and Spark, data is stored as files in directories, and versions are controlled at the row level: every row written carries meta columns such as op, original_write_id, bucket_id, and row_id, which together identify the row uniquely within the entire table. When a row needs to be updated, it is not updated in place; instead, the row is read out and rewritten as a new version. On read, the versions are merged and the result is returned to the user.
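The write-new-version and merge-on-read behavior described above can be sketched as follows. This is a simplified model, not the actual file format: locking is omitted, files are plain Python lists, and the `current_write_id` field is an assumption added here so that versions can be ordered (the text names only op, the original write id, the bucket id, and row_id).

```python
from typing import NamedTuple, Optional


class VersionedRow(NamedTuple):
    op: str                  # "insert", "update", or "delete"
    original_write_id: int   # write that first created the row
    bucket_id: int
    row_id: int
    current_write_id: int    # assumption: the write that produced this version
    value: Optional[str]


def merge_on_read(files, snapshot_write_id):
    """Return the latest visible version of every row, skipping deleted rows."""
    latest = {}
    for rows in files:
        for row in rows:
            if row.current_write_id > snapshot_write_id:
                continue  # version written after the reader's snapshot
            key = (row.original_write_id, row.bucket_id, row.row_id)
            seen = latest.get(key)
            if seen is None or row.current_write_id > seen.current_write_id:
                latest[key] = row
    return {k: r.value for k, r in latest.items() if r.op != "delete"}


# A base file plus two delta files; nothing is ever updated in place.
base = [
    VersionedRow("insert", 1, 0, 0, 1, "a"),
    VersionedRow("insert", 1, 0, 1, 1, "b"),
]
delta_update = [VersionedRow("update", 1, 0, 0, 2, "a2")]
delta_delete = [VersionedRow("delete", 1, 0, 1, 3, None)]
files = [base, delta_update, delta_delete]

print(merge_on_read(files, snapshot_write_id=3))
```

Because each reader merges only the versions visible at its snapshot, a reader that started before write 2 still sees the original rows while later readers see the update and the delete.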

2) Jindo-FS
EMR launched a local-disk instance model in the early days. Deploying a cluster with this model is similar to deploying a big data distribution on an on-premises cluster outside the cloud, and the price is relatively high. In addition, because HDFS had a metadata bottleneck at that time, the dynamic scalability of local storage faced great challenges. The solution to this problem is to separate computing from storage and store the data on OSS. However, the direct result of this separation is worse performance, because OSS metadata operations are time-consuming and reads cross the network, where transfer bandwidth can also seriously affect performance.

A further solution is to pull data from the remote end to the computing side for caching, which is what Jindo-FS does. Jindo-FS is a system similar to HDFS, and its architecture resembles the Master-Slave architecture of HDFS: it is divided into a Name Service and a Storage Service. It supports placing some frequently accessed tables in RocksDB for multi-level caching. Unlike the Master node of HDFS, the "Master" (Name Service) of Jindo-FS is a distributed cluster that uses the Raft protocol to provide the entry service; it supports multiple namespaces; metadata is stored as key-value pairs in a high-performance KV store; and because it does not store data itself (the real data is in OSS and OTS), it supports elastic scaling of data and can be destroyed and rebuilt.


The metadata management at the bottom of Jindo-FS splits the metadata into a series of key-value pairs and resolves paths layer by layer through incrementing ids. For example, resolving /home/Hadoop/file1.txt requires reading OTS three times. The test results on the right side of the figure below show that Jindo-FS performs better than OSS on metadata operations.
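The layer-by-layer lookup can be sketched with a dictionary standing in for the key-value store. The key layout `(parent_id, name) -> id` and the class below are assumptions for illustration, not the actual Jindo-FS schema; the point is only that a path with three components costs three reads.

```python
ROOT_ID = 0


class KVNamespace:
    """Toy namespace: each path component is one key-value entry."""

    def __init__(self):
        self.store = {}           # (parent_id, name) -> id; stands in for the KV store
        self.next_id = ROOT_ID + 1
        self.reads = 0            # number of KV reads issued by resolve()

    def create(self, path):
        """Create every missing component of the path, assigning incrementing ids."""
        parent = ROOT_ID
        for name in path.strip("/").split("/"):
            key = (parent, name)
            if key not in self.store:
                self.store[key] = self.next_id
                self.next_id += 1
            parent = self.store[key]
        return parent

    def resolve(self, path):
        """Walk the path one component at a time, one KV read per component."""
        parent = ROOT_ID
        for name in path.strip("/").split("/"):
            self.reads += 1
            parent = self.store[(parent, name)]
        return parent


ns = KVNamespace()
ns.create("/home/Hadoop/file1.txt")
file_id = ns.resolve("/home/Hadoop/file1.txt")
print("file id:", file_id, "KV reads:", ns.reads)
```

Keying each entry by its parent id rather than by the full path means that renaming a directory touches one entry instead of every descendant, which is one common reason for this kind of layout.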

Jindo-FS uses the Storage Service for underlying storage. During a write, the Storage Service stores the file both locally and in OSS, and then returns the write result to the user; at the same time, it replicates the data to multiple nodes in the cluster. A read works much as in HDFS: if the data is found locally, it is read from local storage; otherwise it is read remotely. The Storage Service offers high performance, high reliability, high availability, and elastic storage. To support high performance, Jindo-FS establishes a high-speed data channel and applies a series of strategies, such as reducing the number of memory copies.
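The write and read paths described above amount to a write-through local cache in front of remote storage. The sketch below models that behavior with two dictionaries. The class and method names are hypothetical, replication to other cluster nodes is left out, and caching a block locally after a remote read is an assumption based on the caching role described earlier.

```python
class StorageServiceSketch:
    """Toy model: local storage in front of a remote object store."""

    def __init__(self, remote):
        self.remote = remote      # dict standing in for OSS
        self.local = {}           # dict standing in for local disk
        self.local_hits = 0
        self.remote_reads = 0

    def write(self, name, data):
        """Store the data locally and remotely before acknowledging the write."""
        self.local[name] = data
        self.remote[name] = data
        return True

    def read(self, name):
        """Serve from local storage on a hit, otherwise fetch from remote."""
        if name in self.local:
            self.local_hits += 1
            return self.local[name]
        self.remote_reads += 1
        data = self.remote[name]
        self.local[name] = data   # assumption: keep a local copy for later reads
        return data

    def evict(self, name):
        """Drop the local copy; the remote copy remains the source of truth."""
        self.local.pop(name, None)


oss = {}
service = StorageServiceSketch(oss)
service.write("part-0000", b"rows")
first = service.read("part-0000")    # local hit
service.evict("part-0000")
second = service.read("part-0000")   # remote read, then cached locally
third = service.read("part-0000")    # local hit again
print(service.local_hits, service.remote_reads)
```

Because every write also lands in the remote store, local copies can be evicted or lost without losing data, which is what allows the cluster to scale elastically.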
