Architecture and practice of EMR Delta Lake in Liulishuo's data ingestion

Background

At Liulishuo, most of the data for our current offline computing tasks comes from business databases (DBs). The accuracy, stability, and timeliness of business DB data ingestion determine the accuracy and timeliness of the entire downstream offline computing pipeline. At the same time, we also have business requirements that call for near-real-time joint queries over data in the DBs and data in Hive.

Before introducing Alibaba Cloud EMR Delta Lake, we ingested business DB data by wrapping DataX in a Master-Slave architecture. The Master maintains the metadata of the DataX tasks to be executed each day, while the Worker nodes keep preempting and executing the DataX tasks in the init or retryable state until all of the day's DataX tasks are completed.

The architecture diagram is roughly as follows:

The Worker's processing flow is as follows:

For near-real-time requirements, we simply set up a read replica of the business DB and configured a Presto connector against it, which met the need for near-real-time joint queries over business DB data and Hive data.

The advantage of this architecture is that it is simple and easy to implement. However, as the data volume grew, its shortcomings gradually surfaced:

Performance bottleneck: as the business grows, pulling data via SELECT becomes slower and slower. Since the bottleneck is the DB itself, it cannot be relieved by adding Worker nodes.

Large tables can only be pulled through dedicated read replicas, so the cost of data ingestion keeps climbing.

It cannot meet the near-real-time query requirement; near-real-time queries have to go through the read replicas, which further increases the ingestion cost.

To solve these problems, we turned our attention to real-time CDC-based ingestion.

Technical solution selection

The industry currently offers mainly the following solutions for real-time CDC ingestion: CDC + Merge, CDC + Hudi, CDC + Delta Lake, and CDC + Iceberg. The CDC + Merge scheme predates the data lake solutions; it saves the cost of a DB read replica, but it cannot satisfy near-real-time business queries and other requirements, so it was ruled out from the start. Iceberg was not mature enough at the time of our evaluation and had no reference cases in the industry, so it was ruled out as well. In the end, the choice came down to CDC + Hudi and CDC + Delta Lake.

Functionally, Hudi and Delta Lake are quite similar, so we mainly compared the two solutions on the following aspects: stability, small-file compaction, SQL support, cloud vendor support, language support, and so on.

Based on these criteria, and given that our entire data platform is built on Alibaba Cloud EMR, choosing Delta Lake would save us a great deal of adaptation and development work, so we finally chose CDC + Delta Lake.

Overall architecture

Overall structure diagram

The overall architecture is shown in the figure above. The data we ingest is divided into two parts: stock historical data and new data. The stock historical data is exported from MySQL with DataX and stored on OSS; the new data is captured from the binlog and written into a Delta Lake table. Every day, before the early-morning ETL jobs run, the historical data and the new data are merged first, and the ETL jobs use the merged result.

Delta Lake data ingestion

For real-time binlog collection we use the open-source Debezium, which pulls the binlog from MySQL in real time and parses it appropriately. Each table corresponds to one topic, and for sharded databases and tables all shards are merged into a single topic; the events are published to Kafka for upstream and downstream consumption. Once the binlog data lands in Kafka, we create a Kafka source table pointing to the corresponding Kafka topic.
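
A minimal sketch of such a source table, assuming the Kafka data source shipped with EMR Spark (the broker addresses, topic, and table name are placeholders):

    CREATE TABLE kafka_order_source
    USING kafka
    OPTIONS (
      kafka.bootstrap.servers = 'broker1:9092,broker2:9092',  -- placeholder brokers
      subscribe = 'mysql.order_db.orders'                     -- one topic per logical table
    );

A table like this exposes Kafka's standard columns: key, value, topic, partition, offset, timestamp, and timestampType.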

The fields we mainly use are value and offset. The format of value is as follows:
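
For reference, a Debezium MySQL change event wraps the row data in a JSON envelope roughly like the one below (the schema section and row contents are abbreviated, and the column names are purely illustrative):

    {
      "schema": { ... },
      "payload": {
        "before": null,
        "after": { "id": 1001, "user_id": 42, "status": "paid" },
        "source": { "db": "order_db", "table": "orders", "ts_ms": 1617868800000 },
        "op": "c",
        "ts_ms": 1617868800123
      }
    }

Here op is 'c' for insert, 'u' for update, and 'd' for delete ('r' for snapshot reads); before and after are the row images before and after the change; ts_ms is the time at which Debezium processed the event.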

StreamingSQL processes the data in Kafka. From the Kafka source table we mainly extract the offset and value fields, together with the CDC information inside value, such as op, ts_ms, and the before and after fields of the payload. In StreamingSQL we use a 5-minute mini-batch: if the mini-batch is too small, many small files are produced, processing gets slower and slower, and read performance suffers; if it is too large, the near-real-time query requirement cannot be met. In the Delta Lake table we do not parse the after or before fields into columns, mainly because the schemas of our business tables change frequently; once a business table's schema changed, we would have to repair the data, which is costly.

During StreamingSQL processing, data with op = 'c' is inserted directly, with json_record taking the after field. For data with op = 'u' or op = 'd', we insert it if the row does not exist in the Delta Lake table and update it if it does; as for the value assigned to json_record, for op = 'd' it takes the before field and for op = 'u' it takes the after field. Records with op = 'd' are retained rather than physically deleted, mainly because the deleted rows may still exist in the stock history table; if they were deleted from the Delta Lake table directly, the corresponding rows in the stock history table would not be removed during the early-morning merge.
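
The per-mini-batch upsert described above can be sketched as a Delta Lake MERGE INTO statement (in production it runs as an EMR StreamingSQL job with a 5-minute trigger; the table name, primary-key field, and JSON paths below are illustrative assumptions):

    MERGE INTO delta_db.orders_cdc AS t
    USING (
      SELECT
        offset,
        get_json_object(CAST(value AS STRING), '$.payload.op')    AS op,
        get_json_object(CAST(value AS STRING), '$.payload.ts_ms') AS ts_ms,
        -- the key comes from `after` for c/u events and from `before` for d events
        coalesce(get_json_object(CAST(value AS STRING), '$.payload.after.id'),
                 get_json_object(CAST(value AS STRING), '$.payload.before.id')) AS row_key,
        -- json_record keeps the whole row image: `after` for c/u, `before` for d
        CASE WHEN get_json_object(CAST(value AS STRING), '$.payload.op') = 'd'
             THEN get_json_object(CAST(value AS STRING), '$.payload.before')
             ELSE get_json_object(CAST(value AS STRING), '$.payload.after')
        END AS json_record
      FROM kafka_order_source
    ) AS s
    ON t.row_key = s.row_key
    WHEN MATCHED THEN UPDATE SET
      t.offset = s.offset, t.op = s.op, t.ts_ms = s.ts_ms, t.json_record = s.json_record
    WHEN NOT MATCHED THEN INSERT
      (row_key, offset, op, ts_ms, json_record)
      VALUES (s.row_key, s.offset, s.op, s.ts_ms, s.json_record);

Because value still carries the full Debezium row image, the Delta Lake table only needs a handful of generic columns, which is what makes it robust to business-table schema changes.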

Delta Lake supports time travel, but since we are ingesting CDC data we have no use for the data rollback capability, and keeping many versions would affect our storage, so we need to delete expired versions regularly. At present we only keep the version data of the last 2 hours. In addition, Delta Lake does not automatically compact small files, so we also need to compact them regularly. Our current practice is to compact small files and clean up expired data files once an hour through OPTIMIZE and VACUUM.
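
The hourly maintenance boils down to statements of roughly this shape (the table name is illustrative; shortening retention below Delta's default of 7 days assumes the retention-duration check has been disabled):

    SET spark.databricks.delta.retentionDurationCheck.enabled = false;
    -- compact the small files produced by the 5-minute mini-batches
    OPTIMIZE delta_db.orders_cdc;
    -- remove data files that have fallen out of the 2-hour version window
    VACUUM delta_db.orders_cdc RETAIN 2 HOURS;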

At present, Hive and Presto cannot directly read Delta Lake tables created by Spark SQL, but both monitoring and near-real-time queries need to query these tables, so we additionally create tables that Hive and Presto can query.
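
One common way to do this (not necessarily how EMR wires it up internally) is the Delta Hive connector for Hive and a symlink manifest for Presto; the statements below are only a sketch with illustrative columns and paths:

    -- Hive: external table backed by the Delta Hive connector
    CREATE EXTERNAL TABLE hive_orders_cdc (
      row_key     STRING,
      offset      BIGINT,
      op          STRING,
      ts_ms       BIGINT,
      json_record STRING
    )
    STORED BY 'io.delta.hive.DeltaStorageHandler'
    LOCATION 'oss://data-bucket/delta/orders_cdc';

    -- Presto: generate symlink manifests that a Hive-format external table can point at
    GENERATE symlink_format_manifest FOR TABLE delta.`oss://data-bucket/delta/orders_cdc`;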

Merging Delta Lake data with stock data

Since Delta Lake only receives the new data, while the stock historical data was imported once via DataX and the Delta Lake table cannot be queried by Hive directly, we need to merge these two parts of data every early morning and write the result into a new table for unified use by Spark SQL and Hive. The architecture of this module is roughly as follows:

Before 0:00 every day, the DeltaService API is called, and the task information, the Spark SQL script, and the corresponding Airflow DAG file of the merge task are generated automatically from the configuration of the Delta Lake task.

The task information of a merge task mainly includes the following:

The merge script is also generated automatically. It mainly obtains the schema information of the MySQL table from the configuration of the Delta Lake task, drops the historical Hive table, re-creates the Hive external table according to that schema, and then extracts the corresponding field values from the json_record field of the Delta Lake table and from the historical stock table according to the new schema and performs a UNION ALL, with missing values falling back to the MySQL defaults. After the union, the records are grouped by row_key and sorted by ts_ms, the first record of each group is taken, and the records with op = 'd' are filtered out. The whole flow is roughly as follows:
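
A sketch of what the generated merge script might look like, using an illustrative table with the columns id, user_id, status, and updated_at (all names and OSS paths are assumptions; the real script is generated from the task configuration):

    DROP TABLE IF EXISTS ods.orders;
    CREATE EXTERNAL TABLE ods.orders (
      id         BIGINT,
      user_id    BIGINT,
      status     STRING,
      updated_at TIMESTAMP
    )
    STORED AS PARQUET
    LOCATION 'oss://data-bucket/ods/orders';

    INSERT OVERWRITE TABLE ods.orders
    SELECT id, user_id, status, updated_at
    FROM (
      SELECT *,
             row_number() OVER (PARTITION BY row_key ORDER BY ts_ms DESC) AS rn
      FROM (
        -- incremental CDC data: columns are extracted from json_record per the new schema
        SELECT row_key,
               CAST(ts_ms AS BIGINT)                                           AS ts_ms,
               op,
               CAST(get_json_object(json_record, '$.id')         AS BIGINT)    AS id,
               CAST(get_json_object(json_record, '$.user_id')    AS BIGINT)    AS user_id,
               get_json_object(json_record, '$.status')                        AS status,
               CAST(get_json_object(json_record, '$.updated_at') AS TIMESTAMP) AS updated_at
        FROM delta_db.orders_cdc
        UNION ALL
        -- stock history exported by DataX; ts_ms = 0 so any CDC record wins the dedup,
        -- and columns the old export lacks would fall back to the MySQL defaults
        SELECT CAST(id AS STRING) AS row_key,
               0L                 AS ts_ms,
               'c'                AS op,
               id, user_id, status, updated_at
        FROM stock.orders_history
      ) unioned
    ) deduped
    WHERE rn = 1 AND op <> 'd';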

After 0:00, Airflow automatically schedules and executes the merge Spark SQL script according to the Airflow DAG file. Once the script succeeds, the status of the merge task is updated to successful, and Airflow's ETL DAGs then schedule the downstream ETL tasks based on the status of the merge task.

Delta Lake data monitoring

For Delta Lake data monitoring, we mainly have two goals: detecting whether data is delayed and detecting whether data is lost, chiefly between MySQL and the Delta Lake tables and between the CDC Kafka topics and the Delta Lake tables.

Delay monitoring between the CDC Kafka topics and the Delta Lake tables: every 15 minutes, we read from the Kafka topic the row_key of the MySQL row corresponding to the maximum offset of each partition and write it into the monitoring MySQL table delta_kafka_monitor_info. We then take the row_key recorded in the previous cycle from delta_kafka_monitor_info and look it up in the Delta Lake table. If it cannot be found, the data is delayed or lost and an alarm is sent.
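
The lookup itself is a simple existence check against the Delta Lake table (the table name and the templated variable are illustrative):

    -- if the row_key recorded 15 minutes ago is still missing, raise an alarm
    SELECT count(*) AS hit
    FROM delta_db.orders_cdc
    WHERE row_key = '${row_key_of_previous_cycle}';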

Monitoring between MySQL and Delta Lake: we have two schemes. The first is a probe scheme: every 15 minutes, the maximum id is obtained from MySQL (for sharded databases and tables, only one shard table is monitored) and stored in delta_mysql_monitor_info; the maximum id of the previous cycle is then taken from delta_mysql_monitor_info and looked up in the Delta Lake table, and if it cannot be found, the data is delayed or lost and an alarm is sent. The second is a direct count(id) scheme, which distinguishes between the single-database/single-table case and the sharded case. The metadata is stored in the MySQL table id_based_mysql_delta_monitor_info, which mainly contains the three fields min_id, max_id, and mysql_count. For a single database and table, every 5 minutes we count the records whose id lies between min_id and max_id in the Delta Lake table and compare the result with mysql_count; if it is smaller than mysql_count, data has been lost or delayed and an alarm is sent. We then fetch max(id) from MySQL together with the count between max_id and max(id), and update the id_based_mysql_delta_monitor_info table. For sharded databases and tables, the corresponding id_based_mysql_delta_monitor_info records are generated per table according to the sharding rules, and the check runs every half hour with the same rules as in the single-database, single-table case.
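
For the count-based scheme on a single database and table, the check can be sketched as follows, assuming row_key holds the numeric primary key and the templated bounds come from id_based_mysql_delta_monitor_info:

    -- compare the result with mysql_count; a smaller value means data is delayed or lost
    SELECT count(*) AS delta_count
    FROM delta_db.orders_cdc
    WHERE CAST(row_key AS BIGINT) BETWEEN ${min_id} AND ${max_id};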

Challenges encountered

The schemas of business tables change frequently. If the Delta Lake table parsed the CDC field information directly and a schema change were not discovered and repaired in time, the cost of repairing the data later would be high. For now we therefore do not parse the fields at ingestion time; they are only parsed during the early-morning merge.

As the data volume keeps growing, the performance of the StreamingSQL task gets worse and worse. At present, when StreamingSQL processing falls behind and a large number of delay alarms fire, we replace the Delta Lake stock data with the output of the previous day's merge, drop the Delta Lake table, delete the checkpoint data, and re-consume the Kafka source table from the beginning. Shrinking the Delta Lake table in this way relieves the pressure on StreamingSQL.

Hive and Presto cannot directly query Delta Lake tables created by Spark SQL. For now we create separate external tables for Hive and Presto to query, but those tables in turn cannot be queried through Spark SQL. As a result, the upper-layer ETL applications cannot switch freely among the Hive, Spark SQL, and Presto engines without code changes.

Benefits

The cost of DB read replicas is saved. After adopting CDC + Delta Lake, our cost has dropped by nearly 80%.

The time cost of early-morning DB data ingestion is greatly reduced; all DB data ingestion without special requirements can now be completed within one hour.

Follow-up planning

Follow up on the degrading performance of the StreamingSQL tasks as the data volume of the Delta Lake tables keeps growing.

Push for Delta Lake tables created by Spark SQL to be directly queryable by Hive and Presto.
