Building a Data Warehouse System Based on Delta Lake
The concepts of data lake, data warehouse, and lakehouse have been introduced in many articles and talks, and most readers already know something about them, so we will not repeat them here. Let's look directly at the key features of the Lakehouse architecture proposed by Databricks.
• ACID transactions. A table can be read and written by multiple workflows; transactions ensure data correctness.
• Schema enforcement and data management. Schema enforcement, also called schema validation, checks at write time whether the schema of the incoming data can be accepted by the table, ensuring data quality. Tables are also subject to management and maintenance operations.
• BI support. Data stored in the lakehouse can be connected directly to BI systems for analysis.
• Support for structured, semi-structured, and unstructured data. The lakehouse provides unified, centralized storage that can hold all types of data.
• Openness. Open, open-source storage formats such as Parquet and ORC serve as the underlying storage formats.
• Support for multiple APIs. In addition to SQL, DataFrame and machine learning APIs are supported for scenarios SQL cannot handle.
• Unified batch and streaming. Streaming and offline ETL pipelines are simplified, reducing the existing management and operations costs.
• Separation of storage and compute. Every company and team cares about cost; separating storage from compute and scaling on demand enable better cost control.
From the above, we can see that most lakehouse features are carried and supported by the lake format layer, which is the main background and reason for the rise of Delta Lake, Iceberg, and Hudi.
Let's take a look at the functional iteration and development history of Delta Lake.
As shown in the figure, the upper part shows the development of the community in recent years, and the lower part shows some of EMR's progress on Delta Lake. A few key points: version 0.2, released by Databricks in June 2019, was the first release version. In 2020, versions 0.6 and 0.7 were the last release for Spark 2 and the first for Spark 3, respectively. Since version 0.7, Delta Lake has supported DML SQL syntax. This year (2022), versions 1.2 and 2.0 shipped some significant new features.
Alibaba Cloud EMR has followed Delta Lake closely since early on. We have implemented key features since 2019, including coverage of common SQL, Z-Order, and data skipping capabilities. We have also gradually solved the problem of Metastore synchronization, achieving seamless integration with other products. Time Travel is another feature we supported early, on Spark 2, while the community only supports Time Travel since Spark 3.3. EMR additionally provides automatic vacuum and automatic compaction capabilities. For data warehouse scenarios, EMR proposed the G-SCD scheme: using Delta Lake's Time Travel capability, it implements the SCD (Slowly Changing Dimension) Type 2 scenario while retaining the original table structure. EMR also supports Delta Lake CDC, enabling a Delta table to be used as a CDC source for an incremental data warehouse.
Let's introduce some key features of Delta Lake 2.0.
• Change Data Feed
• Z-Order clustering
• Idempotent Writes
• Drop Column
• Dynamic Partition Overwrite
• Multi-Part Checkpoint
Change Data Feed and Z-Order will be highlighted later.
This article focuses on Drop Column, which can be combined with Rename Column, a feature released in version 1.2. This kind of schema evolution depends on the Column Mapping capability. Consider it by comparison with Add Column and Change Column. After data is written to a Delta table, Delta saves the schema information, and the Parquet layer retains the same schema information; the two are identical, and both identify and store fields by name. Under such an implementation, a simple drop column can still work, but what if the drop is followed by an add column of the same name? This requires a mapping between the Delta schema and the Parquet schema, mapping each field to a globally unique identifier, with Parquet storing those unique identifiers. With this implementation, renaming a column only requires changing the mapping configuration.
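The mapping idea above can be sketched in a few lines of pure Python. This is an illustration of the concept only, not Delta Lake's actual implementation; the class and method names are invented for this example.

```python
# Illustrative sketch (not Delta Lake's actual code) of how column mapping
# decouples logical column names from stable physical field ids.

class ColumnMapping:
    def __init__(self):
        self.next_id = 0
        self.name_to_id = {}   # logical Delta schema name -> physical Parquet field id

    def add_column(self, name):
        # Every new column gets a fresh globally unique id, even if a
        # previously dropped column once used the same name.
        self.next_id += 1
        self.name_to_id[name] = self.next_id

    def drop_column(self, name):
        # Dropping only removes the logical mapping; old Parquet files still
        # carry the old field id, which simply no longer resolves.
        del self.name_to_id[name]

    def rename_column(self, old, new):
        # Rename is a pure metadata change: same physical id, new name.
        self.name_to_id[new] = self.name_to_id.pop(old)

m = ColumnMapping()
m.add_column("a")          # physical id 1
m.drop_column("a")
m.add_column("a")          # same name again, but new physical id 2
m.rename_column("a", "b")  # still id 2
print(m.name_to_id)        # {'b': 2}
```

Because the physical id of the re-added column differs from the dropped one, old data files are never mistaken for data of the new column.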
Dynamic Partition Overwrite is simply syntax the community had never supported before and needs little explanation. Multi-Part Checkpoint is a feature that improves metadata loading efficiency; we will discuss what a checkpoint is later.
Analysis and key technologies of Delta Lake kernel
1. Delta Lake file layout
The metadata of Delta Lake is managed by the format itself and does not depend on external metadata storage such as Hive Metastore. The annotations in the figure fall into two parts: green text and orange text. The orange text in the lower part identifies ordinary data files and directories, which are no different from those of ordinary tables and are likewise organized in a partition structure. The difference lies in the metadata part above, which contains three types of files. The first type is the json file, which records the information generated by each commit; every commit produces a new json file. The second type is the checkpoint parquet file, which is merged from the previous checkpoint file and the subsequent json files and is used to accelerate metadata parsing. The third type is the _last_checkpoint file, which stores the version number of the last checkpoint so that the checkpoint file to be read can be located quickly. The first two types of files are the core of the metadata.
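As a concrete sketch of the layout described above: Delta Lake names files under `_delta_log/` with zero-padded 20-digit version numbers, so once `_last_checkpoint` tells us the newest checkpoint version, the files a reader needs are easy to enumerate. The helper below is illustrative, not part of any Delta API.

```python
# Sketch: given the checkpoint version recorded in _last_checkpoint and the
# latest commit version, list the metadata files needed to build a snapshot.

def files_to_read(last_checkpoint_version, latest_version):
    """One checkpoint parquet plus every commit json written after it."""
    files = [f"{last_checkpoint_version:020d}.checkpoint.parquet"]
    for v in range(last_checkpoint_version + 1, latest_version + 1):
        files.append(f"{v:020d}.json")
    return files

# _last_checkpoint points at version 10; commits 11 and 12 exist since then,
# so three files cover the whole table state.
print(files_to_read(10, 12))
```

This is why checkpoints accelerate metadata parsing: without the checkpoint at version 10, a reader would have to replay all thirteen commit json files from version 0.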
Next, let's take a closer look at the composition of Delta Lake metadata.
2. Delta Lake metadata -- elements
First, some basic concepts. A table usually consists of two parts: data and metadata. Metadata is typically stored in Hive Metastore, and data on the file system. A Delta Lake table is the same; its difference from an ordinary table is that its metadata is managed by Delta Lake itself and stored together with the data in the table's own directory on the file system. In addition, not all data files stored under the table path are valid; the metadata must mark which data files are valid and which are not. All metadata operations of Delta Lake are abstracted into corresponding Action operations; that is, all table metadata is implemented by subclasses of Action. The current actions are:
• Metadata: save the schema, partition column, table configuration and other information of the table.
• AddFile: a valid data file newly added in a commit.
• RemoveFile: a file marked as invalid (removed) in a commit.
• AddCDCFile: a CDC file newly added in a commit.
• Protocol: Delta read-write protocol, which is used to manage the compatibility of different versions of Delta.
• CommitInfo: Records the statistics of commit operations and does some simple auditing work.
• SetTransaction: stores Streaming Sink information.
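A minimal sketch of this Action hierarchy, with field sets abridged and names modeled on those in the list above (the exact fields here are illustrative, not Delta Lake's full definitions):

```python
# Sketch of the Action hierarchy: every piece of table metadata is an Action
# subclass, and one commit is a list of actions serialized to one json file.
from dataclasses import dataclass, field

@dataclass
class Action: ...

@dataclass
class Metadata(Action):
    schema: str
    partition_columns: list = field(default_factory=list)

@dataclass
class AddFile(Action):
    path: str
    stats: str = ""      # per-file min/max stats, used later by data skipping

@dataclass
class RemoveFile(Action):
    path: str

@dataclass
class CommitInfo(Action):
    operation: str       # e.g. "WRITE", "MERGE"; kept for auditing

commit = [CommitInfo("WRITE"), AddFile("part-0000.parquet")]
print([type(a).__name__ for a in commit])   # ['CommitInfo', 'AddFile']
```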
3. DDL/DML Organization
Knowing the Action elements, we next need to know which set of Actions corresponds to each operation. Let's illustrate with the examples in the figure. First, every operation on the table generates a CommitInfo, which is used more for auditing than for anything functional.
Next, let's look at the specific operation:
• Create Table. Since this only defines a table, a Metadata action alone saves the table's metadata.
• CTAS (Create Table As Select). Data is loaded while the table is created, so there are both Metadata and AddFile actions.
• Alter Table. Except for Drop Partition, Alter Table operations only modify metadata, so only the Metadata action is needed.
• Insert/Update/Delete/Merge. DML that does not involve schema evolution does not modify the table metadata, so there is no Metadata action. Taking Update as an example, Delta Lake first reads the files that may be touched by the where condition of the Update statement, loads the data, applies the set clause to the rows that need modification, and rewrites them to a new file together with the unmodified rows of the original file. This means the files that were read are marked as old files, i.e. RemoveFile, while AddFile identifies the newly written files.
4. Metadata loading
Next, let's see how to build a table snapshot based on the Action element.
First, try to find the _last_checkpoint file. If it does not exist, read the json metadata files starting from the 0th commit json file; if it does exist, obtain the version number of the last checkpoint recorded in it, find the checkpoint file corresponding to that version and the commit json files of later versions, and parse the metadata files in version order. The metadata of the latest snapshot is obtained through the six rules in the figure. In the end we get the latest Protocol, Metadata, and a set of valid AddFiles. With these three, we know both the metadata and the data files of the table, forming a relatively complete snapshot.
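The replay can be sketched as a simple fold over the commits: later Protocol/Metadata actions win, and a RemoveFile cancels an earlier AddFile. This is a simplification of the reconciliation rules in the figure, for illustration only.

```python
# Sketch of snapshot reconstruction: replay actions in version order to get
# the latest Protocol, the latest Metadata, and the set of valid AddFiles.

def build_snapshot(commits):
    protocol, metadata, valid_files = None, None, set()
    for actions in commits:                 # commits sorted by version
        for kind, body in actions:
            if kind == "protocol":
                protocol = body
            elif kind == "metaData":
                metadata = body
            elif kind == "add":
                valid_files.add(body)
            elif kind == "remove":
                valid_files.discard(body)   # an add cancelled by a later remove
    return protocol, metadata, sorted(valid_files)

commits = [
    [("protocol", "v1"), ("metaData", "schema-v1"), ("add", "f1")],
    [("add", "f2")],
    [("remove", "f1"), ("add", "f3")],      # e.g. an Update rewrote f1 into f3
]
print(build_snapshot(commits))   # ('v1', 'schema-v1', ['f2', 'f3'])
```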
5. Delta Lake transaction
ACID transactions are an important feature of the lakehouse. For Delta Lake, an ACID transaction is guaranteed by successfully committing the json file to the file system, which marks the commit as successful. That is, for multiple concurrent write streams, the stream that first commits the json file of a given version to the file system is the one whose commit succeeds. If you know something about storage, you can see that the core of Delta Lake transactions depends on the atomicity and durability of the file system where the data resides. Specifically:
• Once a file is written, it must be completely visible or invisible, and there will be no incomplete data file being read and written.
• Only one writer can create or rename a file at the same time.
• Once a file is written, subsequent List operations must be visible.
For concurrency control, Delta Lake uses optimistic concurrency control (OCC); the details of the protocol are not expanded here.
As for conflict detection: Delta Lake supports multiple streams writing simultaneously, which makes conflicts possible. For example, suppose a user has read the snapshot at version 10 and wants to commit the changed data as version 11, but finds that another user has already committed version 11. It is then necessary to detect whether version 11 conflicts with the data the user wants to commit. Conflicts are detected by determining whether the two commits operate on the same set of files. If not, the user retries the commit as version 12; if version 12 is also taken in the meantime, detection continues. If there is a conflict, an error is reported directly and the current write fails, rather than forcing a write that would produce dirty data.
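The retry loop just described can be sketched as follows. This is a toy model of the optimistic protocol (the log is a dict of version → file set; real conflict detection is richer than file-set overlap), not Delta Lake's implementation.

```python
# Sketch of optimistic commit with conflict detection: a writer that read
# version `read_version` tries to commit the next version; on losing the
# race it checks file-set overlap with the winning commit, then either
# retries as a later version or aborts.

def try_commit(log, read_version, files_read, files_written, max_retries=10):
    target = read_version + 1
    for _ in range(max_retries):
        if target not in log:                       # we won the race
            log[target] = files_written
            return target
        winner = log[target]                        # someone committed first
        if winner & files_read:                     # both touched the same files
            raise RuntimeError("conflict: concurrent write to same files")
        target += 1                                 # disjoint files: retry as next version
    raise RuntimeError("too many retries")

log = {11: {"other.parquet"}}                       # a concurrent commit already took v11
v = try_commit(log, 10, files_read={"f1.parquet"}, files_written={"f1b.parquet"})
print(v)   # 12 -- the file sets were disjoint, so the write retried as version 12
```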
Z-Order is a technology attracting a lot of attention at present. It is an older concept: a space-filling curve that is continuous and non-crossing and keeps points that are close in space clustered together. Its core capability is a mapping from multiple dimensions to a single dimension.
Let's illustrate with an example. As shown in the figure, there are two columns, X and Y ∈ [0, 7], and the figure shows the Z-Order sort, which divides the data into 16 files. With traditional linear sorting, sorting first by X and then by Y, elements with close X values end up in the same file: for example, X = 0 and Y ∈ [0, 3], four vertically adjacent elements, go into one file, again producing 16 files. Now, to query 4 <= Y <= 5, we must scan all 8 files in the lower half (each file holds four vertical elements with Y ∈ [4, 7]). With Z-Order sorting, only four files need to be scanned.
Take another example: querying 2 <= X <= 3 and 4 <= Y <= 5. With Z-Order sorting, only one file needs to be scanned; with traditional linear sorting, two files (X ∈ [2, 3], Y ∈ [4, 7]). The amount of data to scan is halved with Z-Order; that is, with the same compute resources, query time can be cut in half, doubling performance. From these examples we can see that linear sorting cares only about the clustering of the sort fields themselves, not about clustering in the multi-dimensional space.
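The multi-dimension-to-one-dimension mapping at the heart of Z-Order is classically computed by interleaving the bits of the coordinates. A minimal sketch for two 3-bit columns (matching the X, Y ∈ [0, 7] example above):

```python
# Sketch: the Z-value (Morton code) of a point interleaves the bits of its
# coordinates; sorting by Z-value keeps spatially close points together.

def z_value(x, y, bits=3):
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i + 1)   # x contributes the odd bit positions
        z |= ((y >> i) & 1) << (2 * i)       # y contributes the even bit positions
    return z

# Neighbours in (x, y) space stay adjacent in z-order:
points = [(0, 0), (1, 0), (0, 1), (1, 1), (2, 0)]
print(sorted(points, key=lambda p: z_value(*p)))
# [(0, 0), (0, 1), (1, 0), (1, 1), (2, 0)] -- the 2x2 block sorts contiguously
```

Sorting rows by this key before writing is what produces the file layout shown in the figure.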
Z-Order only gives us a file layout; it must be combined with data skipping to take effect. The two are functionally independent, with no coupling, but they must support each other. Without a Z-Order layout with good clustering, data skipping alone cannot achieve good file filtering; with only Z-Order and no data skipping, the file layout alone does nothing to accelerate reads. The concrete flow is: at write time, complete the Z-Order arrangement of the data, write it to the file system, extract the min/max values of the relevant fields at file granularity, and write them into the stats of the AddFile metadata as shown in the figure. At query time, use the min/max values to select the files that match the query conditions and need to be loaded, then filter the data, reducing both file and data reads.
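The query-time half of this flow can be sketched in a few lines. The stats entries below are invented for illustration; in Delta Lake they live in the `stats` field of each AddFile.

```python
# Sketch of data skipping: each file carries per-column min/max stats, and a
# range predicate keeps only files whose [min, max] interval overlaps it.

files = [  # illustrative AddFile stats, as recorded in the commit json
    {"path": "f1", "min": {"y": 0}, "max": {"y": 3}},
    {"path": "f2", "min": {"y": 4}, "max": {"y": 7}},
    {"path": "f3", "min": {"y": 2}, "max": {"y": 5}},
]

def files_for_range(files, col, lo, hi):
    # Keep a file unless its whole value range falls outside [lo, hi].
    return [f["path"] for f in files
            if f["min"][col] <= hi and f["max"][col] >= lo]

print(files_for_range(files, "y", 4, 5))   # ['f2', 'f3'] -- f1 is skipped
```

This also shows why clustering matters: the tighter each file's min/max interval, the more files a predicate can skip.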
One point to note: if the query pattern changes, for example the original Z-Order was on fields a and b but after a while queries mainly filter on field c, or if the table has been written many times, the clustering effect degrades. In such cases we need to re-execute Z-Order periodically to maintain the clustering effect.
Delta Lake ecological construction
We covered some basic concepts of Delta Lake above. As you can see, under the current big data architecture, no single system can provide a complete big data ecosystem on its own. Next, let's see how the current ecosystem around Delta Lake helps us build a big data system.
First, the open source ecosystem. Big data components can be roughly divided into storage, compute, and metadata management. The de facto standard for metadata management is Hive Metastore. Storage mainly includes HDFS and the object storage of cloud vendors, and the various compute engines have corresponding storage interfaces. For queries, since each engine's framework semantics and APIs differ, each lake format needs one-to-one integration with each query/compute engine.
Let's introduce the current open source ecosystem with several typical engines.
Delta Lake itself is open-sourced by Databricks, so its support for Spark is good, from the underlying code to performance, but some SQL features are not fully open or supported in the open source version. The Alibaba Cloud EMR version of Delta Lake currently covers common SQL.
For Hive, Presto, and Trino, the community has implemented query functionality, but writes are not yet supported. The interfaces for these three engines are built on the Delta Standalone project, which abstracts standalone functionality to provide read and write support for non-Spark compute and query engines.
Here are some points that the community has not yet supported well:
• After creating a table through Spark, you cannot directly query it with Hive and other engines; you need to manually create an external table on the Hive side first. The reason is that Hive queries Delta tables through an InputFormat, and when a Delta table created on the Spark side synchronizes metadata to Hive Metastore, it does not get the correct table information (other table formats, such as Parquet and ORC, have their Hive SerDe information hard-coded in the Spark source code), so correct metadata synchronization cannot be achieved. My understanding is that Spark did not anticipate such scenarios with a better extension mechanism, while the Delta Lake community did not want to embed metadata synchronization logic into its own code.
• The partition columns cannot be specified when creating a Delta external table in Hive. Even if the Delta table itself is partitioned, the Hive engine treats it as an ordinary table. Note that this design does not cause a performance difference: Delta Standalone still performs partition pruning according to the query conditions.
Alibaba Cloud EMR has provided better support for the above two points: using Spark to create tables will automatically synchronize metadata to metastore, and then directly query through Hive, Presto, and Trino without any additional operations. At the same time, we support the correct display of the table partitioning feature in Hive Metastore to avoid confusion when users use it.
In addition, query engines built on the Standalone module, such as Hive, may hit metadata loading efficiency problems when querying Delta tables. In a Hive query, for example, the Delta table's metadata loading is done locally in the Hive CLI; when the metadata is large, this consumes a lot of memory and time. On EMR we implemented an EMR manifest metadata acceleration capability: at each write, the AddFile information associated with the latest snapshot is written to the file system in advance, so queries can skip metadata loading, solving the metadata problem in this scenario.
We also support Time Travel queries and data skipping optimization on Presto/Trino.
Finally, for writes from Flink, the community released the Flink sink in version 0.4 and the Flink source in version 0.5.
Next, Alibaba Cloud's ecosystem support for Delta Lake. At present, DataWorks, MaxCompute, and Hologres can query Delta tables; we also support using Alibaba Cloud Data Lake Formation (DLF) as the metastore to help achieve better lakehouse integration. We have also connected to DLF's automatic lake table management module, introduced below.
The lake format introduces the concept of versions and batch-stream unification, which causes the data of some historical versions to become invalid in the current snapshot, or produces small files in streaming scenarios. In addition, as mentioned earlier, the Z-Order effect deteriorates over time. These problems call for lake table management, such as regularly cleaning up historical files, re-executing Z-Order, and performing file compaction. In DLF we implemented an automatic lake table management module that senses table version updates in real time, analyzes table state in real time (metrics such as the percentage of valid files and the average file size), and takes corresponding actions according to the predefined policies in the policy center, transparently managing tables for users. We have also extended lifecycle management of lake tables: if some old partitions are used less frequently, they can be compressed or moved to low-cost storage. Meanwhile, DLF's data profiling module keeps real-time statistics on various dimensions at the table or partition level and updates them in the metrics store for further query acceleration or lake table management.
Classic data warehouse cases with Delta Lake
Finally, let's look at classic data warehouse cases built on Delta Lake.
Slowly Changing Dimension (SCD) deals with dimension data that changes slowly over time in data warehouse scenarios. Different SCD types are defined according to how new values are handled after changes. Here we focus on Type 2: preserving historical values by adding a new row. In traditional databases, we usually add Start and End columns to the table to identify the validity range of the current dimension value; an empty End value means the dimension value is valid in the latest version. A status column can also be added to indicate whether the current dimension value is valid. More often, though, we don't care about every change, only about the latest value within a fixed business cycle or period of time. For example, consider a dimension table of users and their locations. Suppose user A moves from Beijing to Hangzhou and then to Wuhan; user A will have different addresses at different times in the table. We want to know user A's location on July 16, 2022, i.e. the final location of Wuhan, rather than the intermediate moves from Beijing to Hangzhou in the morning and from Hangzhou to Wuhan at noon.
The traditional SCD Type 2 scheme works roughly as follows: continuously consume incremental data via a real-time stream and write it to an incremental table; when all of the T+1 data has been processed, merge it with the T partition of the offline table to generate the T+1 partition of the offline table. In use, we just specify a fixed granularity (such as day) and query the offline table via the partition column. The drawback is that the T and T+1 data of the offline table are highly redundant, causing obvious storage waste, while maintaining both the offline and real-time pipelines also increases management and operations costs.
Let's see how Delta Lake solves these problems. As just mentioned, we care more about the latest value within a fixed period, so we named the scheme G-SCD: slowly changing dimensions based on a fixed granularity. A lake format such as Delta Lake has the concept of multiple versions, so we can use the Time Travel capability to query the data of a historical snapshot while guaranteeing query performance and avoiding duplicated storage. EMR G-SCD is built on these features. The concrete solution:
First, MySQL binlogs are synchronized to Kafka, then consumed by Spark Streaming, and finally the data is committed to Delta Lake.
The whole process looks no different from ordinary streaming writing, but the key is:
① The data is committed together with the business snapshot information.
② Spark Streaming splits the batch data according to the business snapshot, ensuring that each commit contains data from only one business snapshot, and turns the processed snapshot into a savepoint so that version is permanently retained.
There are two core problems to be solved in the implementation of G-SCD:
① Mapping between business snapshots and Delta versions. As shown in the figure, each commit is associated with a specific business snapshot (the data committed in Delta versions V7 and V8 belongs to business snapshot T), and business snapshots are required to increase with the Delta version (from T-1 to T to T+1). In this way, a given business snapshot, such as the July 15 data, can be mapped to a specific version and queried via Time Travel.
② Savepoint and rollback. In the traditional scheme, partitions are never lost as long as they are not actively deleted, whereas lake tables can automatically clean up historical versions. Under G-SCD we do not need to retain all versions, but we do want to pin specific versions so they are never deleted, hence the savepoint function. Another point: data errors are inevitable, so we also need the ability to roll back to the data of a given day in order to repair it. The rollback function here is equivalent to the restore function released in community version 2.0.
Readers familiar with streaming data processing will notice the problem of data drift, which occurs when data belonging to the previous snapshot does not arrive until the next snapshot cycle. What do we do in this case? As mentioned, G-SCD requires business snapshots to be increasing over Delta versions. The scheme also requires that the upstream Kafka partitions be strictly ordered by business snapshot and that data with the same ID always land in the same partition, so data for a given primary key is never out of order. Then, at the streaming level, we check whether each batch belongs to a single business snapshot. If so, it is committed directly; if not, only the data of the earlier business snapshot cycle is committed, and the remainder is cached first. For the caching mechanism, data of the next snapshot seen for the first time is stored temporarily while the previous snapshot's data, delayed by legitimate drift, is processed first; after a period in which we believe no more drifted data will arrive, the cached part is committed. This split guarantees that each commit on the Delta side corresponds to the data of exactly one business snapshot.
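The batch-splitting step above can be sketched as follows. This is an illustrative simplification (snapshot labels are assumed to be comparable; here plain strings where "T" < "T+1" happens to hold), not the EMR implementation.

```python
# Sketch of the G-SCD batch split: records of the current business snapshot
# are committed now; records that already belong to a later snapshot are
# held back (cached) until the current snapshot is sealed.

def split_batch(records, current_snapshot):
    """records: list of (business_snapshot, payload), per-key ordered upstream."""
    commit_now, cache = [], []
    for snap, payload in records:
        (commit_now if snap <= current_snapshot else cache).append((snap, payload))
    return commit_now, cache

batch = [("T", "u1"), ("T", "u2"), ("T+1", "u3")]   # one "T+1" record arrived early
now, held = split_batch(batch, current_snapshot="T")
print(now)    # [('T', 'u1'), ('T', 'u2')] -- committed as snapshot T
print(held)   # [('T+1', 'u3')]            -- cached until T is sealed
```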
Next, let's look at the advantages of G-SCD:
• Unified batch and streaming, reducing management costs
• Significant storage savings
• Full use of Spark/Delta query optimizations
• No need to add multiple auxiliary fields as other SCD Type 2 implementations do; at the same time, dt is kept as the partition column as in the traditional scheme, so existing SQL can be reused and users migrate at no cost.
This solution has been widely used in production by Alibaba Cloud customers.
Change Data Capture (CDC). Finally, let's discuss the CDC scenario, which involves the very important CDF feature released in Delta 2.0. CDC is used to capture and identify data changes and deliver the changed data downstream for further processing. CDF is the capability that lets tables or databases emit changed data. The CDF output identifies what kind of change was made to the data, such as insert, update, or delete, and gives us the data content before and after the change. It also includes the timestamp and version number of the changed version. To enable CDF in Delta Lake, simply set the table property delta.enableChangeDataFeed to true.
Before CDF, we could only apply incremental data to the ODS layer via MySQL binlogs; downstream, at the DWD and DWS layers, data could only be updated inefficiently by full recomputation. With CDF, we can use the lake format as a CDC source and realize an incremental real-time data warehouse across the whole pipeline, from ODS to DWD to DWS. Let's look at a concrete case.
As shown in the figure, we define one data source and three tables. user_dim is a dimension table; user_city_tbl records each user's location; city_population_tbl counts the permanent population of each city. user_city_tbl is updated by joining the source with user_dim and writing the result, and city_population_tbl is generated by aggregating on the city field. Now we enable CDF on both tables and see what data is produced. Suppose two records arrive from upstream: user1 from Hangzhou and user5 from Wuhan. The data is loaded into user_city_tbl via a Merge statement. As shown in the figure, user1 already exists, so its address is updated, while user5 is a new user, so a row is inserted. The update operation is represented by two change rows: a pre-update image carrying the old values, and a post-update image carrying the new values. A newly inserted row needs only one change row representing the insert, with no old value; a delete is represented by one change row carrying the old value and no new value.
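The change rows this merge emits can be sketched as follows. The change-type labels and the single-column table model are illustrative, not the exact CDF output schema.

```python
# Sketch: the CDC rows a CDF-enabled merge would emit for the example above.
# An update yields a pre-image and a post-image; an insert yields one row.

def merge_with_cdc(table, updates):
    changes = []
    for user, city in updates:
        if user in table:                        # matched -> update: two change rows
            changes.append((user, table[user], "pre_update"))
            changes.append((user, city, "post_update"))
        else:                                    # not matched -> insert: one change row
            changes.append((user, city, "insert"))
        table[user] = city
    return changes

user_city_tbl = {"user1": "bj"}
cdc = merge_with_cdc(user_city_tbl, [("user1", "hz"), ("user5", "wh")])
for row in cdc:
    print(row)
# ('user1', 'bj', 'pre_update')
# ('user1', 'hz', 'post_update')
# ('user5', 'wh', 'insert')
```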
Note that the output format here differs from the MySQL binlog or Debezium formats familiar to many. By comparison, the CDF format is friendlier for downstream processing: it already contains all the information we need without much conversion, whereas with binlog or Debezium we would still need to extract the needed column information from a json string.
The change data of user_city_tbl then drives incremental updates to the downstream city_population_tbl, so that in city_population_tbl the population of city bj decreases by one and the populations of hz and wh each increase by one. From this we can also see that the CDC output must include the old values of updated or deleted records; otherwise it would be impossible to incrementally and accurately decrement the population of bj in the aggregation.
Continuing the pipeline: if city_population_tbl also needs to serve as a CDC source, the CDC output after enabling CDF is shown in the figure on the right.
Finally, let's look at the design and implementation of CDC through Delta Lake.
Delta Lake implements CDC through the CDF scheme. The idea is to persist CDC data only in the scenarios where it is necessary, reuse existing data files as much as possible, and fully balance the read and write sides.
Unlike some traditional databases, which have resident services of their own and can generate the relevant data in the background without affecting write efficiency, Delta Lake is only a data organization format at the storage layer; reads and writes are executed by the compute engine itself, such as Flink or Spark. Any additional cost has to be paid within the current commit, which affects write efficiency.
The reason change data is not computed purely at query time, for example by a Diff or Join between versions, is of course query performance. A scenario here clarifies a point about CDC that may be overlooked: CDC needs to perceive the change between each pair of adjacent commits, not just the difference between the first and last commit in the query range. Lake-format CDC is per commit. That is, if a record changes from 1 to 2 in the first commit and from 2 to 3 in the second, then the CDC data across the two commits should be 1 to 2 and then 2 to 3, not directly 1 to 3. Some real production CDC scenarios require this capability.
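The per-commit requirement can be made concrete with a tiny sketch (the commit-changes structure is invented for illustration):

```python
# Sketch: CDC over a version range concatenates the change rows of every
# commit in the range, so intermediate transitions survive -- 1->2 then
# 2->3, never a collapsed 1->3.

def cdc_between(commit_changes, from_v, to_v):
    # commit_changes[v] = list of (key, old, new) produced by commit v
    rows = []
    for v in range(from_v, to_v + 1):
        rows.extend((v, k, old, new) for k, old, new in commit_changes[v])
    return rows

commit_changes = {1: [("k", 1, 2)], 2: [("k", 2, 3)]}
print(cdc_between(commit_changes, 1, 2))
# [(1, 'k', 1, 2), (2, 'k', 2, 3)] -- both intermediate transitions are visible
```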
In the design, Delta Lake persists CDC data only when the complete data changes cannot be derived simply from the current commit information. Here, complete CDC contains the previous value and the new value, along with all operation types, timestamps, and version numbers. This means CDC data can be read and loaded directly, without reading historical snapshot data.
Having understood these CDF design features, we can see that some scenarios need persisted CDC while others do not. First, the scenarios that do not, i.e. operations whose CDC data can be returned directly from the current commit information. Two examples. The first is Insert Into: the AddFiles newly added by Insert Into have no impact on other data, and the metadata in the commit json file contains only AddFile actions, so we can load the data of those AddFile files directly, attach the insert operation type, timestamp, and version information to each record, convert to CDC format, and return. The second example is Drop Partition, which is supported in Alibaba Cloud EMR but not in the community. It marks all valid data under a partition as RemoveFile; when reading the commit json file we get a file list containing only RemoveFile actions, so we can load the data files identified by RemoveFile and attach the delete operation type, timestamp, and version information to each record. For operations like these, the CDF implementation adds no write overhead; it directly reuses existing data, performing the load-and-transform to return CDC data.
Now the cases that need persisted CDC, for example the Update operation: some rows in a data file are updated and written to a new data file together with the unmodified rows. In such a scenario, the updated rows must be converted directly into CDC-format output and saved to the file system. At query time, for such a commit, the CDC file it contains can be read directly and the CDC data loaded and returned. The files persisting CDC data are recorded through the AddCDCFile action that was not explained in detail earlier.
Knowledge Base Team