Linkflow's Production Practice in Building a Data Lake

1. Background

As a customer data platform (CDP), Linkflow provides enterprises with a closed loop of operations, from customer data collection and analysis to execution. A large amount of data is collected every day through the data collection endpoint (SDK) and third-party data sources such as WeChat and Weibo. The data is cleaned, computed, integrated, and then written to storage. Users can analyze the persisted data through flexible reports or tags, and the results serve as the data source of the MA (Marketing Automation) system to enable precise marketing for specific groups of people.

In Linkflow, data is divided into immutable data and mutable data. Both take part in analysis and involve about a dozen tables. The volume of immutable data is large and can reach billions of records. In a traditional big data system, immutable data would be fact data and mutable data would be dimension data. In real business practice, however, users' natural attributes, the amount and status of orders, and so on are all updatable, and their volume is often considerable: in our system such data also reaches the billions. Mutable data has always been managed in the relational database MySQL, first because data maintenance is convenient, and second because integration with the business is easy.

But the problems are also obvious:

Data fragmentation. Because online DDL on large MySQL tables is risky, new sub-tables often have to be added to extend business attributes as business complexity grows. As a result, a complete user record is scattered across multiple tables, which is very unfriendly to queries.
No multi-dimensional queries. Multi-dimensional queries are not a strength of relational databases, and adding indexes to every field is unrealistic, so a data component that works with an OLAP query engine is needed to support multi-dimensional analysis scenarios. Considering the possibility of scaling independently in the future, we also give priority to architectures that separate computing and storage.

2. CDC and data lake

CDC (Change Data Capture) is a software design pattern for identifying and tracking changed data so that actions can be taken on it. In fact, as early as two years ago, we had used Canal to replicate MySQL data to heterogeneous storage, but we did not realize that it could be integrated with big data storage in this way. While using Canal, we found some performance problems, and its open source community was basically unmaintained, so we investigated Maxwell and Debezium before launching the new architecture. We also happened to notice flink-cdc-connectors [1], an open source project from Ververica, the company behind Flink. This project embeds Debezium into a Flink job as the binlog synchronization engine, which makes it easy to filter, validate, integrate, and convert the format of binlog messages within the streaming job, and it performs very well. Considering that in the future we could directly perform dual-stream joins with behavioral data, and even simple risk control through CEP, we finally chose the Debezium-in-Flink CDC solution.

Since MySQL holds data for many subjects, we also implement data routing in the streaming job: changed data for different subjects is routed to different Kafka topics, and Kafka serves as the ODS layer. This has several advantages. First, for mutable data we can clearly observe each change. Second, we can replay the data, and the result of superimposing the successive changes is the final state.
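The routing and replay ideas can be illustrated with a minimal, framework-free sketch. It is not the actual Flink job: the event layout (table, op, key, after), the "ods." topic prefix, and the function names are assumptions made for illustration only.

```python
from collections import defaultdict

def route_topic(event, prefix="ods"):
    """Derive a topic name from the source table of a change event (hypothetical naming)."""
    return f"{prefix}.{event['table']}"

def replay(events):
    """Apply change events in order; the accumulated result is the final state per key."""
    state = {}
    for e in events:
        if e["op"] == "d":          # delete removes the row
            state.pop(e["key"], None)
        else:                        # create/update carries the full row image
            state[e["key"]] = e["after"]
    return state

events = [
    {"table": "orders", "op": "c", "key": 1, "after": {"status": "created", "amount": 10}},
    {"table": "users", "op": "c", "key": 7, "after": {"name": "Ann"}},
    {"table": "orders", "op": "u", "key": 1, "after": {"status": "paid", "amount": 10}},
]

# Route every event to the topic of its table, preserving arrival order.
topics = defaultdict(list)
for e in events:
    topics[route_topic(e)].append(e)

final_orders = replay(topics["ods.orders"])
```

Because each topic keeps the full change history of one subject, replaying a topic from the beginning rebuilds the current state, and stopping the replay early shows an intermediate state.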

The next thing to consider is where the data is stored. The "separation of computing and storage" principle mentioned above is also an advantage offered by data lakes. Data lakes are generally built on file-system-like storage (object storage or traditional HDFS), which is exactly what we expected. After comparing several data lake solutions, we chose Apache Hudi for the following reasons:

Hudi provides an upsert solution on HDFS that feels similar to using a relational database. It is very friendly to updatable data and also matches the semantics of the MySQL binlog.

Incremental queries make it easy to obtain the data that has changed in the last 30 minutes or the last day. This is very friendly to offline computing tasks that build on previous results: they no longer need to process the full data set, only the changed data, which greatly saves machine resources and time.

Metadata can be synchronized to Hive in real time, which makes data queryable as soon as it enters the lake.

The COW and MOR table types are each optimized for a different usage scenario.

The Hudi community is open and iterates quickly. Hudi was integrated by AWS EMR during its incubation period, and later by Alibaba Cloud DLA (Data Lake Analytics) [2], Alibaba Cloud EMR [3], and Tencent Cloud EMR [4], so its prospects are good. At the same time, discussions about Apache Hudi in domestic technical exchange groups are very lively, and more and more domestic companies are building data lakes based on Hudi.
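To make the incremental query in the list above concrete, here is a small sketch of the idea in plain Python. It does not call Hudi; it only assumes that each record carries a commit instant in the _hoodie_commit_time metadata field, formatted as a sortable timestamp string. The sample records and the function name are hypothetical.

```python
def incremental_pull(records, begin_instant):
    """Return only the records committed strictly after begin_instant."""
    return [r for r in records if r["_hoodie_commit_time"] > begin_instant]

table = [
    {"_hoodie_commit_time": "20210401093000", "id": 1, "status": "created"},
    {"_hoodie_commit_time": "20210402101500", "id": 2, "status": "paid"},
    {"_hoodie_commit_time": "20210402235900", "id": 3, "status": "shipped"},
]

# A daily job remembers the last instant it processed and reads only newer commits.
changed = incremental_pull(table, begin_instant="20210402000000")
```

The offline job then processes only the changed rows instead of the whole table, which is where the savings in machine resources and time come from.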

After integrating Hudi, our architecture evolved like this:

We chose the COW (copy-on-write) mode for the data tables, mainly because the workload is read-heavy and write-light and we need queries to be as fast as possible. The MOR (merge-on-read) strategy is still somewhat weaker on the query side, and we have no sub-second requirement for data latency, so COW was the final choice.

At the top layer, we use Presto as the analysis engine to provide ad hoc queries. Because we are using Hudi 0.6.0, in which the Flink integration had not yet been released, we had to adopt a Flink + Spark dual-engine strategy and use Spark Streaming to write the data in Kafka to Hudi.

3. Technical challenges

After a PoC, we settled on the architecture shown in the figure above, but we still ran into many challenges during the actual implementation.

3.1 CDC operation mode customization

■ Full mode

A major advantage of Debezium is its "integration of batch and stream". In the snapshot stage, it scans each table in full and replays the rows as messages with the same format as the incremental binlog, so users can process both full and incremental data with the same code. In our business practice, however, when there are many historical tables holding a lot of data, the snapshot stage lasts a very long time. If the process is interrupted unexpectedly, the next run has to rescan from the first table. Assuming a complete snapshot takes several days, a retry on this scale is unacceptable, so we need a mechanism similar to resumable uploads. After consulting the official Debezium documentation, we found the snapshot.include.collection.list parameter:

An optional, comma-separated list of regular expressions that match names of schemas specified in table.include.list for which you want to take the snapshot.

Therefore, after a snapshot is interrupted, the remaining tables to be scanned can be passed in through this parameter, which gives us the "relay" capability. One thing to note is that no matter how many times the snapshot stage is retried, incremental synchronization must start from the binlog position of the first snapshot; otherwise data will be lost. This raises another problem: if the snapshot is interrupted and then relayed until it completes, Debezium automatically starts incremental synchronization from the binlog position of this latest (not the first) snapshot, which is not what we need. We need the task to terminate as soon as the snapshot ends.
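As a rough sketch of the relay step, the value for snapshot.include.collection.list can be derived from the tables that have not been snapshotted yet. The helper names and table names below are hypothetical, and the sketch assumes each entry is a database.table name that should be matched literally.

```python
import re

def remaining_tables(all_tables, finished):
    """Tables still to be snapshotted, in their original order."""
    done = set(finished)
    return [t for t in all_tables if t not in done]

def to_include_list(tables):
    """Build a comma-separated list of regular expressions, escaping the dots
    so that each entry matches exactly one table name."""
    return ",".join(re.escape(t) for t in tables)

all_tables = ["shop.orders", "shop.order_items", "shop.users"]  # hypothetical tables
finished = ["shop.orders"]                                      # completed before the interruption

relay_config = {
    "snapshot.include.collection.list": to_include_list(remaining_tables(all_tables, finished)),
}
```

How the list of finished tables is tracked (for example, by recording the last table seen in the snapshot messages) is left open here; the source does not describe it.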

After looking through a lot of Debezium documentation, we did not find such a feature, but while browsing the source code we saw that there is in fact a way to do it.

■ Incremental mode
If the task stops automatically after the snapshot ends, it must be restarted manually to continue with incremental synchronization. The incremental mode also needs to support specifying the MySQL binlog file and the exact position within it.

3.2 Partial update (Patch Update)

First, we need to explain what an overwrite update is and what a partial update is. The distinction corresponds to RESTful semantics. PUT is an overwrite update: it requires the caller to provide a complete resource object, and in theory, if PUT is used without a complete resource object, the missing fields should be cleared. PATCH corresponds to a partial update: the caller provides only the fields that need to be updated rather than the complete resource object, which has the advantage of saving bandwidth.

By default, Hudi supports only overwrite updates. In our business, however, the data reported by the collection endpoint cannot always contain the complete business object. For example, when a user's age increases, the report contains only that one field.

This requires first looking up the stored record for rowkey=123 and merging it with the content to be updated before writing. During the merge, a field is overwritten only if its value in the incoming data is not empty. By default, Hudi uses the combineAndGetUpdateValue method of OverwriteWithLatestAvroPayload:

Simply overwrites storage with latest delta record

For forward compatibility, our data development colleague Karl added the OverwriteNonDefaultsWithLatestAvroPayload class, which overrides combineAndGetUpdateValue to handle the problem above, and contributed it back to the community as [HUDI-1255] Add new Payload (OverwriteNonDefaultsWithLatestAvroPayload) for updating specified fields in storage [5]. In fact, there are many similar needs in the community, such as [HUDI-1160] Support update partial fields for CoW table [6], and we hope more developers will help improve this feature.

Of course, there is a limitation: if you really do want to update a field to a null value, OverwriteNonDefaultsWithLatestAvroPayload cannot do it.
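The difference between the two merge behaviors, and the null limitation, can be shown with a simplified sketch on plain dictionaries. This is not Hudi's Java implementation: the function names are illustrative, and the sketch assumes that every field's schema default is None.

```python
def overwrite_with_latest(stored, incoming):
    """Overwrite update: the incoming record replaces the stored one entirely."""
    return dict(incoming)

def overwrite_non_defaults(stored, incoming, defaults):
    """Partial update: copy a field from the incoming record only when its value
    differs from the field's default; otherwise keep the stored value."""
    merged = dict(stored)
    for field, value in incoming.items():
        if value != defaults.get(field):
            merged[field] = value
    return merged

defaults = {"id": None, "name": None, "age": None}   # assumed schema defaults
stored = {"id": 123, "name": "Ann", "age": 30}
incoming = {"id": 123, "name": None, "age": 31}      # only the age was reported

full = overwrite_with_latest(stored, incoming)
partial = overwrite_non_defaults(stored, incoming, defaults)
```

With the overwrite update the name is lost, while the partial update keeps it. The same rule explains the limitation: an incoming None is indistinguishable from "field not provided", so it can never clear a stored value.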

We have also extended the community's compaction strategies with a time-based compaction scheduling strategy, so compaction can be triggered not only by the number of delta commits but also by elapsed time. This work has also been contributed back to the community, see [HUDI-1381] Schedule compaction based on time elapsed [7], and it provides more flexibility for running compaction within a specified time.

3.3 Merge of the same rowkey data in a batch

A characteristic of CDC is that it captures data changes in real time. For example, the status of an order may change several times within a few minutes. Combined with the micro-batch processing of Spark Streaming, there is a high probability that a single time window contains a large amount of data with the same rowkey, with each rowkey corresponding to several records. We therefore merge the records that share a rowkey within each batch in the streaming job. The overall logic is similar to how Hudi uses a Bloom filter to determine whether a rowkey exists. Special attention must be paid to ordering: changes must be applied strictly in ts order, otherwise an old version of the data will overwrite a newer one.
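A minimal sketch of this in-batch merge, written in plain Python rather than Spark: records are sorted by ts and then folded per rowkey, so later changes win. The record layout and the choice to skip None fields (treating them as "not provided") are assumptions for illustration.

```python
def merge_batch(records):
    """Collapse a micro-batch to one record per rowkey, applying changes in ts order."""
    merged = {}
    for rec in sorted(records, key=lambda r: r["ts"]):
        current = merged.setdefault(rec["rowkey"], {})
        # Later records overwrite earlier ones; None means the field was not provided.
        current.update({k: v for k, v in rec.items() if v is not None})
    return list(merged.values())

batch = [
    {"rowkey": "o1", "ts": 3, "status": "shipped", "amount": None},
    {"rowkey": "o1", "ts": 1, "status": "created", "amount": 10},
    {"rowkey": "o2", "ts": 2, "status": "created", "amount": 5},
    {"rowkey": "o1", "ts": 2, "status": "paid", "amount": None},
]

result = merge_batch(batch)
```

Even though the "shipped" event arrives first in the batch, sorting by ts guarantees that it is applied last, so the older "created" and "paid" versions cannot overwrite it.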

3.4 Schema evolution

Because of business growth and flexibility requirements, extending table fields (schema evolution) is a hard requirement, and Hudi happens to take this into account. From Hudi's wiki [8] we learned the following:

Schema evolution can be roughly divided into four types:

Backward compatible: old data can be read with the new schema, and a field that has no value takes its default value. This is also the compatibility mode that Hudi provides.

Forward compatible: new data can be read with the old schema, and Avro ignores the newly added fields. To be forward compatible, deleted fields must have default values.

Fully compatible: both forward and backward compatible. To be fully compatible, only add fields that have default values and only remove fields that have default values.

No compatibility checking: this is generally needed when the type of a field must be changed forcibly. It requires a full data migration and is not recommended.
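The backward and forward cases can be sketched with a toy reader that resolves a record against a schema. It only mimics the idea of Avro schema resolution on dictionaries; the schema representation (a list of name/default pairs) and the field names are assumptions, not Avro's API.

```python
def read_with_schema(record, schema):
    """Project a record onto a reader schema given as (field_name, default) pairs.
    Missing fields take the default; fields unknown to the schema are ignored."""
    return {name: record.get(name, default) for name, default in schema}

old_schema = [("id", None), ("name", None)]
new_schema = [("id", None), ("name", None), ("city", "unknown")]  # added field with a default

old_record = {"id": 1, "name": "Ann"}
new_record = {"id": 2, "name": "Bob", "city": "Shanghai"}

backward = read_with_schema(old_record, new_schema)  # old data, new schema
forward = read_with_schema(new_record, old_schema)   # new data, old schema
```

Adding a field with a default keeps both directions readable, which is why the fully compatible rule only allows adding or removing fields that have defaults.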

In production practice, we can meet field expansion requirements by modifying the schema. However, some problems surfaced later: too many fields make a single file very large (exceeding 128 MB) and make writes very slow. In extreme cases, writing files with more than 1,000 columns takes hours. Going forward, we are looking at optimizations such as field recycling or vertical table splitting to reduce the number of fields in a single file.

3.5 Querying and writing at the same time causes an exception

This is a problem on the query side. When we use Presto to query the Hive table, an exception is thrown reporting that a Hudi metadata file cannot be found, which in turn leads to an NPE inside Hudi.

Based on the above information, we suspected that the metadata was being modified while the query was running. After seeking help from the community, we changed hoodiePathCache in HoodieROTablePathFilter to a thread-safe ConcurrentHashMap, rebuilt hudi-hadoop-mr.jar and hudi-common.jar, replaced them in the presto/plugin/hive-hadoop2 directory, and restarted Presto. No NPE has been observed since.

4. Effect

Let's review the vision we had for the data lake when we started designing the architecture:

Mutable data is supported.
Schema evolution is supported.
Computing and storage are separated, and multiple query engines are supported.
Incremental views and time travel are supported.

These features are basically all available in Hudi. After the new architecture was completed, data latency and offline processing performance improved significantly compared with the previous system, specifically:

The real-time data writing process is simplified. Update operations used to be cumbersome; now developers basically do not need to care whether an operation is an insert or an update, which greatly reduces their mental burden.
The time from real-time data entering the lake to being queryable is shortened. Although we use COW tables, actual tests show that the latency from entering the lake to being queryable is not high, basically at the minute level.
Offline processing performance is improved. With Hudi's incremental view, daily offline tasks can easily obtain the data that changed in the past 24 hours, so the volume of data processed is smaller and the processing time is shorter.

5. Future plans

5.1 Flink integration

The dual-engine strategy we were "forced" into, as mentioned earlier, is actually very painful, because operations and development practices cannot be unified. We are therefore closely following the progress of Hudi's official Flink integration. There is a recent RFC, RFC-24: Hoodie Flink Writer Proposal [10], and Hudi 0.8.0 has deeply integrated Flink capabilities. We expect the Flink-integrated version to greatly improve performance, and at the same time we can unify the processing engine on Flink and leave the dual-engine mode behind.

5.2 Concurrent writing

To ensure metadata consistency, Hudi did not support concurrent writes before version 0.8.0. In practice, however, much of the data in the data lake is not only real-time data but also has to be produced by offline computation. If some fields of a table directly reflect CDC changes while the others are the results of offline tasks, the need for concurrent writes arises.

We currently use two workarounds:

Vertical table splitting. The two parts are stored separately: the CDC data is written through Spark Streaming, and the offline computation results are written separately, which avoids concurrent writes.

Simulating CDC messages and writing them back to Kafka. If the table cannot be split for query performance reasons, the offline computation results are simulated as CDC messages and written to Kafka, and then written to Hudi through Spark Streaming. The disadvantage is also obvious: it takes a long time for the results of offline tasks to reach the final storage.
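The second workaround can be sketched as wrapping an offline result in a message shaped like a change event. The envelope below borrows the before/after/source/op/ts_ms field names from Debezium's change event format; the actual message format used in this pipeline is not given in the text, and the table, key, and field names are hypothetical.

```python
import json
import time

def as_cdc_message(table, key, computed_fields, ts_ms=None):
    """Wrap offline-computed fields as an update-style change event (illustrative format)."""
    return json.dumps({
        "before": None,
        "after": {**key, **computed_fields},   # primary key plus the computed columns
        "source": {"table": table},
        "op": "u",                             # treat the offline result as an update
        "ts_ms": ts_ms if ts_ms is not None else int(time.time() * 1000),
    })

message = as_cdc_message("user_profile", {"id": 123}, {"ltv_score": 0.87}, ts_ms=1000)
parsed = json.loads(message)
```

Because such a message carries only the computed columns, this approach relies on the partial update behavior described in section 3.2 so that the CDC-maintained fields are not overwritten.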

The recently released Hudi 0.8.0 already supports a concurrent write mode based on optimistic locking. Its file-level conflict detection should meet our concurrent write requirements well, and we will test it later to see how it performs.
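The idea behind optimistic locking with file-level conflict detection can be sketched as follows. This is a conceptual model, not Hudi's implementation: the timeline is modeled as a list of sets of file IDs, and all names are illustrative.

```python
def has_conflict(my_files, commits_since_start):
    """A writer conflicts if any commit completed after it started touched one of its files."""
    return any(my_files & other for other in commits_since_start)

def try_commit(timeline, start_index, my_files):
    """Commit optimistically: check for overlap only at commit time.
    timeline is a list of sets of file IDs, one set per completed commit."""
    if has_conflict(my_files, timeline[start_index:]):
        return False                 # the writer must retry or abort
    timeline.append(set(my_files))
    return True

timeline = [{"f1"}]                  # one commit exists before the writers start
start = len(timeline)                # all three writers start from the same point

ok_cdc = try_commit(timeline, start, {"f2"})             # streaming writer
ok_offline = try_commit(timeline, start, {"f2", "f3"})   # offline job touching the same file
ok_other = try_commit(timeline, start, {"f4"})           # offline job touching other files
```

Writers that touch disjoint files both succeed, which matches the case where CDC and offline jobs usually update different parts of a table; only overlapping writes are rejected.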

5.3 Performance Optimization

Problems such as large files and frequent GC were mentioned above. Overall, we found that the write bottleneck mainly occurs in two places.

■ Index
Since we currently use HoodieGlobalBloomIndex, building and querying the index both take a long time. Hudi officially provides three index implementations:

■ Update

Besides the large-file problem, slow upserts are also related to the characteristics of CDC. The update range of mutable data is unpredictable. In the extreme case where 1,000 records to be updated belong to 1,000 different files, it is hard to improve update performance through code optimization; the only option is to add CPU resources to increase processing parallelism. We will approach this from several angles:

Parameter tuning: check whether there is a way to balance the number and size of files.
Try the MOR mode for some business tables. When updating, MOR first writes the data to log files and later merges it into Parquet, which in theory reduces how often Parquet files are rewritten.
Discuss business trade-offs in exchange for better write speed.
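The extreme case described for updates can be quantified with a back-of-the-envelope sketch. It assumes that a copy-on-write update rewrites every file containing at least one updated record, and it uses a hypothetical uniform file size of 128 MB; the key-to-file mappings are made up for illustration.

```python
def rewrite_cost(updated_keys, key_to_file, file_size_mb):
    """Return (number of files rewritten, total MB rewritten) for a batch of updates,
    assuming every touched file is rewritten in full."""
    touched = {key_to_file[k] for k in updated_keys}
    return len(touched), len(touched) * file_size_mb

keys = list(range(1000))

# Worst case: every updated record lives in a different file.
scattered = {k: f"file-{k}" for k in keys}
# Friendlier case: the same records are clustered into 10 files.
clustered = {k: f"file-{k % 10}" for k in keys}

worst = rewrite_cost(keys, scattered, file_size_mb=128)
best = rewrite_cost(keys, clustered, file_size_mb=128)
```

Under these assumptions the same 1,000 updates rewrite 100 times more data when they are scattered, which is why the balance between file count and file size, and the MOR mode, are worth exploring.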
