Building a Streaming Data Lake Platform with Flink and Hudi

Introduction:

This article is based on a talk given by Alibaba technical expert Chen Yuzhao and Alibaba development engineer Liu Dalong at FFA 2021.

1. Apache Hudi 101
When it comes to data lakes, two questions usually come up: what is a data lake, and why has it become so popular in the past two years? The data lake is actually not a new concept. The earliest version of the idea was proposed in the 1980s. At that time, a data lake was defined as a raw data layer that can store structured, semi-structured, and even unstructured data. In many scenarios, such as machine learning and real-time analysis, the schema of the data is determined at query time.

Lake storage is cheap and flexible, which makes it well suited as centralized storage for query scenarios. With the rise of cloud services in recent years, and especially the maturity of object storage, more and more enterprises choose to build their storage services on the cloud. The data lake's architecture, which separates storage from compute, fits the current cloud service architecture well. It provides basic ACID transactions through snapshot isolation and can connect to multiple analysis engines to serve different query scenarios. In short, lake storage has a clear advantage in cost-effectiveness and openness.

Lake storage has now begun to take over functions of the data warehouse, and by connecting with compute engines it realizes an integrated lakehouse architecture. Lake storage is a table format that encapsulates high-level table semantics on top of the raw data format. Hudi has been putting the data lake into practice since 2016, when it was created to solve the problem of updating data on file systems in big data scenarios. Hudi's LSM-like table format is currently unique among lake formats. It is friendly to near-real-time updates, and its semantics are relatively complete.

A table format is the basic attribute shared by the three popular data lake formats. Hudi, however, has been evolving toward a platform since the beginning of the project, and it offers relatively complete data governance and table services. For example, the file layout can be optimized concurrently while users are writing, and the metadata table can greatly improve the efficiency of file lookups on the query side.

Here are some basic concepts of Hudi.

The timeline is the core abstraction of Hudi's transaction layer. All data operations in Hudi revolve around the timeline. Each operation is bound to a specific timestamp through the instant abstraction. A series of instants makes up the timeline, and each instant records the corresponding action and state. Through the timeline, Hudi knows the status of the operations currently running on a table. A file system view abstraction, combined with the timeline, exposes the file layout at a specific timestamp to the table's current readers and writers.
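To make the instant and timeline concepts concrete, here is a minimal sketch in Python. It is a simplified model for illustration only, not Hudi's actual classes: the `Instant` and `Timeline` names, their fields, and the timestamp strings are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Instant:
    """One operation on the table, bound to a timestamp (illustrative model)."""
    timestamp: str  # sortable timestamp string, e.g. "20211201093000"
    action: str     # e.g. "commit", "deltacommit", "compaction", "clean"
    state: str      # "REQUESTED", "INFLIGHT" or "COMPLETED"


class Timeline:
    """An ordered series of instants."""

    def __init__(self):
        self._instants = []

    def add(self, instant):
        self._instants.append(instant)
        self._instants.sort(key=lambda i: i.timestamp)

    def completed_as_of(self, timestamp):
        # What a reader pinned to `timestamp` is allowed to see.
        return [i for i in self._instants
                if i.state == "COMPLETED" and i.timestamp <= timestamp]

    def pending(self):
        # Operations that are still running on the table.
        return [i for i in self._instants if i.state != "COMPLETED"]
```

A reader asking for the table as of a given timestamp only sees completed instants up to that point, which is how the timeline isolates readers from in-flight writes in this model.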

The file group is the core abstraction of Hudi's file layout layer. Each file group is equivalent to a bucket and is bounded by file size. Every write to a file group generates a new version, and a version is abstracted as a file slice, which internally maintains the data files for that version. When a file group reaches the configured file size, writing switches to a new file group.
Hudi's write behavior on file slices can be abstracted into two semantics: copy on write and merge on read.

Copy on write rewrites the full data set on every write: the new data is merged with the data of the previous file slice, and a new file slice is then written, producing a new bucket file.

Merge on read is more complicated. Its semantics are append writes: only incremental data is written each time, so a write does not create a new file slice. It first tries to append to the previous file slice, and a new file slice is cut only after the written file slice has been included in a compaction plan.
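The difference between the two write semantics can be sketched with a toy model. This is an illustrative simplification, not Hudi's implementation: the `FileSlice` and `FileGroup` classes are hypothetical, records are plain key/value pairs, and compaction is reduced to merging the log blocks into a new base file.

```python
class FileSlice:
    """One version of a file group: a base file plus appended log blocks."""

    def __init__(self, base_records=None):
        self.base = dict(base_records or {})  # base file contents: key -> value
        self.logs = []                        # log blocks, used by merge on read

    def snapshot(self):
        # Merge the base file with the log blocks, latest write wins.
        merged = dict(self.base)
        for block in self.logs:
            merged.update(block)
        return merged


class FileGroup:
    def __init__(self):
        self.slices = []

    def write_cow(self, records):
        # Copy on write: merge with the previous slice, write a full new slice.
        merged = self.slices[-1].snapshot() if self.slices else {}
        merged.update(records)
        self.slices.append(FileSlice(merged))

    def write_mor(self, records):
        # Merge on read: append only the increment to the current slice.
        if not self.slices:
            self.slices.append(FileSlice())
        self.slices[-1].logs.append(dict(records))

    def compact(self):
        # Compaction cuts a new slice whose base file holds the merged data.
        self.slices.append(FileSlice(self.slices[-1].snapshot()))
```

In this model, two copy-on-write writes produce two file slices, while two merge-on-read writes stay in one slice until `compact()` runs. Readers of a merge-on-read slice pay the merge cost in `snapshot()` instead.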

2. Flink Hudi Integration

The Flink Hudi write pipeline consists of several operators. The first operator converts the RowData of the table layer into Hudi's record format, HoodieRecord. The records then pass through a bucket assigner, which allocates each converted HoodieRecord to a specific file group. The records, now partitioned by file group, flow into the writer operator, which performs the actual file writes. Finally, there is a coordinator, which is responsible for scheduling table services at the Hudi table layer and for starting and committing transactions. In addition, some background cleaning roles remove old versions of data.

In the current design, each bucket assign task holds one bucket assigner, which independently maintains its own set of file groups. When writing new data, that is, insert records that are not updates, the bucket assign task scans the file view and preferentially writes the batch into file groups that are identified as small buckets.

For example, in the figure above, the default file group size is 120 MB. Task 1 in the left figure writes to file group 1 and file group 2 first. Note that file group 3 is not written here: it already holds 100 MB of data, which is close to the target threshold, so it receives no further writes in order to avoid excessive write amplification. Task 2 in the right figure writes a new file group directly and does not append to the larger file groups that have already been written.
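The small-bucket preference described above can be sketched as follows. This is only an illustration under stated assumptions: the 120 MB target comes from the text, but the 100 MB small-file cutoff, the rule of picking the smallest candidate, the function name, and the file group sizes in the usage note are all hypothetical.

```python
TARGET_FILE_GROUP_MB = 120  # default file group size quoted in the text
SMALL_FILE_LIMIT_MB = 100   # assumed cutoff: groups at or above it get no inserts


def assign_bucket(file_groups, incoming_mb, new_group_id):
    """Pick the file group that should receive `incoming_mb` of new inserts.

    `file_groups` maps a file group id to its current size in MB.
    """
    candidates = {
        group_id: size
        for group_id, size in file_groups.items()
        if size < SMALL_FILE_LIMIT_MB
        and size + incoming_mb <= TARGET_FILE_GROUP_MB
    }
    if candidates:
        # Arbitrary choice for this sketch: fill the smallest group first.
        return min(candidates, key=candidates.get)
    # No small bucket available: open a new file group.
    return new_group_id
```

With hypothetical sizes `{"fg1": 20, "fg2": 60, "fg3": 100}`, a 10 MB batch goes to `fg1`, and `fg3` is never chosen because it is already at the cutoff. If every existing group is at or above the cutoff, the function returns `new_group_id`, which mirrors task 2 opening a new file group.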

Next, we introduce the state transition mechanism of the Flink Hudi write process. When the job has just started, the coordinator first tries to create the table on the file system. If the table does not exist yet, it writes some metadata into the file directory, that is, it builds the table. After receiving the initialization metadata from all tasks, the coordinator starts a new transaction. Once a write task sees that the transaction has started, it unlocks flushing for the current data.

A write task first accumulates a batch of data. There are two flush strategies. In the first, the in-memory data is flushed when the data buffer reaches the configured size. In the second, when the upstream checkpoint barrier arrives and a snapshot has to be taken, all in-memory data is flushed to disk. After each flush, the write metadata is sent to the coordinator. When the coordinator receives the checkpoint success event, it commits the corresponding transaction and starts the next one. Once the write task sees the new transaction, it unlocks writing for the next round. This is how the whole write process is strung together.
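The interplay between the two flush strategies and the coordinator can be sketched with a small simulation. It is a simplified, single-task model rather than the real operators: the `Coordinator` and `WriteTask` classes and their method names are hypothetical, the buffer limit is counted in records instead of bytes, and the step where the write task waits for the new transaction before flushing is omitted.

```python
class Coordinator:
    """Starts transactions and commits them when a checkpoint succeeds."""

    def __init__(self):
        self.instant = 0
        self.pending_meta = []
        self.committed = []  # list of (instant, [write metadata, ...])

    def start_instant(self):
        self.instant += 1
        return self.instant

    def on_write_meta(self, meta):
        self.pending_meta.append(meta)

    def on_checkpoint_complete(self):
        # Commit the current transaction, then start the next one.
        self.committed.append((self.instant, self.pending_meta))
        self.pending_meta = []
        return self.start_instant()


class WriteTask:
    def __init__(self, coordinator, max_buffer_records):
        self.coordinator = coordinator
        self.max_buffer_records = max_buffer_records
        self.buffer = []
        self.flushed_batches = []

    def process(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.max_buffer_records:
            self.flush("buffer_full")  # strategy 1: buffer reached its limit

    def on_checkpoint_barrier(self):
        self.flush("checkpoint")       # strategy 2: barrier forces a flush

    def flush(self, reason):
        if not self.buffer:
            return
        self.flushed_batches.append(list(self.buffer))
        self.coordinator.on_write_meta(
            {"records": len(self.buffer), "reason": reason})
        self.buffer.clear()
```

A typical sequence in this model is: `coordinator.start_instant()`, several `process()` calls on the write task, then `on_checkpoint_barrier()` on the task followed by `on_checkpoint_complete()` on the coordinator, which commits everything flushed since the previous checkpoint as one transaction.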

The Flink Hudi writer covers a rich set of write scenarios. It currently supports writing log data, that is, data without updates, and it supports small file merging. Hudi's core write scenarios, such as update streams and CDC data, are also supported. In addition, Flink Hudi supports efficient batch import of historical data: in bulk insert mode, offline data in Hive or in a database can be imported efficiently into Hudi format through a batch query. Flink Hudi also provides full and incremental index loading. Users can import batch data into the lake format in one pass and then connect the streaming writer to move from full import to incremental import.

The Flink Hudi read side also supports rich query views. Currently it mainly supports full reads, incremental reads over a historical time range, and streaming reads.
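The three read modes can be contrasted with a toy model in which a table is a list of commits, each carrying the row changes it wrote. This is only a sketch: the function names are hypothetical, and treating the start of the range as exclusive and the end as inclusive is an assumption of this example, not a statement about Hudi's exact boundary semantics.

```python
def full_read(commits):
    """Snapshot of the table: apply every commit in timestamp order."""
    snapshot = {}
    for _, changes in sorted(commits, key=lambda c: c[0]):
        snapshot.update(changes)
    return snapshot


def incremental_read(commits, start, end):
    """Changes written by commits with start < timestamp <= end."""
    result = {}
    for ts, changes in sorted(commits, key=lambda c: c[0]):
        if start < ts <= end:
            result.update(changes)
    return result


def poll_new_commits(commits, last_seen):
    """One polling step of a streaming read: commits newer than `last_seen`."""
    return [c for c in sorted(commits, key=lambda c: c[0]) if c[0] > last_seen]
```

A streaming read in this model is just `poll_new_commits` called repeatedly, with `last_seen` advanced to the newest timestamp returned by the previous call.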

The figure above is an example of writing to Hudi through Flink SQL. Hudi supports a rich set of use cases while keeping the parameters that users need to configure to a minimum. By configuring only the table path, the write parallelism, and the operation type, users can easily write upstream data into Hudi format.
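Since the original figure is not reproduced here, the following sketch renders a comparable Flink SQL statement from Python. It is not the statement shown in the talk: the table name, columns, and path are hypothetical, and the option keys (`connector`, `path`, `write.tasks`, `write.operation`) are the ones used by the Hudi Flink connector to the best of my knowledge, so check them against the documentation of the Hudi version in use.

```python
def hudi_table_ddl(table, columns, path, write_tasks=4, operation="upsert"):
    """Render a Flink SQL CREATE TABLE statement for a Hudi sink (sketch)."""
    column_sql = ",\n".join(
        f"  {name} {sql_type}" for name, sql_type in columns)
    options = {
        "connector": "hudi",
        "path": path,                     # table path
        "write.tasks": str(write_tasks),  # write parallelism
        "write.operation": operation,     # operation type, e.g. upsert
    }
    option_sql = ",\n".join(f"  '{k}' = '{v}'" for k, v in options.items())
    return f"CREATE TABLE {table} (\n{column_sql}\n) WITH (\n{option_sql}\n)"


if __name__ == "__main__":
    # Hypothetical table definition, for illustration only.
    print(hudi_table_ddl(
        "hudi_sink",
        [("uuid", "VARCHAR(20) PRIMARY KEY NOT ENFORCED"),
         ("name", "VARCHAR(10)"),
         ("ts", "TIMESTAMP(3)")],
        "file:///tmp/hudi_sink",
    ))
```

The generated statement can be submitted through the Flink SQL client, after which an ordinary `INSERT INTO hudi_sink SELECT ...` writes the upstream data.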

3. Flink Hudi Use Case

The following introduces the classic application scenarios of Flink Hudi.
The first classic scenario is importing database data into the data lake. There are currently two ways to do this: you can import full and incremental data into Hudi format in one step through the CDC connector, or you can consume the CDC changelog from Kafka and import the data into Hudi format through Flink's CDC format.

The second classic scenario is ETL for stream computing (near-real-time OLAP analysis). An upstream streaming job performs some simple ETL, such as a dual-stream join, or a dual-stream join followed by an aggregation, and writes the change stream directly into Hudi format. Downstream readers can then connect classic OLAP engines such as Presto and Spark to run end-to-end near-real-time queries.

The third classic scenario is somewhat similar to the second. Hudi supports a native changelog, that is, it can retain the row-level changes produced by Flink computations. Based on this capability, end-to-end near-real-time ETL pipelines can be built by reading and consuming the changes as a stream.

4. Apache Hudi Roadmap

Over the next two major releases, the community will focus on stream reading and stream writing and will strengthen the semantics of stream reading. In addition, catalog and metadata will become self-managed. A native Trino connector will also be released in the near future to replace the current approach of reading through Hive and improve efficiency.

The following is a demonstration of ingesting thousands of MySQL tables into a Hudi data lake.
First, we prepared two databases as the data source, benchmark1 and benchmark2. There are 100 tables under benchmark1 and 1000 tables under benchmark2. Because ingesting thousands of tables depends heavily on catalogs, we must create the catalogs first. For the data source we create a MySQL catalog, and for the target we create a Hudi catalog. The MySQL catalog is used to obtain all information about the source tables, including table structure and table data. The Hudi catalog is used to create the target tables.
After executing the two SQL statements, the two catalogs are created successfully.

Next, go to the job development page and create the job that ingests thousands of tables into the lake. Only 9 simple lines of SQL are required. The first statement uses the create database as database (CDAS) syntax. It synchronizes all table structures and table data under the MySQL benchmark1 database to the cdas_demo database in Hudi with one click, with a one-to-one mapping between tables. The second statement uses the create table as table (CTAS) syntax. It synchronizes all tables in the MySQL benchmark2 database that match the sbtest regular expression to the ctas_dema table under DB1 in Hudi. This is a many-to-one mapping that merges sharded databases and tables.
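The many-to-one mapping performed by the CTAS statement can be illustrated with a small Python sketch. It only models the table matching, not the synchronization itself: the function name, the concrete pattern `sbtest\d+`, and the table names in the usage note are hypothetical, since the demo's exact regular expression is not shown in the text.

```python
import re


def plan_ctas_merge(source_tables, pattern, target_table):
    """Map every source table whose full name matches `pattern` to one target."""
    regex = re.compile(pattern)
    matched = sorted(t for t in source_tables if regex.fullmatch(t))
    return {table: target_table for table in matched}
```

For example, `plan_ctas_merge(["sbtest1", "sbtest2", "orders"], r"sbtest\d+", "db1.ctas_dema")` maps `sbtest1` and `sbtest2` to the single target and leaves `orders` out, which is the shard-merging behavior described above.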

Then we deploy the job and go to the job operations and maintenance page to start it. You can see that the configuration information has been updated, indicating that the job has been deployed again. Click the Start button to start the job. You can then go to the Job Overview page to view the job's status information.

The figure above shows the topology of the job, which is very complex, with 1100 source tables and 101 target tables. We have made an optimization here called source merge: all source tables are merged into one node, so that the binlog only needs to be pulled once during the incremental phase, which reduces the pressure on MySQL.

Next, refresh the OSS page. You can see that there is a new cdas_demo path. Enter the subtest1 path and you can see that metadata is already being written, which indicates that data is actually being written.

Then go to the job development page and write a simple SQL query against one of the tables to verify that the data is really being written. After executing the SQL statement above, you can see that the data can be queried and that it is consistent with the inserted data.

Using the metadata capabilities provided by the catalog, combined with the CDAS and CTAS syntax, we can ingest thousands of tables into the lake with a few lines of simple SQL. This greatly simplifies the ingestion process and reduces the development, operation, and maintenance workload.
