Walnut Programming: Delta Lake Real-Time Data Warehouse in Practice

Introduction to Walnut Programming

Walnut Programming was founded on August 9, 2017. As a leader in the children's programming education industry, it has always adhered to its mission of "helping every child love learning and know how to learn, and putting high-quality education within reach", and is committed to advancing children's programming education through technology. Relying on its pioneering AI human-computer dual-teacher teaching model and a ten-level progressive curriculum, it delivers individualized teaching at scale and aims to "inspire the learning ability of Chinese children". As of August 2019, Walnut Programming had become the largest children's programming education institution by number of paying students, helping more than 650,000 children develop an interest in learning, build programming skills, and form good thinking habits. Its student repurchase rate exceeds 91%, its class completion rate is as high as 98%, and its students have published 18.73 million original works online.

1. Business Status
Business needs
Classes start at fixed times. During class, the head teacher needs to see each student's learning status in real time or near real time
Statistics are generally aggregated by class and by semester, and the time range can span several months or even a full year
The business changes rapidly, so changes in indicator logic driven by business changes must be handled promptly
Data sources

Architecture before the migration
Existing indicators were computed by writing Kafka/MySQL data into HDFS and running Hive offline batch jobs every 10 minutes, each of which recomputed the cumulative historical indicators. The results were then synchronized to MySQL at regular intervals to serve queries from the data back end, as shown below:

Problems encountered
As the volume of data to be computed grew, this approach gradually failed to meet the business's required update frequency.

Using Apache Sqoop for full data synchronization puts pressure on the business MySQL database and on HDFS.
When using Apache Sqoop for incremental synchronization, usually only a timestamp field (such as the update time) can be used to pick up newly modified data, so building a partitioned table requires a fairly complex offline merge.
As the data grows, synchronization and processing take longer and longer and can no longer meet the business's real-time requirements.
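The offline merge that timestamp-based incremental synchronization requires can be illustrated with a small pure-Python sketch. It is not the actual Hive job; it assumes each row has a primary key `id` and an `update_time` column (both hypothetical names) and keeps the newest version of every key across the base snapshot and the incremental pull.

```python
from typing import Dict, List

Row = Dict[str, object]


def merge_snapshot_with_increments(base: List[Row], increments: List[Row],
                                   key: str = "id",
                                   ts: str = "update_time") -> List[Row]:
    """Merge a full snapshot with rows pulled by an update-time filter.

    For every primary key, the row with the largest `ts` wins. `id` and
    `update_time` are hypothetical column names used for illustration.
    """
    latest: Dict[object, Row] = {}
    for row in base + increments:
        current = latest.get(row[key])
        # Keep the newer version; ties go to the later row (the increment).
        if current is None or row[ts] >= current[ts]:
            latest[row[key]] = row
    return sorted(latest.values(), key=lambda r: r[key])


if __name__ == "__main__":
    base = [{"id": 1, "status": "enrolled", "update_time": 100},
            {"id": 2, "status": "enrolled", "update_time": 100}]
    increments = [{"id": 2, "status": "finished", "update_time": 200},
                  {"id": 3, "status": "enrolled", "update_time": 210}]
    for row in merge_snapshot_with_increments(base, increments):
        print(row)
```

Note that a timestamp filter cannot see rows that were hard-deleted in MySQL, and the merge has to run over the whole table (or partition) every cycle, which is part of why this approach slows down as the data grows.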
2. Research on real-time data warehouse solutions
Since the offline synchronization solution could no longer meet business needs, we planned to migrate to a real-time solution and carried out some research first.

Problems with migrating to stream computing
Long development cycle
Existing offline tasks often run to hundreds of lines of SQL with complex logic. Migrating all of this logic to stream computing is difficult to develop and costly.
For example, offline incremental synchronization first has to synchronize a full copy of the base data.

It then consumes the incremental binlog data, streams it into a Hive external table, and finally merges the two tables.

With Delta Lake, a single streaming SQL job is enough to achieve real-time incremental synchronization.

Data recovery is difficult
For offline tasks, data recovery only requires rerunning the task.

In stream computing, however, when data is abnormal or the logic changes and the full dataset has to be recomputed, historical data can only be backfilled offline and then unioned with the real-time data. This is because Kafka cannot retain all historical data, and consuming and catching up from the beginning would take a long time.

To support fast recovery, every indicator therefore needs two sets of code from the start, one offline and one real-time, similar to the Lambda architecture.
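The serving side of such a Lambda-style setup can be sketched as follows. This is a hypothetical pure-Python illustration, not code from the original system: a daily indicator is computed twice, and days before a cutoff are served from the offline result while later days come from the real-time result.

```python
from datetime import date
from typing import Dict


def lambda_union(offline: Dict[date, int], realtime: Dict[date, int],
                 cutoff: date) -> Dict[date, int]:
    """Combine two results of the same daily indicator.

    Days before `cutoff` come from the offline (batch) job, which can be
    rerun after a logic change; days on or after `cutoff` come from the
    real-time job, whose source (Kafka) only retains recent data.
    """
    merged = {d: v for d, v in offline.items() if d < cutoff}
    merged.update({d: v for d, v in realtime.items() if d >= cutoff})
    return dict(sorted(merged.items()))


if __name__ == "__main__":
    offline = {date(2020, 6, 1): 120, date(2020, 6, 2): 118}
    realtime = {date(2020, 6, 2): 90, date(2020, 6, 3): 131}
    print(lambda_union(offline, realtime, cutoff=date(2020, 6, 2)))
```

Every change to the indicator definition has to be made in both jobs, which is exactly the maintenance cost of keeping two code paths.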

Data verification is difficult
Kafka generally acts as a message queue in big data architectures, and its data retention period is short. The full history is written to HDFS by a job that consumes Kafka. If an indicator has been computed for a month and the results turn out to be abnormal, it is hard to tell whether the Kafka data was faulty at the time or the calculation logic is wrong. Although the HDFS data can be used for troubleshooting, there is no guarantee that it matches what was in Kafka at the time.

Desired capabilities
Because migrating to streaming jobs carries these costs and problems, we set the following requirements for the real-time computing solution.

Flexible development
Internet businesses evolve quickly while engineering resources are relatively tight, so new indicators must be developed faster and at lower cost to keep up with business agility.

Easy to rerun historical data
Business indicator definitions change often. When a definition changes, or a new indicator is added, the data has to be consumed again from the earliest point. However, there is usually a large amount of historical data, and Kafka, the usual real-time data source, cannot retain all of it.

Easy to troubleshoot when data is abnormal
In an offline data warehouse, for example, hundreds of lines of SQL can be executed section by section and checked step by step. In Flink, metrics can be instrumented to expose intermediate results.

3. Real-time data warehouse solution based on Delta Lake
Delta Lake
Delta Lake is an open-source data lake technology from Databricks. Built on Apache Parquet, it adds rich data management features such as metadata management, transactions, data updates, and data version backtracking (time travel). With Delta Lake, stream processing and batch processing can easily be connected to quickly build a near-real-time data pipeline.
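To make "transactions" and "version backtracking" concrete, here is a toy model of a Delta-style transaction log. It is a simplification for illustration, not Delta Lake's implementation or API: each commit is an ordered list of hypothetical `add`/`remove` file actions, and the table state at any version is obtained by replaying the commits up to that version.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple

Action = Tuple[str, str]  # ("add" | "remove", file_path)


@dataclass
class ToyDeltaLog:
    """Toy transaction log: commit i holds the actions of table version i."""
    commits: List[List[Action]] = field(default_factory=list)

    def commit(self, actions: List[Action]) -> int:
        """Atomically append one commit and return its version number."""
        self.commits.append(list(actions))
        return len(self.commits) - 1

    def snapshot(self, version: int = -1) -> Set[str]:
        """Replay commits 0..version to get the live files (time travel).

        A negative version means "latest".
        """
        if version < 0:
            version = len(self.commits) - 1
        files: Set[str] = set()
        for actions in self.commits[: version + 1]:
            for op, path in actions:
                if op == "add":
                    files.add(path)
                elif op == "remove":
                    files.discard(path)
        return files


if __name__ == "__main__":
    log = ToyDeltaLog()
    v0 = log.commit([("add", "part-0.parquet"), ("add", "part-1.parquet")])
    # A compaction rewrites two small files into one in a single commit.
    log.commit([("remove", "part-0.parquet"), ("remove", "part-1.parquet"),
                ("add", "part-2.parquet")])
    print(sorted(log.snapshot()))    # latest version
    print(sorted(log.snapshot(v0)))  # time travel to version 0
```

Because readers only see files referenced by committed versions, a writer's unfinished output is never visible, and older versions stay readable until their files are physically deleted (which is what VACUUM does).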

The Alibaba E-MapReduce ("EMR") team has optimized many features and the performance of Delta Lake and integrated it deeply with Spark, mainly in the following areas. For more information, refer to the official EMR documentation.

Spark SQL supports UPDATE/DELETE/MERGE INTO/OPTIMIZE/VACUUM syntax for operating on Delta Lake
Self-developed Spark Streaming SQL that supports Delta Lake DML operations
Hive and Presto on Delta Lake
Delta Lake on OSS (Alibaba Cloud Object Storage Service)
Optimized transaction conflict detection in Delta Lake
Data skipping and Z-order performance optimization
Spark Streaming SQL
The Alibaba EMR team has developed Spark Streaming SQL on top of Structured Streaming. Users can write streaming job logic in SQL, which greatly lowers the development barrier. For details, see the official Spark Streaming SQL documentation.

Unified stream-batch engine
Reuses the optimizations of the underlying Spark SQL/Spark Core
Rich SQL support

Rich UDF support
Hive UDFs and window functions
Rich data source support

Deep integration with Delta Lake
New features built for Delta Lake usage scenarios (such as writing to dynamically partitioned tables from streams)
Real-time data warehouse solution
Architecture plan
Based on Delta Lake and Spark Streaming SQL, a real-time data warehouse pipeline can be built quickly, as shown below:

ODS layer
ODS data mainly consists of real-time event-tracking data, CDC binlog data, and so on.
DIM layer (dimension tables)
DW layer
The DW layer mainly holds lightly aggregated data, such as each user's courses and assignments.

DW-layer data is what gets reused most, so for each indicator we consider carefully whether to aggregate, which dimension to aggregate to, and whether to join dimension tables.
DW-layer data falls into two types:

a. Simple business logic that will rarely change: written directly to Kafka.
b. Complex business logic whose data may change: written to Delta Lake. In practice, writing directly to Kafka is the simplest option, but it is very inflexible, and historical data can be neither traced nor corrected. Introducing Delta Lake in the DW layer provides a unified stream-batch data source and makes it possible to recover historical partitions.

DM layer

The DM layer holds the final indicators displayed in reports. It uses DW-layer Delta tables as its source, aggregates them again, and sinks the results to a database for display.
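The division of work between the layers can be sketched in pure Python. The event fields (`user_id`, `class_id`, `event`, `is_test`), the `attend_class` event name, and the class-to-semester dimension are all hypothetical; the point is that filtering and dimension joins happen once in the DW layer, and each DM indicator is a cheap re-aggregation of DW data rather than another full scan of ODS.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Set, Tuple

OdsEvent = Dict[str, object]
DwKey = Tuple[str, str, str]  # (user_id, class_id, semester)


def build_dw(ods: List[OdsEvent], class_dim: Dict[str, str]) -> Dict[DwKey, int]:
    """DW layer: keep attendance events, drop test classes, join the
    class -> semester dimension, and lightly aggregate per user and class."""
    dw: Counter = Counter()
    for e in ods:
        if e["event"] != "attend_class" or e.get("is_test"):
            continue
        semester = class_dim.get(str(e["class_id"]), "unknown")
        dw[(str(e["user_id"]), str(e["class_id"]), semester)] += 1
    return dict(dw)


def dm_students_per_class(dw: Dict[DwKey, int]) -> Dict[str, int]:
    """DM indicator: distinct students who attended each class."""
    students: Dict[str, Set[str]] = defaultdict(set)
    for user, cls, _semester in dw:
        students[cls].add(user)
    return {cls: len(users) for cls, users in students.items()}


def dm_attendance_per_semester(dw: Dict[DwKey, int]) -> Dict[str, int]:
    """DM indicator: total attendance events per semester."""
    totals: Counter = Counter()
    for (_user, _cls, semester), count in dw.items():
        totals[semester] += count
    return dict(totals)
```

If the definition of a test class changes later, only `build_dw` changes, and both DM indicators pick up the fix.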
The EMR team provides a streaming MERGE INTO feature that lets a CDC job replay binlog data into a Delta table with a Spark Streaming SQL job.
See the CDC synchronization documentation for details.
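The semantics of replaying binlog with MERGE INTO can be modeled in a few lines of Python. This is an illustrative model, not the EMR implementation, and the event shape (`id`, `op`, `row`) is hypothetical. Within each micro-batch only the last change per primary key is kept, because a Delta Lake MERGE can fail when several source rows match the same target row; the surviving changes are then applied as deletes or upserts.

```python
from typing import Dict, List


def last_change_per_key(events: List[dict], key: str = "id") -> List[dict]:
    """Deduplicate a micro-batch, keeping the last binlog event per key.

    Events are assumed to arrive in binlog order.
    """
    last: Dict[object, dict] = {}
    for ev in events:
        last[ev[key]] = ev
    return list(last.values())


def merge_binlog_batch(table: Dict[object, dict], events: List[dict],
                       key: str = "id") -> Dict[object, dict]:
    """Apply one micro-batch with MERGE INTO-style semantics.

    WHEN MATCHED AND op = 'DELETE' -> delete the row
    WHEN MATCHED                   -> update the row
    WHEN NOT MATCHED (not DELETE)  -> insert the row
    """
    for ev in last_change_per_key(events, key):
        if ev["op"] == "DELETE":
            table.pop(ev[key], None)
        else:
            table[ev[key]] = ev["row"]
    return table
```

Unlike the timestamp-based pull, the binlog carries deletes, so the table converges to the source state without a separate offline merge.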

Problems and optimizations
While using Delta Lake, we also ran into some problems. Our solutions and suggestions are as follows:

Too many small files
Replaying binlog data through CDC streaming MERGE continuously generates small files, which need to be dealt with. EMR provides the following optimizations.

Serial auto compaction
While the CDC streaming job is running, small files are compacted according to a configured strategy
Adaptive execution
Turning on adaptive execution effectively reduces the number of small files produced by MERGE, for example from 100 small files per batch to 1 or 2.
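The source does not describe the strategy EMR's auto compaction uses. As a generic illustration of what "compacting small files according to a strategy" can mean, here is a hypothetical first-fit-decreasing bin-packing planner: files below a size threshold are grouped into bins no larger than a target size, and each bin would be rewritten as one file in a single commit.

```python
from typing import List, Tuple


def plan_compaction(files: List[Tuple[str, int]], target_bytes: int,
                    small_threshold: int) -> List[List[str]]:
    """Group small files into bins of at most `target_bytes` (first-fit decreasing).

    `files` is a list of (path, size_in_bytes). Only bins with two or more
    files are returned, since rewriting a single file gains nothing.
    """
    small = sorted((f for f in files if f[1] < small_threshold),
                   key=lambda f: f[1], reverse=True)
    bin_sizes: List[int] = []
    bin_files: List[List[str]] = []
    for path, size in small:
        for i, used in enumerate(bin_sizes):
            if used + size <= target_bytes:
                bin_sizes[i] += size
                bin_files[i].append(path)
                break
        else:  # no existing bin has room: open a new one
            bin_sizes.append(size)
            bin_files.append([path])
    return [b for b in bin_files if len(b) > 1]
```

Sorting largest-first is a common heuristic that tends to fill bins more evenly than processing files in arrival order.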
Compaction conflicts
If you do not use serial auto compaction, you have to compact the small files in the Delta table manually on a regular basis. Compaction transactions then often conflict with the CDC streaming job's transactions at commit time, causing either the CDC stream or the compaction to fail. For this we provide some optimizations and suggestions:

Optimized the Delta kernel's conflict detection so that the CDC stream runs stably and no longer fails because of compaction
Use partitioned tables and compact partitions in batches to reduce the probability of conflicts
Compact when there are few UPDATE/DELETE operations on the database tables (EMR workflow scheduling can be used for this)
Use the job retry feature of EMR workflows to retry when a compaction transaction fails to commit
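Why partition-scoped compaction and retries help can be seen in a simplified model of optimistic concurrency control. This is a sketch under stated assumptions rather than Delta Lake's actual conflict rules, which are more detailed: a transaction records the version and the files it read, and its commit is rejected if a later commit removed any of those files. The partition paths such as `p1/a` and the `interleave` hook (which simulates a concurrent CDC commit) are hypothetical.

```python
from typing import Callable, List, Optional, Set


class ConflictError(Exception):
    """Raised when a concurrent commit removed files this transaction read."""


class ToyTable:
    def __init__(self, files: Set[str]):
        self.version = 0
        self.files = set(files)
        self.removed_by_version: List[Set[str]] = [set()]  # index = version

    def commit(self, read_version: int, read_files: Set[str],
               remove: Set[str], add: Set[str]) -> int:
        # Optimistic check: did any commit after our read version remove
        # a file this transaction was based on?
        for removed in self.removed_by_version[read_version + 1:]:
            if removed & read_files:
                raise ConflictError("concurrent commit removed files we read")
        self.files = (self.files - remove) | add
        self.version += 1
        self.removed_by_version.append(set(remove))
        return self.version


def compact_partition(table: ToyTable, partition: str, max_attempts: int = 3,
                      interleave: Optional[Callable[[int], None]] = None) -> int:
    """Rewrite all files of one partition into one file, retrying on conflict."""
    for attempt in range(max_attempts):
        read_version = table.version
        files = {f for f in table.files if f.startswith(partition + "/")}
        if interleave is not None:
            interleave(attempt)  # simulate a concurrent CDC commit
        try:
            return table.commit(read_version, files, remove=files,
                                add={f"{partition}/compacted-{attempt}"})
        except ConflictError:
            continue  # re-read the latest snapshot and try again
    raise ConflictError(f"compaction of {partition} failed {max_attempts} times")
```

When a concurrent CDC commit rewrites a file in the partition being compacted, the first attempt conflicts and the retry succeeds on the fresh snapshot; when the CDC commit touches a different partition, compaction succeeds on the first attempt, which is the intuition behind compacting partition by partition.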
Further notes on the architecture
• Why not compute directly from the ODS layer
Take Walnut's class attendance indicators as an example. The data source is a Kafka event-tracking topic, and the indicators to compute cover the per-student dimension, the class dimension, the semester dimension, and the marketing channel dimension.
Each dimension would have to consume all of the event-tracking data and pick out the class-related events, and each dimension's program would have to query related dimension tables in HBase/MySQL such as semester, class, and unit.
When the overall logic changes, for example to filter out test-class data, the filter cannot be applied at the ODS layer (that would drop data at the lowest layer, and it could not be traced later), so every program would have to be modified to add the filtering logic.

• How to recover data
Ideally, real-time and offline processing would use the same SQL, the same calculation logic, and the same data source, so that offline scripts could rerun historical data at any time. In reality, no framework supports this. So-called stream-batch unification exists at the engine level: Spark runs both streaming and SQL as batches, treating a stream as a series of small batches, while Flink processes batch data as a stream, treating a batch as a bounded stream. At the level of high-level SQL APIs, streaming and batch still differ greatly. With Delta Lake partitioned tables, real-time DW-layer data is partitioned by time, so historical partitions can be restored by offline jobs at any time. And because the aggregated DW-layer data is relatively small, downstream streaming jobs can consume it again from scratch after recovery.
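Partition-level recovery can be sketched as replacing only the affected time partitions with offline recomputed results, similar in spirit to overwriting a Delta table with a `replaceWhere` predicate. The partition keys, row fields, and `recompute` function below are hypothetical; the sketch only shows that untouched partitions, including those the streaming job is still writing, are left alone.

```python
from typing import Callable, Dict, List

Partitions = Dict[str, List[dict]]  # e.g. "dt=2020-06-01" -> rows


def restore_partitions(table: Partitions, raw: Partitions, dts: List[str],
                       recompute: Callable[[List[dict]], List[dict]]) -> Partitions:
    """Return a copy of `table` in which only the partitions in `dts` are
    rebuilt from raw data with the (possibly corrected) logic."""
    restored = dict(table)  # other partitions are shared, not recomputed
    for dt in dts:
        restored[dt] = recompute(raw.get(dt, []))
    return restored
```

Here `recompute` stands for the corrected offline logic, for example a filter that drops test-class rows; the original table is not mutated, so the old result remains available until the new one is swapped in.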

4. Business effect
After the Delta Lake real-time data warehouse went into production for part of Walnut Programming's data warehouse, some business statistics indicators have been produced on the new architecture, and indicator update latency has dropped from tens of minutes to under one minute. Head teachers can see students' learning status sooner and follow up on their progress in time, which has significantly improved teaching quality.
After CDC was adopted, data synchronization latency dropped from half an hour to 30 seconds, and the load that high-concurrency Sqoop synchronization placed on the business databases was eliminated. Data analysts running ad hoc queries can now get real-time business data, which significantly improves the effectiveness of data analysis and guides business development in a more timely manner.

5. Follow-up plan
Based on the results so far, the big data team will continue to sort out all real-time indicators in the business scope, further optimize the structure of each layer of the real-time data warehouse, and promote broad adoption of the Delta Lake-based real-time data warehouse.
Building on Delta Lake's schema enforcement and time travel features, we will further promote the use of Delta Lake in machine learning scenarios and build more reliable and more easily extensible data pipelines.
