Use Flink Hudi to build a streaming data lake platform
1. Apache Hudi 101
When it comes to data lakes, two questions always come up: what is a data lake, and why has it become so popular over the past two years? The data lake is not a new concept; the term was first proposed in the 1980s. Back then, the data lake was defined as the raw data layer, able to store structured, semi-structured, and even unstructured data. In scenarios such as machine learning and real-time analytics, the schema of the data is determined only at query time.
Lake storage is low-cost and highly flexible, which makes it well suited as centralized storage for query scenarios. With the rise of cloud services in recent years, and especially the maturity of object storage, more and more enterprises choose to build their storage on the cloud. The storage-compute separation architecture of the data lake fits the current cloud service architecture very well. It provides basic ACID transactions through snapshot isolation and can be connected to multiple analysis engines to adapt to different query scenarios. In short, lake storage has great advantages in cost-effectiveness and openness.
Lake storage has begun to take on the functions of the data warehouse, realizing an integrated lakehouse architecture by connecting with computing engines. Lake storage relies on a table format, which encapsulates higher-level table semantics on top of the raw data format. Hudi has been putting the data lake into practice since 2016, originally to solve the data update problem on file systems in big data scenarios. Hudi's LSM-like table format is currently unique among the lake formats; it is friendlier to near-real-time updates and relatively mature.
The table format is the basic attribute shared by the three currently popular data lake formats, but Hudi has been evolving toward a platform since the beginning of the project. It has relatively complete data governance and table services. For example, while users write, table services can concurrently optimize the file layout, and the metadata table maintained during writing can greatly improve the efficiency of file lookups on the query side.
The following introduces some basic concepts of Hudi.
The timeline service is the core abstraction of Hudi's transaction layer; all data operations in Hudi revolve around it. Each operation is bound to a specific timestamp through the instant abstraction, and a series of instants makes up the timeline; each instant records the corresponding action and state. Through the timeline service, Hudi knows the state of the current table operations, and through a set of file system view abstractions combined with the timeline service, it can expose the file layout view at a specific timestamp to the current readers and writers of the table.
The file group is Hudi's core abstraction at the file layout layer. Each file group is equivalent to a bucket, bounded by file size. Each write to a file group generates a new version, and a version is abstracted as a file slice; a file slice internally maintains the data files of the corresponding version. When a file group reaches the configured file size, writes switch to a new file group.
Hudi's write behavior on file slices can be abstracted into two semantics: copy-on-write and merge-on-read.
Copy-on-write rewrites the full data each time: new data is merged with the data of the previous file slice, and a new file slice is written, generating a new bucket file.
Merge-on-read is more complicated. Its semantics is append-only writing, that is, only incremental data is written each time, so a new file slice is not generated immediately. It first tries to append to the previous file slice, and only after the written file slice is included in a compaction plan will a new file slice be cut.
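As a rough sketch, the table type is chosen with the table.type option when a Hudi table is defined in Flink SQL; the path and column layout below are only illustrative placeholders:

    CREATE TABLE hudi_mor_demo (
      uuid STRING,
      name STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
      'connector'  = 'hudi',
      'path'       = 'oss://my-bucket/warehouse/hudi_mor_demo',  -- placeholder path
      'table.type' = 'MERGE_ON_READ'   -- 'COPY_ON_WRITE' is the default
    );

Copy-on-write tables favor read performance, while merge-on-read tables favor near-real-time write latency.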
2. Flink Hudi Integration
The Flink Hudi write pipeline consists of several operators. The first operator converts the RowData of the table layer into Hudi's record format, HoodieRecord. A Bucket Assigner then assigns the converted HoodieRecords to specific file groups, and the records assigned to file groups flow into the Writer operator, which performs the actual file writing. Finally, there is a coordinator, which is responsible for scheduling the table services of the Hudi table layer and for initiating and committing transactions. In addition, some background cleaning roles are responsible for cleaning up old versions of data.
In the current design, each bucket assign task holds a bucket assigner and independently maintains its own set of file groups. When writing new data or non-update insert data, the bucket assign task scans the file view and preferentially writes the new batch into the file groups that are judged to be small buckets.
For example, in the figure above, the default file group size is 120 MB. Task 1 on the left will write to file group 1 and file group 2 first; note that it will not write to file group 3, because file group 3 already holds 100 MB of data, which is close to the target threshold. Such buckets are no longer written to, in order to avoid excessive write amplification. On the right, task 2 will directly write a new file group and will not append to the larger file groups that have already been written.
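These size thresholds are configurable. A hedged sketch, assuming the standard Hudi sizing settings write.parquet.max.file.size (target file size, in MB) and hoodie.parquet.small.file.limit (small-file threshold, in bytes) can be passed through the table's WITH clause:

    CREATE TABLE hudi_sized_demo (
      uuid STRING,
      name STRING,
      PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path'      = 'oss://my-bucket/warehouse/hudi_sized_demo',  -- placeholder path
      'write.parquet.max.file.size'     = '120',        -- target file group size in MB
      'hoodie.parquet.small.file.limit' = '104857600'   -- buckets above ~100 MB are no longer treated as small files
    );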
Next, the state transitions of the Flink Hudi writing process. When the job starts, the coordinator first tries to create the table on the file system; if the table does not exist, it writes some meta information into the file directory, that is, it creates the table. After receiving the initialization meta information from all tasks, the coordinator starts a new transaction, and the write tasks unlock the flushing of data once they see that the transaction has been initiated.
A write task first buffers a batch of data. There are two flush strategies: one flushes the in-memory data when the data buffer reaches the specified size; the other flushes all in-memory data to disk when the upstream checkpoint barrier arrives and a snapshot must be taken. After each flush, the meta information is sent to the coordinator. When the coordinator receives the checkpoint success event, it commits the corresponding transaction and initiates the next one. After the write tasks see the new transaction, they unlock the writing of the next round. In this way, the entire writing process is chained together.
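Both flush triggers are tunable. A minimal sketch, assuming the Flink Hudi buffer options write.batch.size and write.task.max.size (both in MB); the values shown are illustrative, not the defaults:

    CREATE TABLE hudi_buffered_demo (
      uuid STRING,
      name STRING,
      PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path'      = 'oss://my-bucket/warehouse/hudi_buffered_demo',  -- placeholder path
      'write.batch.size'    = '128',   -- flush a bucket's buffer once it reaches this size (MB)
      'write.task.max.size' = '512'    -- total memory budget of one write task (MB)
    );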
Flink Hudi write supports very rich writing scenarios. It currently supports writing log data types, that is, non-update data types, with small file merging. In addition, Hudi's core writing scenarios, such as update streams and CDC data, are also supported. Flink Hudi also supports efficient batch import of historical data: the bulk insert mode can efficiently import offline data, such as data in Hive or in databases, into the Hudi format in a single batch query. Flink Hudi also provides index loading over full and incremental data: users can efficiently import batch data into the lake format at once, and then achieve full-plus-incremental ingestion by connecting a streaming write job.
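The batch import and index bootstrap described above map to the write.operation and index.bootstrap.enabled options; a sketch with placeholder table names and path:

    -- one-off batch import of historical data with bulk insert
    CREATE TABLE hudi_target (
      uuid STRING,
      name STRING,
      PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
      'connector'       = 'hudi',
      'path'            = 'oss://my-bucket/warehouse/hudi_target',  -- placeholder path
      'write.operation' = 'bulk_insert'
    );

    INSERT INTO hudi_target SELECT uuid, name FROM hive_source;  -- hive_source is a placeholder batch table

    -- the follow-up streaming job can then load the existing data into its index before consuming increments
    -- by setting 'index.bootstrap.enabled' = 'true' on the same table definition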
The Flink Hudi read side also supports a very rich set of query views. It currently mainly supports full reading, incremental reading over a historical time range, and streaming reading.
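A sketch of the three query views, using Hudi's Flink read options; the commit timestamps are placeholders:

    -- full (snapshot) read: the default when querying the table
    SELECT * FROM hudi_target;

    -- incremental read over a historical time range
    SELECT * FROM hudi_target /*+ OPTIONS(
      'read.start-commit' = '20220101000000',   -- placeholder instant time
      'read.end-commit'   = '20220102000000'
    ) */;

    -- streaming read that keeps polling for new commits
    SELECT * FROM hudi_target /*+ OPTIONS(
      'read.streaming.enabled'        = 'true',
      'read.start-commit'             = 'earliest',
      'read.streaming.check-interval' = '4'      -- seconds between polls
    ) */;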
The figure above is an example of writing to Hudi through Flink SQL. Hudi supports a wide range of use cases and keeps the parameters that users need to configure to a minimum. By simply configuring the table path, the write concurrency, and the operation type, users can easily write upstream data into the Hudi format.
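Along those lines, a minimal end-to-end write example in Flink SQL; the source table, path, and parallelism are placeholders:

    CREATE TABLE hudi_sink (
      uuid STRING,
      name STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
      'connector'       = 'hudi',
      'path'            = 'oss://my-bucket/warehouse/hudi_sink',  -- table path (placeholder)
      'write.tasks'     = '4',        -- write concurrency
      'write.operation' = 'upsert'    -- operation type
    );

    INSERT INTO hudi_sink SELECT uuid, name, ts FROM upstream_source;  -- upstream_source is a placeholder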
3. Flink Hudi Use Case
The following introduces the classic application scenarios of Flink Hudi.
The first classic scenario is importing database (DB) data into the data lake. There are currently two ways to do this: you can import full and incremental data into the Hudi format in one job through the CDC connector, or you can import the data into the Hudi format by consuming the CDC changelog on Kafka using Flink's CDC formats.
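A hedged sketch of the first path, pairing the flink-cdc-connectors MySQL source with a Hudi sink; hostname, credentials, and table names are placeholders:

    CREATE TABLE mysql_orders (
      id   BIGINT,
      name STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector'     = 'mysql-cdc',
      'hostname'      = 'localhost',    -- placeholder
      'port'          = '3306',
      'username'      = 'flink',        -- placeholder
      'password'      = '******',       -- placeholder
      'database-name' = 'demo',
      'table-name'    = 'orders'
    );

    -- full plus incremental data flows straight into the Hudi table defined earlier
    INSERT INTO hudi_sink SELECT CAST(id AS STRING) AS uuid, name, ts FROM mysql_orders;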
The second classic scenario is streaming ETL for near-real-time OLAP analysis. Simple ETL, such as a dual-stream join or a dual-stream join followed by an aggregation, is computed over the upstream streams, and the resulting change stream is written directly into the Hudi format. The downstream read side can then be connected to traditional OLAP engines such as Presto and Spark for end-to-end near-real-time queries.
The third classic scenario is somewhat similar to the second. Hudi supports a native changelog, that is, it can persist the row-level changes produced by Flink computations. Based on this capability, end-to-end near-real-time ETL pipelines can be built by streaming reads that consume these changes.
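A sketch of enabling the native changelog: with changelog.enabled set on the Hudi table, a downstream streaming read sees the intermediate row-level changes instead of only the merged result; table name and path are placeholders:

    CREATE TABLE hudi_changelog_demo (
      uuid STRING,
      cnt  BIGINT,
      PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
      'connector'         = 'hudi',
      'path'              = 'oss://my-bucket/warehouse/hudi_changelog_demo',  -- placeholder path
      'table.type'        = 'MERGE_ON_READ',
      'changelog.enabled' = 'true'   -- keep row-level changes instead of merging them away
    );

    -- downstream near-real-time ETL consumes the changes with a streaming read
    SELECT * FROM hudi_changelog_demo /*+ OPTIONS('read.streaming.enabled' = 'true') */;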
4. Apache Hudi Roadmap
In the future, the community's next two major versions will focus mainly on stream reading and stream writing and will strengthen the semantics of streaming reads. In addition, self-management will be improved around catalogs and metadata. A Trino native connector will also be released in the near future to replace the current way of reading through Hive and improve efficiency.
The following is a demo of ingesting thousands of tables from MySQL into a Hudi data lake.
First, we prepared two databases as the data source, benchmark1 and benchmark2. There are 100 tables under benchmark1 and 1,000 tables under benchmark2. Because ingesting thousands of tables into the lake relies heavily on catalogs, we first need to create catalogs: a MySQL catalog for the data source and a Hudi catalog for the target. The MySQL catalog is used to obtain the information of all source tables, including table schemas and table data; the Hudi catalog is used to create the target tables.
After executing the two SQL statements, the two catalogs are created successfully.
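The demo runs on Alibaba Cloud's Realtime Compute platform, so the exact catalog types and options are platform-specific; the following is only a hedged sketch with illustrative type names, hostnames, and credentials:

    -- source side: a MySQL catalog exposing the schemas and data of all source tables (options illustrative)
    CREATE CATALOG mysql_catalog WITH (
      'type'             = 'mysql',       -- assumed catalog type name
      'hostname'         = 'localhost',   -- placeholder
      'port'             = '3306',
      'username'         = 'flink',       -- placeholder
      'password'         = '******',      -- placeholder
      'default-database' = 'benchmark1'
    );

    -- target side: a Hudi catalog used to create the target tables (options illustrative)
    CREATE CATALOG hudi_catalog WITH (
      'type'         = 'hudi',                        -- assumed catalog type name
      'catalog.path' = 'oss://my-bucket/warehouse'    -- assumed option for the lake root path
    );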
Next, go to the job development page and create the job that ingests thousands of tables into the lake. Only 9 simple lines of SQL are needed. The first statement is CREATE DATABASE AS DATABASE. It synchronizes all table schemas and table data under the MySQL benchmark1 database into the Hudi cdas_demo database in one step; the tables are mapped one to one. The second statement is CREATE TABLE AS TABLE. It synchronizes all tables under the MySQL benchmark2 database that match the sbtest.* regular expression into the ctas_demo table under Hudi's DB1. This is a many-to-one mapping that merges the sharded database tables.
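The two statements use the platform's CDAS and CTAS syntax. The clause shapes below are a hedged sketch (they may differ from the platform's exact grammar), the object names come from the demo, and the regular expression is assumed to be sbtest.*:

    -- CDAS: one-to-one synchronization of every table in benchmark1 into the Hudi cdas_demo database
    CREATE DATABASE IF NOT EXISTS hudi_catalog.cdas_demo
    AS DATABASE mysql_catalog.benchmark1 INCLUDING ALL TABLES;

    -- CTAS: many-to-one merge of the sharded tables matching sbtest.* into a single Hudi table
    CREATE TABLE IF NOT EXISTS hudi_catalog.db1.ctas_demo
    AS TABLE mysql_catalog.benchmark2.`sbtest.*`;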
Then we deploy the job and go to the job operations page to start it. You can see that the configuration information has been updated, indicating that the new version is online. Click the Start button to start the job, and then go to the job overview page to view the job's status information.
The figure above shows the topology of the job, which is very complex: there are 1,100 source tables and 101 target tables. Here we made an optimization called source merging, which merges all the source tables into a single node, so the binlog only needs to be pulled once during the incremental phase, reducing the pressure on MySQL.
Next, refresh the OSS page. You can see a new cdas_demo path. Enter the sbtest1 path, and you can see that metadata is already being written, indicating that data is indeed being written.
Then go back to the job development page and write a simple SQL query against one of the tables to verify whether the data is really being written. Execute the SQL statement shown in the figure above, and you can see that data is returned and that it is consistent with the inserted data.
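The verification query itself is just an ordinary SELECT against one of the synchronized tables, for example (table name assumed from the demo):

    SELECT * FROM hudi_catalog.cdas_demo.sbtest1 LIMIT 10;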
By using the metadata capabilities provided by the catalogs, combined with the CDAS and CTAS syntax, we can ingest the data of thousands of tables into the lake with just a few lines of simple SQL, which greatly simplifies the data ingestion process and reduces the development and maintenance workload.