Flink Kylin Hudi Lake warehouse integrated big data ecosystem
1、 Integrated architecture of lake and warehouse
Data lake and Data Warehouse
Since talking about the integration of lakes and warehouses, let's first understand what a lake is and what a warehouse is. Data lake is a very old concept, which has been popular in recent years. Up to now, the industry does not have a unified definition of data lake. AWS is the first cloud service provider to launch data lake solutions on the cloud, Here we refer to AWS's definition of the data lake: "The data lake is a centralized repository that allows data of any structure to be stored and applied to big data processing, real-time analysis, machine learning and other related application scenarios." Similarly, we use AWS to define the data warehouse: "The data warehouse is a central repository of information." The information here can be analyzed and more informed decisions can be made.
This definition still has a detailed development. AWS illustrates the difference and connection between the data lake and the data warehouse by showing the relationship of data flow from the lake to the warehouse. First, the data originally exists in the data lake or database, and then after data screening and preparation, it will flow to the data warehouse for some high-value analysis. This comparison table intuitively compares data lake and warehouse from six dimensions: data, schema, cost performance, data quality, users and analysis.
A precedent for the integration of lakes and warehouses
This year we heard about the concept of "integrating lakes and warehouses" mentioned by Alibaba. I don't know if everyone has ever thought about whether there is a successful precedent for Hucang Integration in the industry? I personally think there is. In September 2020, a company named Snowflake went public on the New York Stock Exchange. Snowflake is a cloud data warehouse company that provides SaaS platforms based on the infrastructure provided by cloud vendors, providing data hosting and analysis services for small and medium-sized enterprises. Snowflake claims to be a cloud data warehouse company and published a paper at the 2016 Data Summit to introduce its elastic data warehouse architecture and some technical details.
Snowflake is actually an architecture based on object storage on the cloud, where one copy stores multiple copies of computation and separates computation from storage. In fact, this is the architecture of the data lake promoted by AWS and current mainstream cloud manufacturers. On the first day of Snowflake's listing, its market value soared to $70 billion. So personally, I think Snowflake can be considered as the most successful precedent for implementing the integration of lakes and warehouses. You can go and learn about the paper just mentioned. I have picked out these 5 points to share with everyone briefly:
Firstly, the first point is not to adopt the widely used Shared Nothing architecture in traditional data warehouses, but to shift towards the Shared Data architecture.
Secondly, the separation of storage and computation, which is emphasized in the paper, is the most valuable viewpoint in my opinion. He proposed the concept of unified storage followed by elastic computing.
Thirdly, data warehousing and services are the most successful aspects of their commercialization in my opinion. It provides a SaaS like experience for data warehouses and abandons the traditional bias that data warehouses are big and heavy.
Fourthly, high availability is a crucial aspect in improving user experience and fault tolerance.
Finally, the extension of structured to semi structured has already demonstrated their ability to explore universal data on the lake at that time.
Although this is a paper from 2016, the concepts in it are not outdated and still worth learning from. In the future, we will briefly introduce a few points that we have absorbed and will practice, and these points are also crucial for T3 travel to achieve the integration of lakes and warehouses.
The advantages of Shared - Nothing architecture
Firstly, as an architecture widely used by many traditional data warehouses, Shared Nothing still has some architectural advantages:
The first point is that the data on the Table can be horizontally partitioned across nodes, and each node has its own local storage. The computing resources of each node only focus on processing the data stored by each node itself.
So another advantage of it is its relatively simple processing mechanism, which is a typical architecture in the field of data warehousing.
Disadvantages of Shared - Nothing architecture
This architecture actually has some shortcomings:
The biggest point is that it couples computing and storage resources,
At the same time, it also brings a second problem, which is insufficient elasticity. This can be reflected in two aspects.
a. When expanding or shrinking a cluster, data needs to be heavily redistributed
b. There is no way to simply uninstall unused computing resources
The third issue is that coupled computing and storage resources also result in its limited availability. Due to these so-called stateful calculations, they can significantly affect performance in the event of failure or upgrade, leading to an overall unavailability of the service.
Finally, there is the issue of homogeneous resources and heterogeneous workloads. Because in the scenario of data warehousing, we have many heterogeneous workloads, such as batch loading, querying, large-scale calculation and analysis of reports, and so on. But the resources of the Shared Nothing architecture are isomorphic, so this leads to a collision between the two.
Shared - Data Architecture
Based on these issues, Snowflake proposed an architecture called Multi Cluster Shared Data. We have made a simple adjustment to the official image here.
The first advantage of this architecture is that it has no data silos and is a unified storage. This also enables decoupling of storage from calculations.
The second advantage is to accommodate structured and unstructured data based on current object storage.
Thirdly, its cluster size can act elastically.
Fourthly, the above features also bring the low-cost advantage of on-demand computing.
Next, we will review this architecture in a hierarchical form. Overall, its structure can be roughly divided into three levels.
The bottom layer is the object storage provided by cloud vendors, that is, user storage.
The middle layer is a multi-purpose and multi-purpose computing cluster.
The management service of the data lake, which stores a large SaaS platform, plays a role in the management of the entire underlying storage and computing cluster.
Continuous high availability of Shared - Data
The next point is the high availability of this architecture. This can be simply decomposed into two aspects. The first is fault tolerance, and the second is online upgrade.
Firstly, as a SaaS based application, its fault tolerance needs to be reflected in the overall architecture. Here we will also review in layers.
The lowest level storage layer utilizes the object storage capabilities of cloud vendors, which is a mechanism for cross center replication and near infinite expansion, so users basically do not need to care.
Up there is a diverse computing cluster. Each computing cluster is located within the same data center to ensure its network transmission performance. Here is a question mentioned, it is possible that a certain computing cluster may have node failures. If a node fails in a query, these computing nodes will return this status to the service layer above. After accepting this failure, the service layer will pass the calculation back to the available nodes for a second query. So, in the architecture where Shared Data storage and computation are separated, nodes are almost stateless in computation. The failure of one node in this architecture is not a very big problem.
Furthermore, the service layer also utilizes the ability of object storage for metadata storage. So this service layer can basically be seen as a stateless service.
The top layer is a load balancer that can perform service redundancy and load balancing.
The second point is that online upgrading mainly utilizes two designs, but this is not a very novel approach. One is to map multiple aspects between the computing layer and the service layer, and then switch the grayscale. Here, it can be seen that there are multiple versions in the computing layer, and local caches are shared between these versions. The metadata management of the service layer is also shared in multiple aspects. This is actually a sub Shared Data within the architecture, which has the ability to upgrade and smooth grayscale for data sharing between multiple versions.
Next, my colleague (Wang Xianghu) will introduce these three frameworks and how they integrate and ultimately support the practice of integrating T3 lakes and warehouses. Before introducing the second topic, he will first introduce our main framework, Hudi and Kylin frameworks, and then introduce how the three of them are integrated in pairs. Finally, we will introduce how T3 constructs a lake warehouse integrated system.
2、 Introduction and Integration of Flink/Hudi/Kylin
Hudi
First, let's understand what Hudi is. Hudi was originally a data lake framework designed and developed by Uber engineers to meet their data analysis needs. It joined the Apache Incubator in January 2019 and successfully graduated in May 2020, becoming a top-level project for Apache. Hudi's name comes from the abbreviation Hadoop Upserts Delete and Increments. In other words, Hudi is a data lake framework that supports insert, update, delete, and incremental processing. In addition, it also supports transactional ACID incremental processing, storage management, and time management. Hudi is able to manage hundreds of petabytes of large-scale analytical datasets on the cloud, making it very convenient for all cloud services to be used out of the box. Moreover, it has been running stably within Uber for nearly 4 years.
The following figure shows Hudi's plug-in architecture. We can see that Hudi has relatively loose support in storage, data processing engines, table types, index types, query views, and query engines. That is to say, it is not bound to a certain component.
In terms of storage, Hudi can support HDFS, OSS, and S3.
In terms of data processing engines, Hudi supports Flink and Spark. Java and Python clients are already supported in the community. Hudi supports two types of tables, COW and MOR, which correspond to two scenarios: low latency queries and fast ingestion.
In terms of indexing, Hudi supports four types of indexing, including Bloom and HBase. The underlying layer uses Parquet and Avro to store data, and the community is also providing support for ORC format and SQL. We believe we will meet in the near future.
Hudi supports three types of queries: read optimized queries, incremental queries, and snapshot queries. In terms of query engines, there are Spark, Presto, Hive, and Impala, which are actually supported by some other components.
Below is a detailed introduction to storage modes and views.
The first is the Copy On Write mode, which corresponds to Hudi's COW table. It is a table that focuses on low latency data query scenarios, and the underlying layer uses Parquet data files to store data. It can support two query methods: snapshot query and incremental query. In terms of query engines, you can see that there are 5 engines above, which have varying degrees of support for snapshot queries, incremental queries, and read optimization views.
The Merge On Read table complements Copy On Write at different levels, as it focuses on fast data ingestion scenarios. Use Parquet files to store specific data, and use row Avro incremental files to store operation logs, similar to HBase WAL. It supports all three views of Hudi, and it can be seen that Hive, Spark SQL, Spark Datasource, Presto, and Impala support read optimized queries. Hive and Spark SQL only support snapshot queries. You can check the information supported by this component on the official website in the future.
In travel business, orders have the attribute of long tail payment. That is to say, after an order starts, its payment process may be delayed for a long time. In other words, it may only make the payment before the user's next trip (or it may take longer, or even never make the payment). This long tail attribute will result in an overly long business closed-loop window, which will prevent us from accurately predicting the timing of data updates. If there are multi-level updates, the link will be longer and the update cost will be very high.
The following diagram shows the frequent updates of cold data caused by our long tail update. On the left is the business library, and on the right are three schematic tables with dependencies. When there is data update in the business library, the data that needs to be updated on the right may have been archived to a device with relatively poor performance, increasing the cost of data update. And if this data update triggers a long chain cascade update, this slow I/O will be further amplified.
The reliability of data is also an inevitable issue in data ETL. Perhaps due to machine malfunctions or computational logic, the processing data may be distorted or completely incorrect, which can have a significant impact on operational decisions. In terms of digital latency, in traditional architectures based on Hive components, due to the lack of indexing mechanism in Hive, data updates often result in data partition rewriting and cannot be deleted in place. Secondly, small file issues can increase the burden of NameNode storage and queries, slow down processes, and to some extent increase data latency.
Kylin framework
Let's introduce this Kylin framework again. Compared to Hudi, everyone should be relatively familiar with Kylin. It is an open source distributed analytical data warehouse that can provide data query windows on Hadoop/Park SQL. Initially opened by eBay and contributed to the open source community, it was able to query huge tables in sub seconds. The secret of it is actually to perform pre computation, budget multiple dimensional combinations of metrics for a star topology data cube, write the results out to the output table, and expose the query interface to achieve real-time queries, which means exchanging space for access time.
Kylin released version 4.0 alpha in September of this year, which is a major architecture upgrade after Kylin 3. Using Parquet instead of Hbase storage improves file scanning performance and reduces or even eliminates the maintenance burden of Hbase. Kylin4 reimagines the Spark build engine and query engine, separating computing and storage, and making it more suitable for cloud native technology trends.
Fusion between Flink/Hudi/Kylin frameworks
With the release of Kylin 3.1, the integration of Kylin and Flink has been completed. This feature was completed in 2019, and the integration of Kylin and Flink began in January last year, implemented through Flink Batch. Regarding Hudi fusion, it can be said that Kylin and Hudi are inherently compatible because Hudi can expose itself as a Hive table, and users can use Hudi's data like reading Hive, which will be very friendly to Kylin. Because Kylin can seamlessly use data using Hudi as a Hive table. The integration of Hudi and Flink is my main contribution to the community this year. These two screenshots correspond to the two milestone PR's on the integration of Hudi and Flink.
The first Hudi client supports multiple engines, decoupling Hudi from Spark, making it possible for Hudi to support multiple engines.
The second is to contribute the basic implementation of the Flink client to the community, allowing Hudi to truly write to the Flink data table. These two changes are very significant, and together they have exceeded 10000 lines of code, making them a prominent feature of the Hudi community this year.
The fusion process of Hudi and Flink
Below is a detailed introduction to the fusion process of Hudi and Flink. Hudi originally only supported Spark engines, so the first step was to decouple Hudi from Spark before integrating the engine we wanted.
The difficulty of decoupling is that Hudi did not initially consider the support of multiple engines, so RDD is ubiquitous from reading data from the data source to ultimately writing it out to the Hudi table. Even ordinary tool classes use RDD as the basic operating unit. Decoupling from Spark, we evaluated that his changes were very significant. Secondly, there are differences in core abstractions between Flink and Spark. Spark believes that data is a limited dataset, while Flink believes that data is unbounded and a data stream. This difference in abstraction makes it difficult for us to unify a universal abstraction.
This change is a bone breaking experience for Hudi, so we have decided to prioritize ensuring the functionality and performance of the original Hudi, while sacrificing some of the Flink Stream API. Let Flink operate the list, and use Spark to operate RDD. This way, a generic can be extracted to form a unified abstraction layer.
Abstract principle:
Unified use of generics I, K, O instead.
To de Spark, the abstract layer API is engine independent and difficult to implement in the abstract layer. We will change it to an abstract method and push it down to Spark subclass implementation.
Do not affect the original version, minimize changes to the abstract layer as much as possible to ensure fixed functionality.
Introducing HoodieEngineContext instead of JavaSparkContext to provide runtime context.
Next, let's talk about Flink Client DAG, which is mainly divided into 5 parts
The first part is the Kafka Streaming Source, which is mainly used to receive Kafka data and convert it into a List.
The second one is the InstantGeneratorOperator, a Flink operator used to generate globally unique instants.
The third is the KeyBy partition operation, which avoids conflicts caused by multiple subtasks writing data to the same partition based on the partitionPath partition.
The fourth one is the WriteProcessOperator, which is also a custom operator for us. This operator is where the write operation actually occurs.
The fifth one is CommitSink, which will accept the data sent by the upstream WriteProcessOperator and determine whether to commit the transaction based on the upstream data.
The following is a code example of Flink updates. On the left is a simplified version of HoodieWriteClient from the original version,
You can see that the input parameter of the insert function is RDD, and the return value is also RDD. After the abstraction on the right, you can see that its input parameter has changed to generic I and its return value has changed to O. If you are interested, you can go and learn more.
Here is another idea we have on how Flink can be integrated, which is to create a streaming source and use Flink to build a complete ETL pipeline that reads data from the Hudi table and then writes it out to the Hudi table.
Then is our preliminary idea. The gray image on the left contains 5 columns of Hudi metadata. On the far left is Hoodie_ commit_ Time transaction list. Every hoodie_ commit_ Time corresponds to a transaction, and each transaction corresponds to a batch of data. Each record in each batch of data will have a submitted serial number, which is the second column of hoodie_ commit_ Seqno serial number. hoodie_ commit_ Time and Hoodie_ commit_ The mapping relationship of seqno is very similar to the mapping relationship between partitions and offsets in Kafka. In the future, we may implement a Hoodie Streaming Source based on this feature.
Based on the fusion relationship between these three frameworks, we found that the three engines used for computation, analysis, and storage are mutually compatible. And they can support the integration of lakes and warehouses, moving closer to the cloud native system.
3、 Practice of integrating T3 travel structure with lake and warehouse
Finally, let's take a look at how T3 travel constructs a lake warehouse integration. This is the architecture of our T3 car networking system, which continuously empowers from the bottom up, from basic support to the upper level, and interacts with the information systems of car companies and national information platforms. As a transportation company driven by the Internet of Vehicles, we have collected data related to people, cars, roads, and other aspects. Each type of data has its own application scenario, and the data is not isolated and mutually empowering, jointly supporting T3 smart transportation.
This is our database architecture that separates storage and computing. The entire architecture is divided into two layers: the computing layer and the storage layer.
In the computing layer, we used Flink, Spark, Kylin, and Presto, along with ES for task scheduling. Da Vinci and Zeppelin were used in data analysis and presentation.
In the storage layer, we used Alibaba Cloud OSS and paired it with HDFS for data storage. In terms of data format, Hudi is used as the main storage format, combined with Parquet, ORC, and Json files. Before computing and storing, we added an Alluxio to accelerate and improve data processing performance. I used Yarn in resource management and will also switch to K8s when the timing is ripe in the later stages.
In the current trend of separating storage and computing, we have also built a complete big data ecosystem around lake storage, including lake accelerated lake computing, OLAP analysis, interactive queries, visualization, and more.
T3 Application Scenario for Hudi
Below are several application scenarios for Hudi within T3.
One is a near real-time streaming data pipeline. We can import the business data into the data pipeline from the left through Log, MySQL, or Kafka, and then use Flink or the original DeltaStreamer to input the streaming data into the list.
On the Flink UI interface for near real-time streaming data processing, it can be seen that several DAG operators introduced earlier are included, such as source and instant_ Generator, etc.
Another scenario is near real-time data analysis. We use Hive, Spark, or Presto to query data and ultimately use Leonardo da Vinci or Zeppelin to create the final data report.
This is the incremental data pipeline we built using Hudi. After capturing the leftmost CDC data, it needs to be updated to the following series of tables. After having Hudi, because Hudi supports indexing and incremental data processing, we only need to update the data that needs to be updated, and there is no need to update the entire partition or table as before.
The final scenario is to use Flink to subscribe online or business data to an ETL in a Hudi table for machine learning. However, machine learning requires a data foundation, so we use Hudi to incrementally publish online data to offline environments for model training or parameter tuning. Afterwards, we will publish the model online to provide services for our business.
Data lake and Data Warehouse
Since talking about the integration of lakes and warehouses, let's first understand what a lake is and what a warehouse is. Data lake is a very old concept, which has been popular in recent years. Up to now, the industry does not have a unified definition of data lake. AWS is the first cloud service provider to launch data lake solutions on the cloud, Here we refer to AWS's definition of the data lake: "The data lake is a centralized repository that allows data of any structure to be stored and applied to big data processing, real-time analysis, machine learning and other related application scenarios." Similarly, we use AWS to define the data warehouse: "The data warehouse is a central repository of information." The information here can be analyzed and more informed decisions can be made.
This definition still has a detailed development. AWS illustrates the difference and connection between the data lake and the data warehouse by showing the relationship of data flow from the lake to the warehouse. First, the data originally exists in the data lake or database, and then after data screening and preparation, it will flow to the data warehouse for some high-value analysis. This comparison table intuitively compares data lake and warehouse from six dimensions: data, schema, cost performance, data quality, users and analysis.
A precedent for the integration of lakes and warehouses
This year we heard about the concept of "integrating lakes and warehouses" mentioned by Alibaba. I don't know if everyone has ever thought about whether there is a successful precedent for Hucang Integration in the industry? I personally think there is. In September 2020, a company named Snowflake went public on the New York Stock Exchange. Snowflake is a cloud data warehouse company that provides SaaS platforms based on the infrastructure provided by cloud vendors, providing data hosting and analysis services for small and medium-sized enterprises. Snowflake claims to be a cloud data warehouse company and published a paper at the 2016 Data Summit to introduce its elastic data warehouse architecture and some technical details.
Snowflake is actually an architecture based on object storage on the cloud, where one copy stores multiple copies of computation and separates computation from storage. In fact, this is the architecture of the data lake promoted by AWS and current mainstream cloud manufacturers. On the first day of Snowflake's listing, its market value soared to $70 billion. So personally, I think Snowflake can be considered as the most successful precedent for implementing the integration of lakes and warehouses. You can go and learn about the paper just mentioned. I have picked out these 5 points to share with everyone briefly:
Firstly, the first point is not to adopt the widely used Shared Nothing architecture in traditional data warehouses, but to shift towards the Shared Data architecture.
Secondly, the separation of storage and computation, which is emphasized in the paper, is the most valuable viewpoint in my opinion. He proposed the concept of unified storage followed by elastic computing.
Thirdly, data warehousing and services are the most successful aspects of their commercialization in my opinion. It provides a SaaS like experience for data warehouses and abandons the traditional bias that data warehouses are big and heavy.
Fourthly, high availability is a crucial aspect in improving user experience and fault tolerance.
Finally, the extension of structured to semi structured has already demonstrated their ability to explore universal data on the lake at that time.
Although this is a paper from 2016, the concepts in it are not outdated and still worth learning from. In the future, we will briefly introduce a few points that we have absorbed and will practice, and these points are also crucial for T3 travel to achieve the integration of lakes and warehouses.
The advantages of Shared - Nothing architecture
Firstly, as an architecture widely used by many traditional data warehouses, Shared Nothing still has some architectural advantages:
The first point is that the data on the Table can be horizontally partitioned across nodes, and each node has its own local storage. The computing resources of each node only focus on processing the data stored by each node itself.
So another advantage of it is its relatively simple processing mechanism, which is a typical architecture in the field of data warehousing.
Disadvantages of Shared - Nothing architecture
This architecture actually has some shortcomings:
The biggest point is that it couples computing and storage resources,
At the same time, it also brings a second problem, which is insufficient elasticity. This can be reflected in two aspects.
a. When expanding or shrinking a cluster, data needs to be heavily redistributed
b. There is no way to simply uninstall unused computing resources
The third issue is that coupled computing and storage resources also result in its limited availability. Due to these so-called stateful calculations, they can significantly affect performance in the event of failure or upgrade, leading to an overall unavailability of the service.
Finally, there is the issue of homogeneous resources and heterogeneous workloads. Because in the scenario of data warehousing, we have many heterogeneous workloads, such as batch loading, querying, large-scale calculation and analysis of reports, and so on. But the resources of the Shared Nothing architecture are isomorphic, so this leads to a collision between the two.
Shared - Data Architecture
Based on these issues, Snowflake proposed an architecture called Multi Cluster Shared Data. We have made a simple adjustment to the official image here.
The first advantage of this architecture is that it has no data silos and is a unified storage. This also enables decoupling of storage from calculations.
The second advantage is to accommodate structured and unstructured data based on current object storage.
Thirdly, its cluster size can act elastically.
Fourthly, the above features also bring the low-cost advantage of on-demand computing.
Next, we will review this architecture in a hierarchical form. Overall, its structure can be roughly divided into three levels.
The bottom layer is the object storage provided by cloud vendors, that is, user storage.
The middle layer is a multi-purpose and multi-purpose computing cluster.
The management service of the data lake, which stores a large SaaS platform, plays a role in the management of the entire underlying storage and computing cluster.
Continuous high availability of Shared - Data
The next point is the high availability of this architecture. This can be simply decomposed into two aspects. The first is fault tolerance, and the second is online upgrade.
Firstly, as a SaaS based application, its fault tolerance needs to be reflected in the overall architecture. Here we will also review in layers.
The lowest level storage layer utilizes the object storage capabilities of cloud vendors, which is a mechanism for cross center replication and near infinite expansion, so users basically do not need to care.
Up there is a diverse computing cluster. Each computing cluster is located within the same data center to ensure its network transmission performance. Here is a question mentioned, it is possible that a certain computing cluster may have node failures. If a node fails in a query, these computing nodes will return this status to the service layer above. After accepting this failure, the service layer will pass the calculation back to the available nodes for a second query. So, in the architecture where Shared Data storage and computation are separated, nodes are almost stateless in computation. The failure of one node in this architecture is not a very big problem.
Furthermore, the service layer also utilizes the ability of object storage for metadata storage. So this service layer can basically be seen as a stateless service.
The top layer is a load balancer that can perform service redundancy and load balancing.
The second point is that online upgrading mainly utilizes two designs, but this is not a very novel approach. One is to map multiple aspects between the computing layer and the service layer, and then switch the grayscale. Here, it can be seen that there are multiple versions in the computing layer, and local caches are shared between these versions. The metadata management of the service layer is also shared in multiple aspects. This is actually a sub Shared Data within the architecture, which has the ability to upgrade and smooth grayscale for data sharing between multiple versions.
Next, my colleague (Wang Xianghu) will introduce these three frameworks and how they integrate and ultimately support the practice of integrating T3 lakes and warehouses. Before introducing the second topic, he will first introduce our main framework, Hudi and Kylin frameworks, and then introduce how the three of them are integrated in pairs. Finally, we will introduce how T3 constructs a lake warehouse integrated system.
2、 Introduction and Integration of Flink/Hudi/Kylin
Hudi
First, let's understand what Hudi is. Hudi was originally a data lake framework designed and developed by Uber engineers to meet their data analysis needs. It joined the Apache Incubator in January 2019 and successfully graduated in May 2020, becoming a top-level project for Apache. Hudi's name comes from the abbreviation Hadoop Upserts Delete and Increments. In other words, Hudi is a data lake framework that supports insert, update, delete, and incremental processing. In addition, it also supports transactional ACID incremental processing, storage management, and time management. Hudi is able to manage hundreds of petabytes of large-scale analytical datasets on the cloud, making it very convenient for all cloud services to be used out of the box. Moreover, it has been running stably within Uber for nearly 4 years.
The following figure shows Hudi's plug-in architecture. We can see that Hudi has relatively loose support in storage, data processing engines, table types, index types, query views, and query engines. That is to say, it is not bound to a certain component.
In terms of storage, Hudi can support HDFS, OSS, and S3.
In terms of data processing engines, Hudi supports Flink and Spark. Java and Python clients are already supported in the community. Hudi supports two types of tables, COW and MOR, which correspond to two scenarios: low latency queries and fast ingestion.
In terms of indexing, Hudi supports four types of indexing, including Bloom and HBase. The underlying layer uses Parquet and Avro to store data, and the community is also providing support for ORC format and SQL. We believe we will meet in the near future.
Hudi supports three types of queries: read optimized queries, incremental queries, and snapshot queries. In terms of query engines, there are Spark, Presto, Hive, and Impala, which are actually supported by some other components.
Below is a detailed introduction to storage modes and views.
The first is the Copy On Write mode, which corresponds to Hudi's COW table. It is a table that focuses on low latency data query scenarios, and the underlying layer uses Parquet data files to store data. It can support two query methods: snapshot query and incremental query. In terms of query engines, you can see that there are 5 engines above, which have varying degrees of support for snapshot queries, incremental queries, and read optimization views.
The Merge On Read table complements Copy On Write at different levels, as it focuses on fast data ingestion scenarios. Use Parquet files to store specific data, and use row Avro incremental files to store operation logs, similar to HBase WAL. It supports all three views of Hudi, and it can be seen that Hive, Spark SQL, Spark Datasource, Presto, and Impala support read optimized queries. Hive and Spark SQL only support snapshot queries. You can check the information supported by this component on the official website in the future.
In travel business, orders have the attribute of long tail payment. That is to say, after an order starts, its payment process may be delayed for a long time. In other words, it may only make the payment before the user's next trip (or it may take longer, or even never make the payment). This long tail attribute will result in an overly long business closed-loop window, which will prevent us from accurately predicting the timing of data updates. If there are multi-level updates, the link will be longer and the update cost will be very high.
The following diagram shows the frequent updates of cold data caused by our long tail update. On the left is the business library, and on the right are three schematic tables with dependencies. When there is data update in the business library, the data that needs to be updated on the right may have been archived to a device with relatively poor performance, increasing the cost of data update. And if this data update triggers a long chain cascade update, this slow I/O will be further amplified.
The reliability of data is also an inevitable issue in data ETL. Perhaps due to machine malfunctions or computational logic, the processing data may be distorted or completely incorrect, which can have a significant impact on operational decisions. In terms of digital latency, in traditional architectures based on Hive components, due to the lack of indexing mechanism in Hive, data updates often result in data partition rewriting and cannot be deleted in place. Secondly, small file issues can increase the burden of NameNode storage and queries, slow down processes, and to some extent increase data latency.
Kylin framework
Let's introduce this Kylin framework again. Compared to Hudi, everyone should be relatively familiar with Kylin. It is an open source distributed analytical data warehouse that can provide data query windows on Hadoop/Park SQL. Initially opened by eBay and contributed to the open source community, it was able to query huge tables in sub seconds. The secret of it is actually to perform pre computation, budget multiple dimensional combinations of metrics for a star topology data cube, write the results out to the output table, and expose the query interface to achieve real-time queries, which means exchanging space for access time.
Kylin released version 4.0 alpha in September of this year, which is a major architecture upgrade after Kylin 3. Using Parquet instead of Hbase storage improves file scanning performance and reduces or even eliminates the maintenance burden of Hbase. Kylin4 reimagines the Spark build engine and query engine, separating computing and storage, and making it more suitable for cloud native technology trends.
Fusion between Flink/Hudi/Kylin frameworks
With the release of Kylin 3.1, the integration of Kylin and Flink has been completed. This feature was completed in 2019, and the integration of Kylin and Flink began in January last year, implemented through Flink Batch. Regarding Hudi fusion, it can be said that Kylin and Hudi are inherently compatible because Hudi can expose itself as a Hive table, and users can use Hudi's data like reading Hive, which will be very friendly to Kylin. Because Kylin can seamlessly use data using Hudi as a Hive table. The integration of Hudi and Flink is my main contribution to the community this year. These two screenshots correspond to the two milestone PR's on the integration of Hudi and Flink.
The first Hudi client supports multiple engines, decoupling Hudi from Spark, making it possible for Hudi to support multiple engines.
The second is to contribute the basic implementation of the Flink client to the community, allowing Hudi to truly write to the Flink data table. These two changes are very significant, and together they have exceeded 10000 lines of code, making them a prominent feature of the Hudi community this year.
The fusion process of Hudi and Flink
Below is a detailed introduction to the fusion process of Hudi and Flink. Hudi originally only supported Spark engines, so the first step was to decouple Hudi from Spark before integrating the engine we wanted.
The difficulty of decoupling is that Hudi did not initially consider the support of multiple engines, so RDD is ubiquitous from reading data from the data source to ultimately writing it out to the Hudi table. Even ordinary tool classes use RDD as the basic operating unit. Decoupling from Spark, we evaluated that his changes were very significant. Secondly, there are differences in core abstractions between Flink and Spark. Spark believes that data is a limited dataset, while Flink believes that data is unbounded and a data stream. This difference in abstraction makes it difficult for us to unify a universal abstraction.
This change is a bone breaking experience for Hudi, so we have decided to prioritize ensuring the functionality and performance of the original Hudi, while sacrificing some of the Flink Stream API. Let Flink operate the list, and use Spark to operate RDD. This way, a generic can be extracted to form a unified abstraction layer.
Abstract principle:
Unified use of generics I, K, O instead.
To de Spark, the abstract layer API is engine independent and difficult to implement in the abstract layer. We will change it to an abstract method and push it down to Spark subclass implementation.
Do not affect the original version, minimize changes to the abstract layer as much as possible to ensure fixed functionality.
Introducing HoodieEngineContext instead of JavaSparkContext to provide runtime context.
Next, let's talk about Flink Client DAG, which is mainly divided into 5 parts
The first part is the Kafka Streaming Source, which is mainly used to receive Kafka data and convert it into a List.
The second one is the InstantGeneratorOperator, a Flink operator used to generate globally unique instants.
The third is the KeyBy partition operation, which avoids conflicts caused by multiple subtasks writing data to the same partition based on the partitionPath partition.
The fourth one is the WriteProcessOperator, which is also a custom operator for us. This operator is where the write operation actually occurs.
The fifth one is CommitSink, which will accept the data sent by the upstream WriteProcessOperator and determine whether to commit the transaction based on the upstream data.
The following is a code example of Flink updates. On the left is a simplified version of HoodieWriteClient from the original version,
You can see that the input parameter of the insert function is RDD, and the return value is also RDD. After the abstraction on the right, you can see that its input parameter has changed to generic I and its return value has changed to O. If you are interested, you can go and learn more.
Here is another idea we have on how Flink can be integrated, which is to create a streaming source and use Flink to build a complete ETL pipeline that reads data from the Hudi table and then writes it out to the Hudi table.
Then is our preliminary idea. The gray image on the left contains 5 columns of Hudi metadata. On the far left is Hoodie_ commit_ Time transaction list. Every hoodie_ commit_ Time corresponds to a transaction, and each transaction corresponds to a batch of data. Each record in each batch of data will have a submitted serial number, which is the second column of hoodie_ commit_ Seqno serial number. hoodie_ commit_ Time and Hoodie_ commit_ The mapping relationship of seqno is very similar to the mapping relationship between partitions and offsets in Kafka. In the future, we may implement a Hoodie Streaming Source based on this feature.
Based on the fusion relationship between these three frameworks, we found that the three engines used for computation, analysis, and storage are mutually compatible. And they can support the integration of lakes and warehouses, moving closer to the cloud native system.
3、 Practice of integrating T3 travel structure with lake and warehouse
Finally, let's take a look at how T3 travel constructs a lake warehouse integration. This is the architecture of our T3 car networking system, which continuously empowers from the bottom up, from basic support to the upper level, and interacts with the information systems of car companies and national information platforms. As a transportation company driven by the Internet of Vehicles, we have collected data related to people, cars, roads, and other aspects. Each type of data has its own application scenario, and the data is not isolated and mutually empowering, jointly supporting T3 smart transportation.
This is our database architecture that separates storage and computing. The entire architecture is divided into two layers: the computing layer and the storage layer.
In the computing layer, we used Flink, Spark, Kylin, and Presto, along with ES for task scheduling. Da Vinci and Zeppelin were used in data analysis and presentation.
In the storage layer, we used Alibaba Cloud OSS and paired it with HDFS for data storage. In terms of data format, Hudi is used as the main storage format, combined with Parquet, ORC, and Json files. Before computing and storing, we added an Alluxio to accelerate and improve data processing performance. I used Yarn in resource management and will also switch to K8s when the timing is ripe in the later stages.
In the current trend of separating storage and computing, we have also built a complete big data ecosystem around lake storage, including lake accelerated lake computing, OLAP analysis, interactive queries, visualization, and more.
T3 Application Scenario for Hudi
Below are several application scenarios for Hudi within T3.
One is a near real-time streaming data pipeline. We can import the business data into the data pipeline from the left through Log, MySQL, or Kafka, and then use Flink or the original DeltaStreamer to input the streaming data into the list.
On the Flink UI interface for near real-time streaming data processing, it can be seen that several DAG operators introduced earlier are included, such as source and instant_ Generator, etc.
Another scenario is near real-time data analysis. We use Hive, Spark, or Presto to query data and ultimately use Leonardo da Vinci or Zeppelin to create the final data report.
This is the incremental data pipeline we built using Hudi. After capturing the leftmost CDC data, it needs to be updated to the following series of tables. After having Hudi, because Hudi supports indexing and incremental data processing, we only need to update the data that needs to be updated, and there is no need to update the entire partition or table as before.
The final scenario is to use Flink to subscribe online or business data to an ETL in a Hudi table for machine learning. However, machine learning requires a data foundation, so we use Hudi to incrementally publish online data to offline environments for model training or parameter tuning. Afterwards, we will publish the model online to provide services for our business.
Related Articles
-
A detailed explanation of Hadoop core architecture HDFS
Knowledge Base Team
-
What Does IOT Mean
Knowledge Base Team
-
6 Optional Technologies for Data Storage
Knowledge Base Team
-
What Is Blockchain Technology
Knowledge Base Team
Explore More Special Offers
-
Short Message Service(SMS) & Mail Service
50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00