Best practice of data lake construction based on DeltaLake

1、 Business background

Homehelp is an online education company based on technology. At present, it has tool products homework help, homework help mental arithmetic, K12 live class products homework help live class, quality education products deer programming, deer writing, deer art, and other intelligent learning hardware such as meow machine. Operation helps several business systems such as teaching and research center, teaching center, coaching operation center, big data center, etc., continuously endow more quality education products, and constantly bring better learning and use experience to users. The big data center, as the center of the basic system, is mainly responsible for the construction of company-level data warehouse, providing business-oriented data information to each product line, such as retention rate, attendance rate, active number, etc., to improve the efficiency and quality of operational decision-making.

The above figure shows the overview of the data center of the job shop. It is mainly divided into three layers:

• The first layer is data products and enabling layer

It is mainly a data tool and product built based on the subject data domain to support business intelligence, trend analysis and other application scenarios.

• The second layer is the global data layer

Through OneModel unified modeling, we have standardized modeling of the accessed data, and built the subject data of the business domain according to the scenarios with different timeliness to improve the efficiency and quality of the upper products.

• The third layer is the data development layer

A series of systems and platforms have been built to support all data development projects in the company, including data integration, task development, data quality, data service, data governance, etc.

The content of this sharing is mainly to solve the performance problems in the production and use of offline data warehouse (day level and hour level).

2、 Problems&pain points

Based on Hive, the job help offline data warehouse provides data construction capability from ODS layer to ADS layer. When the ADS table is generated, it will be written into the OLAP system through data integration to provide BI services for managers; In addition, DWD, DWS and ADS tables will also provide offline data exploration and data retrieval services for analysts.

With the gradual development of business and the increasing amount of corresponding data, the offline data warehouse system highlights the following main problems:

• The output delay of ADS table is getting longer and longer

Due to the increase of data volume, the construction time of the full link from ODS layer to ADS layer is getting longer and longer. Although the very core ADS table chain can be solved in a short time by tilting the resources, in fact, this is essentially the model of car loss and marshaling. This model can not be replicated in a large scale, which affects the timely output of other important ADS tables. For example, for analysts, due to the delay of data tables, the worst time for T+1 tables is to wait until T+2.

• It is difficult to meet the demand of hourly meter

Some scenarios are hour-level output tables. For example, some activities need hour-level feedback to adjust the operation strategy in time. For such scenarios, as the amount of data increases and the resources of the computing cluster are tight, it is often difficult to guarantee the timeliness of the hour-level table. In order to improve the computing performance, it is often necessary to prepare sufficient resources in advance to do it, especially when the hour-level computing of day-level data is required, the computing resources need to be expanded by 24 times in the worst case.

• Slow data exploration and poor data retrieval stability

After the data is produced, it is often used for analysts. Direct access to Hive takes tens of minutes or even hours, which is totally unacceptable. Users often receive roast from users, and Presto is used to speed up the query of Hive tables. Because of Presto's architectural characteristics, the queried data tables cannot be too large and the logic cannot be too complex, otherwise Presto memory OOM will result, In addition, Hive's existing UDF and VIEW cannot be directly used in Presto, which also limits the use scenarios of analysts.

3、 Solution

problem analysis

Whether the output of the whole link from the conventional ODS layer to the ADS layer is slow, or the probe access to the specific table is slow, in essence, it means that the computing performance of the Hive layer is insufficient. From the above scenario analysis:

• The reason for the slow link calculation: Hive does not support incremental updates, while the Mysql-Binlog from the data source of the business layer contains a large amount of update information. Therefore, in the ODS layer, it is necessary to use incremental data and historical full data to form new full data after duplication. The following DWD, DWS, and ADS are similar principles. This process brings about a large number of repeated calculations of data, and also delays in data output.

• The reason for slow data query: Hive itself lacks the necessary index data, so whether it is the calculation of heavy throughput or the query that wants to guarantee the minute delay, it will be translated into MR-Job for calculation, which leads to the slow output of query results in the scenario of rapid data exploration.

Scheme research

From the above analysis, if we can solve the problem of data incremental update of offline data warehouse, we can improve the performance of link calculation, and support the indexing ability of data table, we can reduce the query delay without degrading the query function.

• HBase+ORC-based solution

HBase can be used to solve the problem of data updating. Set the RowKey as the primary key and the columns as Column, which can provide the ability to write data in real time. However, due to the structure of HBase, the query performance of non-primary key columns is very poor. In order to solve its query performance, it is necessary to export HBase tables to HDFS and store them in ORC format on a regular basis (for example, hourly tables and day-level tables are day-level tables) after sorting according to specific fields. However, ORC format only supports single-column min and max indexes, and the query performance still cannot meet the requirements. Moreover, because the data writing of HBase continues to occur, the export time is difficult to control, The data may also change during the export process. For example, if we want to export the data before 21:00 on December 11 as the data in the 21:00 partition of the data table, we need to consider the number of versions, storage capacity, computing performance brought by filtering, and other factors. The system complexity increases sharply. At the same time, the introduction of HBase system increases the operation and maintenance costs.

• Data Lake

Data lake is actually a data format that can be integrated between mainstream computing engines (such as Flink/Spark) and data storage (such as object storage) without introducing additional services. It also supports real-time Upsert, provides multi-version support, and can read any version of data.

At present, the data lake schemes mainly include DeltaLake, Iceberg and Hudi. We have investigated the three solutions on Alibaba Cloud, and their differences and characteristics are as follows:

In addition, considering the ease of use (DeltaLake has clear semantics, Alibaba Cloud provides full-function SQL syntax support, and is easy to use; the latter two have a high threshold for use), functionality (only DeltaLake supports Zorder/Dataskipping query acceleration), and other aspects, combined with our scenario comprehensive consideration, we finally chose DeltaLake as the data lake solution.

4、 Offline data warehouse based on DeltaLake

After introducing DeltaLake, our offline data warehouse architecture is as follows:

First of all, Binlog is written to Kafka through our self-developed data distribution system after being collected by Canal. Here, we need to explain in advance that our distribution system needs to strictly maintain the order of Binlog according to Table level. The reasons are detailed below. Then use Spark to write data to DeltaLake in batches. Finally, we upgraded the data retrieval platform and used Spark SQL to retrieve data from DeltaLake.

In the process of using DeltaLake, we need to solve the following key technical points:

Stream data transfer to batch

In the business scenario, the ETL tasks of offline data warehouse are triggered according to the data table partition readiness. For example, the tasks on 2021-12-31 will rely on the data table partition readiness on 2021-12-30 to trigger the operation. This scenario is easily supported on Hive's system because Hive naturally supports partitioning by date fields (such as dt). But for DeltaLake, our data writing is streaming, so we need to convert the streaming data into batch data, that is, after the data is completely ready one day, we can provide the reading ability of the corresponding day-level partition.

How to define complete data readiness

Streaming data will generally be out of order. In the case of out-of-order, even if the Watermark mechanism is adopted, it can only guarantee the data order within a certain time range. For offline data warehouse, the data needs to be 100% reliable and not lost. If we can solve the problem of data source order, the solution of data readiness will be much simpler: if the data is partitioned according to the day level, then when there is 12-31 data, we can think that 12-30 data is ready.

Therefore, our scheme is divided into two sub-problems:

• Define the batch data boundary after the stream data is ordered

• Mechanism to ensure orderly flow data

First of all, for the former, the overall plan is as follows:

• Set the logical partition field dt of the data table and the corresponding time unit information.

• When Spark reads a batch data, it uses the event time in the data to generate the corresponding dt value according to the above table metadata. If the value of event time in the data stream belongs to T+1, the snapshot that generates the data version T will be triggered, and the data will be read according to the corresponding data version information found in the snapshot.

How to solve the disorder of stream data

Whether it is app-log or MySQL-Binlog, the log itself is orderly. For example, MySQL-Binlog, the Binlog of a single physical table must be orderly. However, in the actual business scenario, the business system will often use sub-database and sub-table. For the scenario of using sub-table, a logical table table will be divided into Table1, Table2,... tables, and for the ODS table of offline data warehouse, You need to mask the details and logic of MySQL table splitting on the business side. In this way, the problem focuses on how to solve the problem of data ordering in the table splitting scenario.

• Ensure the orderliness of data written to Kafka in different databases and tables, even in different clusters. That is, the data read from a topic to the logical table by the Spark that writes to DeltaLake is partition granularity.

• Ensure the timeliness of the ODS table readiness. For example, if there is no Binlog data, the ODS layer data can also be ready on schedule.

The original system needs to be upgraded here. The scheme is as follows:

As shown in the figure above, the Binlog of a MySQL cluster is written to a specific Kafka-topic after being collected by Canal. However, because the partition is determined by hash according to the db and Table (to divide the table _ * suffix) when writing, the Binlog of multiple physical tables will exist in a single partition, which is very unfriendly for writing to DeltaLake. Considering the compatibility with other data applications, we have added data distribution services:

• Write the data of the logical table name (de split table _ * suffix) to the corresponding topic, and use the physical table name to hash. Ensure that the internal data of a single partition is always in order, and the data of only one logical table is included in a single topic.

• An internal heartbeat table is built in the MySQL cluster to monitor the delay exception of the Canal collection, and a certain threshold is set based on this function to determine whether the system has a problem or no data when there is no Binlog data in the system. If it is the latter, DeltaLake will also be triggered for savepoint, and then snapshot will be triggered in time to ensure that the ODS table is ready in time.

Through the above scheme, we stream Binlog data into DeltaLake, and the table partition ready time delay is<10mins.

Read-write performance optimization

Next, we will talk about the performance problems we encountered in the process of using DeltaLake and the corresponding solutions.

Improve write performance through DPP

DeltaLake supports writing data through SparkStreamingSQL.

Because we need to merge and de-duplicate records, we need to write them by merge into. DeltaLake updates data in two steps:

• Locate the file to be updated. By default, you need to read all the files and the incremental data of the batch in Spark to join and associate the files to be updated.

• Rewrite these files after Merge and mark the old files for deletion.

As shown in the left figure above, because DeltaLake will read the full file of the previous version by default, the write performance is extremely low, and a merge operation cannot be completed in a single batch of Spark.

For this scenario, DeltaLake was upgraded: DPP was used for partition pruning to optimize the performance of Megre into, as shown in the right figure above:

• Analyze the Merge-on condition to get the fields corresponding to the partition fields of the DeltaLake table in the source table.

• The enumeration list of partition fields is obtained by statistics.

• Convert the result of the previous step into a Filter object and apply it to further filter the list of trimmed data files.

• Read the final data file list and associate it with the source data of the batch to get the final file list to be updated.

After DPP optimization, the processing delay of Spark batch (5min granularity) is reduced from the maximum of 20mins+to the maximum of 3mins, which completely eliminates the problem that the delay continues to stack due to the long processing time in the past.

Use Zorder to improve reading performance

After solving the problem of data write performance, we encountered the problem of data read performance.

We use the same data (20 billion+) and use Hive to calculate the average delay of 10min+, but after using DeltaLake, the average delay is as high as~11min+. After analysis, it is found that the main reason is that the filter column is not sorted by using Zorder. When Zorder is enabled, the delay is reduced to~24s, improving the performance of nearly 25X.

The query optimization of DeltaLake table based on Zorder mainly involves two aspects:

• Dataskipping

• DeltaLake will count the max/min value of each field according to the file granularity, which is used to filter the data file directly.

• Zorder

• A method of data layout, which can rearrange data to ensure the data locality of the Zorder field as much as possible.

Zorder construction time optimization

For which columns, the Zorder is built on demand. Generally, the construction time is~30mins. When the data is skewed, the construction time of Zorder is up to~90mins.

Zorder is optimized for these two situations:

• In general, for multi-column Zorder, it is changed from traversing the dataset multiple times to traversing the dataset once to improve the construction efficiency. The construction time is reduced from an average of~30mins to~20mins.

• In the case of skewed data, the bucket where the skewed column is located has been decentralized, and the construction time has been reduced from an average of~90 minutes to~30 minutes.

Overall effect

After more than half a year of development and optimization, the offline data warehouse based on DeltaLake has been launched recently. The focus is on improving the query optimization of analysis. At the same time, it also provides support for scenarios with hourly full demand. The overall effect is as follows:

• Faster preparation time: after the ODS is replaced with DeltaLake, the production time is advanced from 2:00 a.m. - 3:00 a.m. to about 00:10 a.m., and the production time is advanced by more than 2 hours.

• Wider capacity expansion: Big data has the ability to support the hourly full scale. With the deltaLake incremental update feature, it can achieve the hourly full scale demand at a low cost and avoid the consumption of reading the full data under the traditional scheme. At present, it has been applied to some core businesses to build the hour-level full scale, and at the same time, the timeliness is guaranteed to be reduced from~40mins in the past to~10mins.

• Query speed improvement: We focus on improving the efficiency of ad hoc query of analysts. After migrating the data warehouse table commonly used by analysts to Deltake, we use Zorder to realize query acceleration, and the query speed is reduced from the past few minutes to~3 minutes.

5、 Future planning

With the use of DeltaLake in the job shop, there are still some problems to be solved:

• Improve the efficiency of revision.

• When using Hive, we can easily repair a historical partition independently, but when repairing the DeltaLake table, we need to rollback all versions after the failed version.

• Hive engine is fully supported.

• At present, we use DeltaLake, mainly to solve the problems of using Hive to query slowly and using Presto to limit complex queries in the past, and provide solutions for complex queries and low latency, but the aforementioned features such as GSCD and Dataskipping Hive are not supported, resulting in users not being able to use DeltaLake as Hive.

• Support Flink access.

• Our stream computing ecosystem is mainly built around Flink. After the introduction of DeltaLake, we also use Spark, which will increase our stream computing ecosystem maintenance costs.

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us