Databricks Enterprise Edition Spark and Delta Lake Engine: Efficient Access to the Lakehouse

Databricks Enterprise Edition Spark Introduction:


This article introduces the performance advantages of Databricks Enterprise Edition Delta Lake, which can greatly improve Spark SQL query performance and speed up queries on Delta tables.

Databricks Enterprise Edition Spark, Background Introduction

Databricks Enterprise Edition Spark, Databricks is the world's leading Data+AI company, the founding company of Apache Spark, and the largest code contributor to Spark. Its core focus is building enterprise-grade Lakehouse products around open-source projects such as Spark, Delta Lake, and MLflow. In 2020, Databricks and Alibaba Cloud jointly launched a fully managed big data analytics & AI platform on the cloud based on Apache Spark - Databricks DataInsight (DDI) - providing users with services across data analytics, data engineering, data science, and artificial intelligence to help them build an integrated Lakehouse architecture.

Delta Lake is a data lake product with transaction support that Databricks began developing internally in 2016 and officially open sourced in 2019. In addition to the community-led open-source Delta Lake OSS, Databricks' commercial products also provide an enterprise edition of the Spark & Delta Lake engine. This article introduces how the features provided by the enterprise edition optimize performance and enable efficient access to the Lakehouse.

Databricks Enterprise Edition Spark, Optimized solutions to the small file problem



Databricks Enterprise Edition Spark, Frequent merge, update, and insert operations in Delta Lake, or continuous insertion of data into a Delta table in stream processing scenarios, produce a large number of small files in the Delta table. The growing number of small files reduces the amount of data that Spark reads sequentially in each read, lowering read efficiency; it also inflates the Delta table's metadata, making metadata maintenance harder and degrading query performance.
In order to solve the small file problem, Databricks provides three optimization features, ranging from avoiding the generation of small files to merging small files automatically or manually.

Feature 1: Databricks Enterprise Edition Spark, Optimize the writing of the Delta table to avoid the generation of small files

Databricks Enterprise Edition Spark, In the open-source version of Spark, each executor writing data to a partition creates its own table file, which eventually results in many small files within a partition. Databricks optimizes the Delta table write process: for each partition, a dedicated executor merges the writes from the other executors to that partition, thereby avoiding the generation of small files.

This feature is controlled by the table property delta.autoOptimize.optimizeWrite:

It can be specified when creating the table:
CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

It can also be enabled on an existing table by modifying its table properties:
ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);
This feature has two advantages:

1. Improve write throughput by reducing the number of table files being written;

2. Avoid the generation of small files and improve query performance.
Its shortcomings are also obvious. Since a single executor merges the writes to each table file, the parallelism of table file writing is reduced. In addition, an extra layer of executors must be introduced to shuffle the data being written, which brings additional overhead. Therefore, the scenario needs to be evaluated before using this feature:

Applicable scenarios: SQL statements such as MERGE, UPDATE, DELETE, INSERT INTO, and CREATE TABLE AS SELECT are used frequently;

Scenarios where this feature is not applicable: writing data at the TB level or above.

Feature 2: Databricks Enterprise Edition Spark, Automatically merge small files

In stream processing scenarios, such as streaming data ingestion into the lake, newly arrived data must be continuously inserted into the Delta table, and each insertion creates a new table file to store it. Assuming the stream is triggered every 10 seconds, such a stream processing job generates 8,640 table files in a day; since stream processing jobs are usually long-running, running the job for 100 days produces close to a million table files. For such a Delta table, maintaining the metadata alone is a big challenge, and query performance deteriorates sharply.
In order to solve the above problem, Databricks provides automatic merging of small files. After each write to the Delta table, it checks the number of table files in the Delta table; if the number of small files (files smaller than 128 MB are considered small) reaches the threshold, a small file merge is performed, combining the small files in the Delta table into new, larger files.
This feature is controlled by the table property delta.autoOptimize.autoCompact, which, like delta.autoOptimize.optimizeWrite, can be specified when creating a table or set on an already created table. The threshold for automatic merging is controlled by spark.databricks.delta.autoCompact.minNumFiles; the default is 50, meaning table file merging is triggered once the number of small files reaches 50. The maximum file size after merging is 128 MB; if you need to adjust the size of the merged target files, you can do so via the configuration spark.databricks.delta.autoCompact.maxFileSize.
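As a minimal sketch of how this could be configured (the property and configuration names are those mentioned above; the threshold of 100 files and the 256 MB target size are illustrative assumptions, and the size value is assumed to be specified in bytes):

ALTER TABLE student
SET TBLPROPERTIES (delta.autoOptimize.autoCompact = true);

-- Trigger automatic compaction only after 100 small files accumulate (default: 50); illustrative value
SET spark.databricks.delta.autoCompact.minNumFiles = 100;
-- Target a larger merged file size, assumed to be in bytes (256 MB here); illustrative value
SET spark.databricks.delta.autoCompact.maxFileSize = 268435456;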

Feature 3: Databricks Enterprise Edition Spark, Manually merge small files

Automatic small file merging is triggered after a write to the Delta table, when the number of small files in the table reaches the threshold. In addition to automatic merging, Databricks also provides the OPTIMIZE command, which lets users manually merge small files and optimize the table layout, making the table file structure more compact. Internally, OPTIMIZE uses a bin-packing algorithm, which not only merges the small files in the table but also produces table files of more uniform size. For example, to optimize the table files of the Delta table student, we only need to execute the following command:
OPTIMIZE student;
The OPTIMIZE command supports not only merging small files across the whole table, but also merging table files in specific partitions. For example, we can merge small files only in partitions with a date on or after 2017-01-01:
OPTIMIZE student WHERE date >= '2017-01-01';
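After running OPTIMIZE, the compaction can be checked from the table history; a hedged example using the standard Delta DESCRIBE HISTORY command:

DESCRIBE HISTORY student;
-- The most recent entry should show an OPTIMIZE operation together with metrics such as
-- the number of files removed and the number of (larger) files added.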
According to our experiments on the Databricks DataInsight product, OPTIMIZE can improve query performance by more than 8x.
Query optimization technology comparable to enterprise-level databases
Databricks has also made many optimizations in data query, including:
Feature 1: Data Skipping
In a data query system, there are two classic query optimization techniques: one is to process data faster, and the other is to reduce the amount of data that needs to be scanned by skipping irrelevant data. Data Skipping belongs to the latter optimization technology, which skips irrelevant table files through the statistics of table files, thereby improving query performance.
When a table file is added to a Delta table, Delta Lake stores statistics for the first 32 columns of the data in that table file in the Delta table's metadata, including each column's minimum and maximum values and the number of null rows. At query time, Databricks uses these statistics to improve query performance. For example, for column x of a Delta table, suppose the minimum value of x in one of the table's files is 5 and the maximum is 10. If the query condition is where x < 3, then based on the file's statistics we can conclude that the file cannot contain the data we need, so we can skip it directly, reducing the amount of scanned data and improving query performance.
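A short hedged sketch of the effect (the table name events and column x are illustrative; the statistics are collected automatically, so no extra DDL is required):

-- Suppose the Delta table events consists of three table files whose statistics for column x are:
--   file 1: min(x) = 1,  max(x) = 2
--   file 2: min(x) = 5,  max(x) = 10
--   file 3: min(x) = 11, max(x) = 20
SELECT * FROM events WHERE x < 3;
-- Only file 1 can contain matching rows; files 2 and 3 are skipped based on their min-max statistics.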

The implementation principle of Data Skipping is similar to that of the Bloom filter. The query conditions are used to determine whether there may be data to be queried in the table file, thereby reducing the amount of data that needs to be scanned. If the queried data cannot exist in the table file, it can be skipped directly. If the queried data may exist in the table file, the table file needs to be scanned.
In order to skip as many irrelevant table files as possible, we need to narrow the min-max gap of each table file so that similar data is clustered within the same files. As a simple example, suppose a table contains 10 table files and the values of column x span [1, 10]. If the x values of every table file are spread across [1, 10], then for the query condition where x < 3, no table file can be skipped and no performance gain is possible. If instead the min-max gap of each table file is 0, that is, the x values in table file 1 are all 1, the x values in table file 2 are all 2, and so on, then for the same condition where x < 3, 80% of the table files can be skipped. Inspired by this idea, Databricks supports Z-Ordering to cluster data, narrow the min-max gap of table files, and improve query performance. Below we introduce the principle and usage of Z-Ordering optimization.

Feature 2: Databricks Enterprise Edition Spark, Z-Ordering Optimization

As explained in the previous section, in order to skip as many irrelevant table files as possible, the columns used as query conditions should be as compact as possible within each table file (that is, their min-max gap should be as small as possible). Z-Ordering achieves exactly this: it stores related information in the same set of files across multiple dimensions. Strictly speaking, Z-Ordering is a data layout optimization algorithm, but combined with Data Skipping it can significantly improve query performance.
Using Z-Ordering is very simple. For the table events, if the columns eventType and generateTime are often used as query conditions, execute the command:
OPTIMIZE events ZORDER BY (eventType, generateTime);
The Delta table will then use the columns eventType and generateTime to adjust the data layout so that eventType and generateTime values are as compact as possible within each table file.
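As an illustrative follow-up (the predicate values are assumptions made for this example), queries filtering on the Z-Ordered columns can then skip most table files via Data Skipping:

SELECT count(*) FROM events
WHERE eventType = 'click'
  AND generateTime >= '2021-06-01';
-- Because eventType and generateTime are clustered by Z-Ordering, the min-max ranges of most
-- table files fall outside these predicates, so those files are skipped.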
According to our experiments on Databricks DataInsight, Z-Ordering optimization can achieve a 40x performance improvement. For specific test cases, please refer to the official Databricks DataInsight documentation at the end of the article.
Feature 3: Bloom Filter Index
Bloom filters are also a very useful Data Skipping technique. They can quickly determine whether a table file contains the data being queried and, if not, skip the file, thereby reducing the amount of scanned data and improving query performance.
If a Bloom filter index is created on a column of the table and where col = "something" is used as the query condition, then when scanning the table's files we can use the Bloom filter index to draw one of two conclusions: the file definitely does not contain a row with col = "something", or the file may contain a row with col = "something".
•When it is concluded that the file does not contain any rows with col = "something", the file can be skipped, thereby reducing the amount of data scanned and improving query performance.
•The engine will only process the file when it concludes that the file may contain a row with col = "something". Note that this only determines that the file may contain the target data; the Bloom filter defines a metric for the probability of a wrong judgment, that is, the probability that the filter reports the file as possibly containing the queried data when in fact it does not. This metric is called the FPP (False Positive Probability).
Databricks supports file-level Bloom filters. If a Bloom filter index is created on certain columns of a table, each table file of the table is associated with a Bloom filter index file, and the index files are stored in the _delta_index subdirectory of the table file's directory. Before reading a table file, Databricks checks the index file and, following the steps above, determines whether the table file contains the data being queried.
Creating a Bloom filter index is similar to creating an index in a traditional database, but you must specify the false positive probability and the estimated number of distinct values of the column:
CREATE BLOOMFILTER INDEX ON TABLE table_name
FOR COLUMNS (col_name OPTIONS (fpp = 0.1, numItems = 50000000));
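A hedged usage sketch (table and column names follow the template above; the literal value is an assumption):

SELECT * FROM table_name WHERE col_name = 'something';
-- For each table file, the engine first consults the Bloom filter index stored under _delta_index;
-- files that definitely do not contain col_name = 'something' are skipped without being scanned.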
According to our experiments on Databricks DataInsight, using a Bloom filter index can deliver a performance improvement of more than 3x. For the test case, please refer to the official Databricks DataInsight documentation at the end of the article.
Feature 4: Dynamic file pruning
Dynamic file pruning (DFP) is similar to dynamic partition pruning: both prune data during the join between dimension tables and fact tables, reducing the amount of data scanned and improving query efficiency.
Let's take a simple query as an example to introduce the principle of DFP:
SELECT sum(ss_quantity) FROM store_sales
JOIN item ON ss_item_sk = i_item_sk
WHERE i_item_id = 'AAAAAAAAICAAAAAA'
In this query, item is a dimension table (with a small amount of data), store_sales is a fact table (with a very large amount of data), and the where query condition acts on the dimension table. If DFP is not enabled, the logical execution plan of the query is as follows:

Databricks Enterprise Edition Spark, As can be seen from the figure above, a full table scan is first performed on store_sales, and the result is then joined with the filtered rows of the item table. Although the result contains only just over 40,000 rows, more than 8 billion rows of the store_sales table are scanned. For this query, an intuitive optimization is to first query the rows in item matching i_item_id = 'AAAAAAAAICAAAAAA', and then use the i_item_sk values of those rows as a filter condition on ss_item_sk during the SCAN phase of store_sales. Combined with the Data Skipping technique introduced above, this greatly reduces the number of table files scanned. This idea is the fundamental principle of DFP. The logical execution plan with DFP enabled is shown in the following figure:

It can be seen that after DFP is enabled, the filter condition is pushed down to the SCAN operation, and only a little more than 6 million rows of store_sales are scanned. As a result, with DFP enabled, the performance of this query improves by a factor of 10. In addition, Databricks ran the TPC-DS benchmark against this feature and found that, with DFP enabled, query 15 of TPC-DS achieved an 8x performance improvement, and 36 queries achieved improvements of 2x or more.
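To make the pushdown concrete, here is a hedged sketch of a manually rewritten query that is logically equivalent to what DFP does automatically (the rewrite is purely illustrative; DFP itself requires no change to the original query):

SELECT sum(ss_quantity) FROM store_sales
WHERE ss_item_sk IN (
  -- The dimension-table filter is evaluated first, and the resulting keys restrict
  -- the scan of the fact table, enabling file-level skipping via Data Skipping.
  SELECT i_item_sk FROM item WHERE i_item_id = 'AAAAAAAAICAAAAAA'
);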

Databricks Enterprise Edition Spark, Summary
The previous sections briefly introduced the performance advantages of Databricks Enterprise Edition Delta Lake. With these features, Spark SQL query performance can be greatly improved and queries on Delta tables accelerated. Databricks delivers value to many enterprises through its enterprise edition Lakehouse architecture. You can try Alibaba Cloud Databricks DataInsight for 599 yuan and experience the extreme performance of the enterprise edition Spark & Delta Lake engine.
