Real-Time Data Lake: Flink + Hudi Practice and Exploration
First, let's look at the historical background of data lake development and the basic characteristics of Hudi.
1. Historical background of data lake development
From my personal point of view, traditional data warehouse solutions (such as Hive) are actually data lakes as well, and I regard Hudi, Iceberg, and Delta Lake as next-generation data warehouse solutions rather than just lake formats. So why did the data lake, as a new form of data warehouse, emerge in the past year?
As cloud storage (especially object storage) gradually matures, data lake solutions are moving closer to cloud native. As shown in Figure 1, the lake formats adapt to the object storage of cloud vendors, and cloud vendors can build commercial offerings on them. At the same time, they adapt to popular big data computing frameworks (such as Spark and Flink), as well as Presto, Trino, and the traditional Hive engine on the query side. This is how such a new data warehouse solution came into being.
2. Four core features of Hudi
As the above shows, Hudi, as a next-generation data warehouse solution, works with upstream computing engines and downstream query engines to replace the traditional Hive offline data warehouse. Its core features can be summarized as follows:
• Openness
Openness is reflected in two aspects:
First, the upstream side supports multiple data source formats. For example, database change logs and message queue logs are both richly supported on the source side.
Second, the downstream query side also supports multiple query engines. Mainstream OLAP engines such as Presto, StarRocks, and Impala, as well as cloud vendors' data analysis products such as Amazon Redshift, can all connect to this data warehouse architecture.
So openness is the first characteristic of Hudi.
• Rich transaction support
Hudi's transaction support is stronger and richer than that of the original Hive warehouse. The core capability is support for updates in the file storage layout. In the traditional Hive-based T+1 update scheme, data duplication is relatively high and only day-level data freshness can be achieved. As business requirements grow more complex and real-time requirements rise, data warehouse storage systems are expected to deliver end-to-end data freshness at the minute or second level.
Secondly, update efficiency needs to improve. Instead of overwriting the entire table or an entire partition every time, local updates at file granularity improve storage and computing efficiency. Hudi meets these requirements well. Therefore, enhanced ACID semantics is the second major feature of this data warehouse architecture.
• Incremental processing based on ACID semantics
In my opinion, the third highlight is the incremental processing derived from ACID semantics, especially the TimeTravel concept in Hudi and the ability to connect directly to streaming engines such as Flink and Spark Streaming. Whether it runs as a near-real-time job or as a long-running streaming service, it is essentially streaming consumption, which can be understood as incremental ETL. Compared with traditional batch scheduling, it is more efficient in computing. In particular, a stateful computing framework like Flink reuses previous computing results and directly implements end-to-end incremental processing over the full pipeline. In addition, data freshness improves by orders of magnitude, from the "day level" to the "minute level".
For example, some users in China are already using Flink for streaming consumption of lake tables: data ingested from the source side is processed directly through a set of incremental ETL pipelines to build the traditional data warehouse layers. Many readers will also be curious about what the TimeTravel incremental query, which queries the incremental data between two snapshots, is useful for. In batch mode, the mainstream scenario is synchronizing the ADS layer to downstream systems. For example, the results produced by the data warehouse can be synchronized to other stores (such as ES and MySQL) in regular batches through TimeTravel. When the timeliness requirements on ADS synchronization are not that strict, this idempotent TimeTravel query mode synchronizes to downstream systems more efficiently.
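To make the idea of an incremental query between two snapshots more concrete, here is a minimal sketch in plain Python. It does not use the Hudi API: the `Commit` class, the in-memory `timeline` list, and the `incremental_query` function are all hypothetical names introduced only for illustration. The point is that the result depends only on the two boundary instants, which is why re-running the same sync is idempotent.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Commit:
    instant: str   # commit timestamp on the timeline (illustrative format)
    rows: tuple    # (key, value) pairs written by this commit


def incremental_query(timeline, start_exclusive, end_inclusive):
    """Return the latest value per key among commits in (start, end]."""
    changed = {}
    for commit in sorted(timeline, key=lambda c: c.instant):
        if start_exclusive < commit.instant <= end_inclusive:
            for key, value in commit.rows:
                changed[key] = value   # later commits overwrite earlier ones
    return changed


# Hypothetical timeline with three commits.
timeline = [
    Commit("20220101000000", (("a", 1), ("b", 2))),
    Commit("20220102000000", (("a", 10),)),
    Commit("20220103000000", (("c", 3), ("b", 20))),
]

# Rows changed after the first commit, up to and including the third.
result = incremental_query(timeline, "20220101000000", "20220103000000")
```

A batch job that syncs to ES or MySQL could store the last `end_inclusive` instant it processed and use it as the next `start_exclusive`, so each run only moves the rows that changed in between.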
The above three points are the three core differences from the mainstream Hive architecture, and they are also the direction that lakehouse projects at home and abroad are working toward.
• Intelligent scheduling
In addition, Hudi tries to optimize the file layout and implements data governance for small-file management within the framework itself, achieving intelligent scheduling. This is the core feature that distinguishes Hudi from other data warehouse solutions such as Delta Lake and Iceberg.
1. Hudi write pipeline (a microservice architecture composed of multiple operators)
As Figure 2 shows, the Hudi write pipeline is a serverless microservice architecture. The core idea is that once the whole pipeline runs as a service, whether on Flink or Spark Streaming, the service can govern the table by itself. Therefore, the pipeline considers not only efficient data writing but also file management during writing, avoiding too many small files as far as possible so that the query side stays efficient. Regular file compaction and file cleaning prevent the explosive growth of small files.
On the other hand, there is transactionality (ACID): many factors must be considered when completing an update as an ACID transaction. When a single job or node fails over, Hudi can quickly find the previously written erroneous data and roll it back. Therefore, Hudi's transaction-layer support is the most complete and efficient among the three lake formats.
Taking the copy-on-write implementation as an example, the pipeline first converts the upstream SQL-native data structure into the Hudi data structure. To support concurrent writing, the data is divided into buckets after each shuffle.
There are two main points:
The first is newly inserted data, which is written to existing smaller buckets as much as possible. At the same time, to avoid generating small files, we also try to keep the size of each bucket close to the expected size.
The second is updated data. Hudi is designed around a primary key. The messages of each key are maintained in one bucket, and each update is written to the bucket that previously held the key. Iceberg and Delta Lake, by contrast, only write and do not manage the file layout, so they do a lot of merging and cleaning on the query side, and query efficiency is relatively low. Hudi's more complex write process and bucket strategy are a deliberate trade-off between read and write efficiency. The bucket concept here is somewhat similar to the micro-partition concept in Snowflake: under the traditional partition, data is further subdivided so that the life cycle of the messages in a range is maintained at file granularity. Maintaining the life cycle at a finer granularity effectively improves the efficiency of data updates and queries.
After the second operator, the data has been assigned to buckets. We shuffle the data by bucket ID and hand it to the write tasks for writing. Why shuffle again by bucket ID? Mainly to preserve the update semantics that two write tasks cannot modify the same bucket at the same time; otherwise update conflicts could easily occur.
On the whole, these three operators effectively guarantee concurrent writes and updates. The parallelism of the second operator determines the parallelism of the entire update and the number of buckets that can be updated and written at the same time, while the later operators can be scaled independently. From practical experience, it is recommended to set the parallelism of the second and third operators to the same value. When throughput is not very high, one bucket assign task feeds one write task. When throughput is relatively high, one bucket assign task may feed multiple write tasks, and the ratio can be adjusted to 1:2.
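The reason for the second shuffle can be illustrated with a small sketch. It is not Hudi's actual partitioner: the integer bucket IDs, the `bucket_of_key` mapping, and the modulo rule in `route_to_write_tasks` are assumptions made for the example. What it shows is that once records are routed by bucket ID, every bucket is owned by exactly one write task, so two tasks never update the same bucket.

```python
from collections import defaultdict


def route_to_write_tasks(records, bucket_of_key, num_write_tasks):
    """Group (key, value) records by write task, partitioning on bucket ID."""
    tasks = defaultdict(list)
    for key, value in records:
        bucket_id = bucket_of_key[key]             # decided by the bucket assign step
        task_index = bucket_id % num_write_tasks   # same bucket always maps to the same task
        tasks[task_index].append((bucket_id, key, value))
    return dict(tasks)


# Hypothetical assignment: keys k1 and k4 live in bucket 0.
bucket_of_key = {"k1": 0, "k2": 1, "k3": 2, "k4": 0}
records = [("k1", "v1"), ("k2", "v2"), ("k3", "v3"), ("k4", "v4"), ("k1", "v1b")]
tasks = route_to_write_tasks(records, bucket_of_key, num_write_tasks=2)

# Collect which write tasks touch each bucket.
owners = defaultdict(set)
for task_index, routed in tasks.items():
    for bucket_id, _key, _value in routed:
        owners[bucket_id].add(task_index)
```

In this run every entry of `owners` contains a single task index, which is the property the shuffle is meant to preserve.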
A clean task for old commits is also started in the background. The data commit operation takes place in the coordinator component. Each write task flushes its data roughly aligned with the checkpoint, and the write metadata is submitted uniformly to the coordinator. After the coordinator has collected these statistics, it commits in combination with the checkpoint-completed event; the actual commit happens in the coordinator. After the coordinator completes the commit, the Hudi table starts a new transaction. Only when a write task sees the new transaction can it start writing for it. So there is an asynchronous waiting process, similar to a small state machine.
However, the semantics guaranteed by Flink's snapshot mechanism is actually a "best before" semantics: once a checkpoint success event is received, all earlier state is known to be successful, but a checkpoint in between may have been aborted.
Because Hudi must guarantee the integrity and exactly-once semantics of each write, intermediate writes must not cross checkpoint boundaries. For example, if the event data of one checkpoint were written into the next checkpoint, exactly-once semantics could not be guaranteed.
In version 0.11, we are trying some optimizations, for example, whether a checkpoint can be reused after being aborted. This involves state switching and is relatively complicated. Spark Streaming is different: it is a micro-batch abstraction in which a task is launched for every batch, which naturally guarantees exactly-once, and the fault-tolerance semantics are handed over to the framework. How to combine Flink's asynchronous snapshot algorithm with strong exactly-once semantics is a difficulty in this architecture.
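The handshake between the write tasks and the coordinator described above can be sketched as a tiny state machine. This is a simplified model, not Hudi's coordinator: the `Coordinator` and `WriteTask` classes, integer instants, and string metadata are all assumptions for illustration, and checkpoint aborts are not modeled.

```python
class Coordinator:
    """Collects write metadata and commits when a checkpoint completes."""

    def __init__(self, num_write_tasks):
        self.num_write_tasks = num_write_tasks
        self.instant = 1        # the currently open transaction
        self.pending = {}       # task_id -> metadata for the open transaction
        self.committed = []     # list of (instant, metadata list)

    def on_write_metadata(self, task_id, instant, metadata):
        if instant != self.instant:
            raise ValueError("write task is on a stale instant")
        self.pending[task_id] = metadata

    def on_checkpoint_complete(self):
        if len(self.pending) < self.num_write_tasks:
            return False        # not every task has reported yet
        ordered = [self.pending[t] for t in sorted(self.pending)]
        self.committed.append((self.instant, ordered))
        self.pending = {}
        self.instant += 1       # open a new transaction
        return True


class WriteTask:
    def __init__(self, task_id, coordinator):
        self.task_id = task_id
        self.coordinator = coordinator
        self.last_flushed = 0   # instant of the last flush (0 = none yet)

    def can_write(self):
        # A task may only write into a transaction it has not flushed yet.
        return self.coordinator.instant > self.last_flushed

    def flush(self, metadata):
        if not self.can_write():
            raise RuntimeError("waiting for the coordinator to open a new instant")
        instant = self.coordinator.instant
        self.coordinator.on_write_metadata(self.task_id, instant, metadata)
        self.last_flushed = instant


coordinator = Coordinator(num_write_tasks=2)
t0, t1 = WriteTask(0, coordinator), WriteTask(1, coordinator)

t0.flush("file-a")
blocked_after_flush = not t0.can_write()               # t0 must now wait
early_commit = coordinator.on_checkpoint_complete()    # t1 has not reported
t1.flush("file-b")
commit_ok = coordinator.on_checkpoint_complete()       # both reported: commit
```

After the commit, `coordinator.instant` advances and `t0.can_write()` becomes true again, which corresponds to the write task "seeing the new transaction".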
2. Small file strategy
Next, let's take a closer look at how the second operator, bucket assign, decides where to write files, that is, how it chooses which bucket a new message goes into, as shown in Figure 3. There are two cases.
First, the left block diagram contains three buckets, where blue represents the size of the file currently stored. For insert data, the policy is to select the bucket with the most free space each time. Why not choose the bucket with the least free space? Because of COW write amplification: under copy-on-write, writing into a bucket rewrites its file, so adding a few records to a nearly full file is relatively inefficient. When updating data, first find the bucket that holds the current key and then write to it. This does not make files grow without limit, because a record is about the same size before and after an update, so the file size does not change significantly. File size is mainly affected by insert data, and a threshold of about 120 MB is set for the file size.
The block diagram on the right shows an extreme case: both buckets have only a little write space left. Considering the impact of write amplification, a new bucket is created for the write.
To improve the throughput of concurrent writes, each bucket assign task is given an independent bucket management strategy, and bucket IDs are hashed to bucket assign tasks by a fixed rule so that decisions can be made concurrently. Therefore, controlling the parallelism of the bucket assign tasks indirectly controls the number of small files written; it is a trade-off between write throughput and small files.
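The insert policy described for Figure 3 can be sketched as follows. This is a simplified model rather than Hudi's implementation: the 120 MB limit comes from the text, while the `MIN_FREE_SPACE` cutoff, the `assign_insert_bucket` function, and the bucket names are hypothetical.

```python
MB = 1024 * 1024
MAX_FILE_SIZE = 120 * MB    # file size threshold mentioned in the text
MIN_FREE_SPACE = 20 * MB    # hypothetical cutoff below which a bucket counts as full


def assign_insert_bucket(bucket_sizes, new_bucket_id):
    """Pick the bucket with the most free space, or open a new one.

    bucket_sizes maps bucket ID -> current file size in bytes and is
    updated in place when a new bucket is created.
    """
    best_id, best_free = None, -1
    for bucket_id, size in bucket_sizes.items():
        free = MAX_FILE_SIZE - size
        if free > best_free:
            best_id, best_free = bucket_id, free
    if best_id is None or best_free < MIN_FREE_SPACE:
        # Every existing bucket is nearly full: rewriting a large file for a
        # few records would amplify writes, so start a new bucket instead.
        bucket_sizes[new_bucket_id] = 0
        return new_bucket_id
    return best_id


# Left-hand case: three buckets, the emptiest one is chosen.
left = {"b1": 100 * MB, "b2": 40 * MB, "b3": 70 * MB}
left_choice = assign_insert_bucket(left, new_bucket_id="b4")

# Right-hand case: both buckets are nearly full, so a new bucket is created.
right = {"b1": 115 * MB, "b2": 110 * MB}
right_choice = assign_insert_bucket(right, new_bucket_id="b3")
```

Updates would bypass this function entirely, since they always go to the bucket that already holds the key.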
3. Full + incremental reading
After introducing the data writing process, let's look at streaming reads. How are the full read and the incremental read of a streaming read implemented? As shown in Figure 4, the timeline in Hudi stores the millisecond timestamp of each transaction commit. Each timestamp corresponds to a snapshot version and is recorded in the metadata. For the full read, all files of the table are scanned. If the built-in metadata index table is not configured, the file system is scanned directly to find all files of the table. If the metadata table is enabled, the file information is looked up in the metadata table (KV storage), which is relatively efficient, and then sent downstream. The incremental part monitors and scans the timeline regularly (every 60s by default) to see whether there are new commits, and sends them downstream for reading. Each incremental round starts from the last issued timeline point and then finds the latest commit time.
The split monitor operator is responsible for maintaining this set of rules: it monitors incremental file information and issues it to the tasks that actually perform the reading.
Recently, the master branch also added support for batch-mode TimeTravel queries (point queries within a certain time range). Earlier versions supported this too, but with some problems: for example, if the incremental meta files had been archived or cleaned, data integrity was not guaranteed. While preserving read efficiency, the new version addresses these two problems by implementing batch-mode incremental reading between two snapshots and commits, ensuring data integrity.
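The monitoring rule above can be sketched in a few lines. This is an illustrative model, not the actual operator: the `SplitMonitor` class, its `poll` method, and the dictionary-based timeline are hypothetical, and a real job would call `poll` on a timer (60s by default according to the text) rather than by hand.

```python
class SplitMonitor:
    """Tracks the last issued instant and emits splits for newer commits."""

    def __init__(self, start_instant=""):
        self.last_issued = start_instant   # "" means nothing issued yet

    def poll(self, timeline):
        """Return (instant, file) splits for commits newer than the last issued one.

        timeline maps a fixed-width commit timestamp to the files it wrote.
        """
        new_instants = sorted(i for i in timeline if i > self.last_issued)
        splits = [(i, f) for i in new_instants for f in timeline[i]]
        if new_instants:
            self.last_issued = new_instants[-1]   # advance the issued point
        return splits


timeline = {
    "20220101000000000": ["f1.parquet", "f2.parquet"],
    "20220101000100000": ["f3.parquet"],
}
monitor = SplitMonitor()

first = monitor.poll(timeline)    # full set of existing commits
second = monitor.poll(timeline)   # nothing new since the last poll

timeline["20220101000200000"] = ["f4.parquet"]
third = monitor.poll(timeline)    # only the newly committed file
```

Because the monitor only remembers the last issued instant, restarting it from a stored instant resumes the incremental read without re-emitting earlier files.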
Hudi application scenarios
Flink + Hudi is now a very popular technology architecture in China. Three application scenarios are introduced here.
1. Near-real-time DB data ingestion into the warehouse/lake
The core value of this architecture is improving the freshness of DB data in the warehouse from the original T+1 to the minute level. This freshness is achieved through the currently popular CDC technology, represented by Debezium and Maxwell, which synchronizes data to the data warehouse in a streaming, near-real-time manner. Real-time ingestion is very hard to guarantee in a traditional Hive data warehouse, especially file updates and real-time writes and updates to lake tables. CDC places requirements on the data warehouse storage itself. First, updates must be efficient enough to support streaming writes. In addition, CDC logs may arrive out of order, and guaranteeing ACID semantics for such out-of-order updates is highly demanding. At present, only Hudi can meet the requirements of out-of-order updates. Hudi also takes update efficiency into account, which makes this a relatively advanced architecture.
Compared with the scheme above, the scheme at the bottom of Figure 5 is more suitable for companies with large data volumes (up to 100 million incremental records per day) and a mature data platform. It has a unified data synchronization layer in the middle (collecting data from different source tables and synchronizing it to a message queue). The message queue is responsible for fault tolerance, disaster recovery, and caching. The scalability of this scheme is also better: through Kafka's topic subscription model, data can be distributed flexibly.
2. Near-real-time OLAP
The second scenario is near-real-time OLAP: minute-level end-to-end data freshness combined with a very open choice of OLAP query engines. It is in fact a new solution for the Kappa architecture, or for the original streaming data warehouse architecture. Before this architecture, real-time analysis would skip Hudi and double-write data directly into OLAP systems such as ClickHouse, ES, and MongoDB. Once the warehouse storage can support efficient minute-level updates and connect to OLAP engines, the architecture is greatly simplified. First, there is no need to double-write. A single copy of the data provides a single source of truth and avoids the data consistency problems caused by double writes. Secondly, because the lake format itself is very open, there are more choices of query engine. For example, Hudi supports Presto, Trino, Spark, StarRocks, and cloud vendors' engines such as Redshift, which is very flexible.
Therefore, this near-real-time OLAP architecture can be summarized as: ① unify the upstream storage side; ② open up the downstream query side.
However, the data freshness of this architecture is about 5 minutes. If you want second-level freshness as in a Kappa architecture, Hudi is not suitable at present. Because it relies on Flink's checkpoint mechanism (which supports end-to-end exactly-once semantics), it cannot commit very frequently.
3. Near-real-time ETL
The third scenario is a cutting-edge architecture that is gradually being tried out in China. When the data volume of the source is small, for example when the source is not Kafka but a MySQL binlog with a QPS in the hundreds, this architecture is very stable and convenient. It not only achieves end-to-end incremental processing but also meets the need to store intermediate data in the warehouse. In fact, it provides two abstractions. The first is a storage abstraction for the intermediate layers of the data warehouse, storing data directly in the lake format. The second is a queue capability similar to Kafka's message queue: the data can be consumed incrementally in streaming mode, and incremental computation can be done on top of it. This architecture directly unifies the original Lambda and Kappa architectures. That is, Kafka's storage abstraction and the warehouse file storage abstraction are merged into one, without adding much storage cost, since cheap storage such as object storage or HDFS can be used.
The whole architecture solves two problems. The first is the dual-write problem. Under the Lambda architecture, data is written to Kafka first and then to the warehouse, and it is difficult to guarantee consistency between the two copies. In addition, Kafka's throughput drops considerably once exactly-once writes are enabled. How can the data in Kafka and HDFS be kept consistent? Some people avoid dual writes in the streaming job and instead start another job to synchronize the Kafka data to HDFS, but then the cost of computing resources, job maintenance, and synchronization is twice the original.
The second problem solved is the query requirement on the intermediate layers. Intermediate-layer data is stored directly in the lake table and kept up to date by efficient updates. When you need to do join operations on an intermediate DWD table, you can connect a query engine directly, without worrying about the T+1 update efficiency of the Lambda architecture. The minute-level timeliness of the lake format greatly alleviates this problem.
In addition, this architecture has another advantage: you can choose among a rich set of OLAP query engines according to the application scenario and access the warehouse storage directly as external tables, which is very convenient for OLAP analysis.
4. Real-time lake ingestion with Alibaba Cloud VVP
Next, let's briefly cover the current real-time lake ingestion integration in the Alibaba Cloud VVP product, which at this stage mainly covers ingestion into the lake. Alibaba Cloud's built-in Flink version ships with a built-in Hudi connector. You can quickly build a lake ingestion job through Flink SQL and write lake tables directly, connecting to the Hudi CDC connector or to the Kafka CDC format, to get data into the lake quickly.
In addition, commercial features such as schema evolution are provided for the ingestion process. The CE and CTAS syntax supports schema evolution. We also recommend the DLF catalog metadata management component, which integrates seamlessly with EMR. Whether a table is written by Spark on EMR or by a Flink ingestion job, it can be seen and managed there after it is written. Through the DLF component, Hudi-format data can be analyzed directly with the EMR query engines.
This is the set of technical solutions currently offered to commercial users: ingest into the lake through VVP and analyze through EMR. Later, VVP may integrate more capabilities, such as unified stream and batch processing, to meet users' streaming read needs.
Hudi near-term roadmap
As shown in Figure 9, I will briefly introduce some features planned for Hudi 0.12 and 1.0 in the near future.
First, we will introduce a CDC feed feature similar to that of Delta Lake 2.0. The currently supported CDC requires the input itself to be CDC data, which Hudi then stores in CDC format. The difference with the CDC feed is that the input does not have to be in CDC format as a whole. Even if the input has upsert semantics, or intermediate CDC data is lost, the complete CDC stream can still be restored for downstream consumers. This feature makes some trade-offs in read and write throughput and resources, and it is not as efficient as the current architecture for handling CDC.
The second point is the meta service. Metadata management becomes pluggable, and the tables and jobs on Hudi are managed in a unified way through a meta service plug-in.
Third, we are planning secondary indexes. In the current master branch, both Flink and Spark already implement data skipping (if the user enables the metadata table and data skipping when writing, statistics for each column are additionally recorded). Most typically, a max and a min are recorded for each column, which accelerates queries through metadata and improves file-level query efficiency. In the future, database-like secondary indexes will be supported: an LSM-like index abstraction will be implemented for a specific column to build an efficient index scheme suited to point-query scenarios.
Finally, we will develop a column-level update capability for feature engineering, similar to ClickHouse's MergeTree abstraction, in which a column is stored independently. In machine-learning feature engineering, features can require thousands of fields, and every newly generated feature means updating a column, so a single column must be efficiently updatable. Hudi will continue to explore how to support such scenarios.
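As a rough illustration of how per-column min/max statistics enable data skipping, consider the sketch below. It does not reflect the layout of Hudi's metadata table: the `column_stats` dictionary and the `files_to_scan` function are hypothetical, and only a single numeric column with a range predicate is modeled.

```python
def files_to_scan(column_stats, lower, upper):
    """Keep only files whose [min, max] range can overlap the query range.

    column_stats maps file name -> (min_value, max_value) for one column.
    """
    return [
        name
        for name, (min_value, max_value) in column_stats.items()
        if max_value >= lower and min_value <= upper
    ]


# Hypothetical statistics recorded at write time for column "order_id".
column_stats = {
    "f1.parquet": (1, 100),
    "f2.parquet": (101, 200),
    "f3.parquet": (150, 300),
}

# WHERE order_id BETWEEN 120 AND 160 only needs two of the three files.
selected = files_to_scan(column_stats, lower=120, upper=160)
```

Min/max pruning works well for range predicates on clustered columns, while a point lookup on an unclustered column may still match most files, which is the gap a secondary index is meant to close.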
Q1: Which is better: storing updates directly in Hudi, or writing CDC directly into StarRocks? I feel that StarRocks' update QPS should be higher than Hudi's update speed.
A1: That's true, because StarRocks, as a service, uses efficient LSM-like primary key indexes and uses in-memory partitioning strategies to maintain more secondary indexes and metadata. In addition, and most importantly, it batches writes when updating a primary key table: multiple writes first go into a buffer and are then flushed, and flushes of the data can themselves be batched. This is why StarRocks is more efficient at updates.
At the same time, because it is a server cluster, there are two problems. First, it brings high operation and maintenance costs. Second, compared with the serverless format of Hudi, the in-memory approach has more overhead.
The Hudi format has certain advantages over StarRocks in terms of openness. It can connect not only to StarRocks but also to Presto, Spark, and other mainstream OLAP engines.
That is the difference between them. The two solutions have different emphases, and the choice depends on the actual application scenario. StarRocks is more suitable for OLAP applications. But if you want to build a data warehouse, the cost of using StarRocks instead of Hudi is likely too high, because Hudi mainly targets the scenarios of the traditional Hive data warehouse, where it has the advantage.
Q2: Is a MOR (Merge on Read) table or a COW (Copy on Write) table more appropriate for streaming data ingestion into the lake?
A2: If the volume of streaming data is large, the MOR table is recommended. Based on current measurements, a COW table can still cope as long as the QPS does not exceed 20,000. Above 20,000, MOR with online compaction is recommended. If the QPS is even higher, the compaction task may need to be split out and run separately. This is the solution that can be offered at present.
Knowledge Base Team