Data Lake Secrets - Delta Lake
Introduction to DeltaLake
Delta Lake is an open source storage framework developed by Databricks for building a lakehouse architecture. It supports query and compute engines such as Spark, Flink, Hive, PrestoDB, and Trino. As an open-format storage layer, it brings reliability, security, and high performance to the lakehouse architecture while unifying batch and stream processing.
Delta Lake key features:
1. ACID transactions: through different isolation levels, Delta Lake supports concurrent reads and writes from multiple pipelines;
2. Data version management: Delta Lake manages and audits versions of data and metadata through snapshots, and supports time travel to query historical versions or roll back to them;
3. Open source file format: Delta Lake stores data in Parquet format, which provides efficient compression among other benefits;
4. Batch and stream integration: Delta Lake supports both batch and streaming reads and writes;
5. Schema evolution: Delta Lake lets users merge or overwrite schemas to adapt to data structures that change over time;
6. Rich DML: Delta Lake supports Upsert, Delete, and Merge to cover different user scenarios, such as CDC.
A big difference between lake tables and ordinary Hive tables is that the metadata of lake tables is self-managed and stored in the file system. The following figure shows the file structure of the Delta Lake table.
The file structure of Delta Lake mainly consists of two parts:
• _delta_log directory: stores all metadata of the Delta Lake table, where:
  • Each operation on the table is called a commit, covering both data operations (Insert/Update/Delete/Merge) and metadata operations (adding columns, modifying table configuration). Each commit generates a new log file in JSON format that records the actions the commit applies to the table, such as adding files, removing files, and updating metadata;
  • By default, every 10 commits are automatically merged into a checkpoint file in Parquet format, which speeds up metadata parsing and allows historical metadata files to be cleaned up regularly;
• Data directories/files: everything outside the _delta_log directory is the files that actually store the table data. Note that:
  • Delta Lake organizes the data of partitioned tables in the same way as ordinary Hive tables: the partition fields and their values are part of the data path;
  • Not all visible data files are valid. Delta Lake organizes tables as snapshots, and the valid data files of the latest snapshot are tracked in the _delta_log metadata.
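As a rough illustration of why not every visible data file is valid, the sketch below replays a few hand-written commit entries in order and keeps only the files that were added and not later removed. The `add` and `remove` action keys follow the open Delta transaction log format; the file names and the `live_files` helper are made up for this example, and real commits carry more fields and action types than shown here.

```python
import json

# Each commit is one JSON log file; every line in it is a single action.
# The strings below are hand-written stand-ins for files in _delta_log.
commits = [
    # version 0: the initial insert writes two files
    '{"add": {"path": "part-0000.parquet"}}\n{"add": {"path": "part-0001.parquet"}}',
    # version 1: an update rewrites part-0000 into part-0002
    '{"remove": {"path": "part-0000.parquet"}}\n{"add": {"path": "part-0002.parquet"}}',
]


def live_files(commit_texts):
    """Replay commits in version order and return the set of valid data files."""
    files = set()
    for text in commit_texts:
        for line in text.splitlines():
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


print(sorted(live_files(commits)))
# ['part-0001.parquet', 'part-0002.parquet']
```

Here part-0000.parquet may still be visible in the table directory, but it no longer belongs to the latest snapshot.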
Delta Lake manages multiple versions of a table through snapshots, and supports Time-Travel queries on historical versions. Whether you want to query the latest snapshot or the snapshot information of a historical version, you need to parse the metadata information of the corresponding snapshot first, which mainly involves:
• The reader and writer protocol versions required by the table (Protocol);
• The table schema and configuration (Metadata);
• The list of valid data files, described by a set of AddFile and RemoveFile actions.
When loading a specific snapshot, Delta Lake speeds up the process by first looking for the most recent checkpoint file whose version is less than or equal to the requested version, and then replaying the log files committed after that checkpoint, up to the requested version, to obtain the metadata.
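The loading strategy can be sketched as a small planning function: given the file names in _delta_log and a target version, pick the newest usable checkpoint and the commit files that still have to be replayed. The 20-digit zero-padded file names match the open Delta log layout; `plan_snapshot_load` is a hypothetical helper, and the sketch assumes single-file checkpoints only.

```python
import re


def plan_snapshot_load(file_names, target_version):
    """Return (checkpoint version or None, commit versions to replay)."""
    checkpoints, commits = [], []
    for name in file_names:
        m = re.fullmatch(r"(\d{20})\.checkpoint\.parquet", name)
        if m:
            checkpoints.append(int(m.group(1)))
            continue
        m = re.fullmatch(r"(\d{20})\.json", name)
        if m:
            commits.append(int(m.group(1)))
    # Newest checkpoint that does not exceed the requested version, if any.
    usable = [v for v in checkpoints if v <= target_version]
    start = max(usable) if usable else None
    # Without a checkpoint, replay from version 0; otherwise from the next commit.
    first = 0 if start is None else start + 1
    replay = sorted(v for v in commits if first <= v <= target_version)
    return start, replay


# Commits 0..12 plus one checkpoint written at version 10.
log_dir = [f"{v:020d}.json" for v in range(13)] + [f"{10:020d}.checkpoint.parquet"]

print(plan_snapshot_load(log_dir, 12))  # (10, [11, 12])
print(plan_snapshot_load(log_dir, 7))   # (None, [0, 1, 2, 3, 4, 5, 6, 7])
```

With the checkpoint in place, loading version 12 reads one Parquet file and two JSON files instead of thirteen JSON files.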
The Alibaba Cloud EMR team has followed the DeltaLake community since 2019 and has shipped DeltaLake in EMR's commercial products. Over this period, the team has continuously refined DeltaLake in terms of feature iteration, performance optimization, ecosystem integration, ease of use, and real-world adoption, so that it integrates better into EMR products and is easier for customers to use.
The following table summarizes the main self-developed features of EMR DeltaLake compared to open source DeltaLake (Community 1.1.0).
DML syntax enhancements:
• Time-travel SQL syntax: VERSION AS OF and TIMESTAMP AS OF;
• SHOW PARTITIONS and DROP PARTITION syntax;
• Dynamic partition overwrite syntax;
Metadata synchronization to the metastore:
• Table metadata is synchronized to the DLF/Hive metastore across a wider range of scenarios;
Automated lake table management:
• Automatic merging of small files with multiple strategies (auto-optimize);
• Automatic cleanup of expired data files (auto-vacuum);
• Permanent retention of a specified version (savepoint);
• Rollback to a specified version (rollback);
• Automatic adjustment of the average file size according to the actual size of the table;
Support for min-max statistics and data skipping;
Support for dynamic partition pruning (DPP);
Support for dynamic file pruning (Runtime Filter);
Support for custom manifests to speed up Hive/Presto/Trino/Impala queries;
Support for queries from Presto/Trino/Impala/Alibaba Cloud MaxCompute/Alibaba Cloud Hologres;
Support for Alibaba Cloud OSS and JindoData;
Deep integration with Alibaba Cloud DLF;
A solution for implementing slowly changing dimensions (SCD Type 2);
A CDC solution for building a complete incremental lakehouse architecture with DeltaLake;
DeltaLake 1.x supports only Spark 3 and is bound to specific Spark versions, so some new features and optimizations cannot be used on older versions or on Spark 2. EMR DeltaLake keeps the features of DeltaLake for Spark 2 (0.6) and DeltaLake for Spark 3 (1.x) in sync.
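To make the min-max statistics and data skipping item in the list above more concrete, here is a minimal sketch of the general idea: if each data file records the minimum and maximum of a column, a range query only needs to read files whose range overlaps the predicate. This is a generic illustration, not EMR's implementation; `FileStats` and `files_to_scan` are hypothetical names, and the statistics are invented.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    path: str
    min_value: int  # smallest value of the filtered column in this file
    max_value: int  # largest value of the filtered column in this file


def files_to_scan(stats, lower, upper):
    """Keep files whose [min, max] range overlaps the query range [lower, upper]."""
    return [s.path for s in stats if s.max_value >= lower and s.min_value <= upper]


stats = [
    FileStats("part-0000.parquet", 1, 100),
    FileStats("part-0001.parquet", 101, 200),
    FileStats("part-0002.parquet", 201, 300),
]

# A query such as: WHERE id BETWEEN 150 AND 220
print(files_to_scan(stats, 150, 220))
# ['part-0001.parquet', 'part-0002.parquet']
```

The first file is skipped without being opened, which is why clustering data so that file ranges overlap as little as possible makes skipping more effective.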
Deep integration with DLF
DLF (Data Lake Formation) is a fully managed service that helps users quickly build cloud data lakes and lakehouses. It provides unified metadata management, unified permissions and security, convenient data ingestion, and one-click data access. It connects seamlessly to multiple compute engines, breaking down data silos and helping surface business value.
EMR DeltaLake is deeply integrated with DLF: after a DeltaLake table is created and written, its metadata is automatically synchronized to the DLF metastore. Users no longer need to create a Hive table and associate it with the DeltaLake table, as they would with the open source version. After synchronization, users can query the table directly through Hive, Presto, Impala, and even Alibaba Cloud MaxCompute and Hologres without any additional operations.
DLF also has mature data ingestion capabilities. Users can synchronize data from MySQL, RDS, and Kafka directly into DeltaLake tables through configuration on the product side.
On the product side of DLF, the DeltaLake lake format is a first-class citizen. In its next iteration, DLF will also provide easy-to-use visualization and lake table management capabilities to help users maintain lake tables.
A Slowly Changing Dimension (SCD) is a dimension whose values change over time, and handling it is considered one of the key ETL tasks for tracking dimensional changes. In data warehouse scenarios, a star schema is usually used to associate fact tables with dimension tables. If some dimensions in a dimension table are updated over time, how should the current and historical values be stored and managed? They could be ignored, overwritten, or handled in another way, such as permanently keeping every historical value. SCD defines several types according to these handling methods; SCD Type 2 retains all historical values by adding new records.
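A minimal sketch of the SCD Type 2 idea, using plain Python dictionaries instead of a real dimension table: each change closes the currently open record and appends a new one, so no historical value is lost. The `valid_from`/`valid_to` columns are one common way to mark record validity and are an assumption here, as are the `apply_scd2` helper and the sample values.

```python
def apply_scd2(rows, key, new_value, effective):
    """Close the open row for `key` if its value changed, then append a new row."""
    for row in rows:
        if row["key"] == key and row["valid_to"] is None:
            if row["value"] == new_value:
                return rows  # value unchanged, history stays as-is
            row["valid_to"] = effective  # close the previously current record
    rows.append(
        {"key": key, "value": new_value, "valid_from": effective, "valid_to": None}
    )
    return rows


dim = []
apply_scd2(dim, "user_1", "Hangzhou", "2022-01-01")
apply_scd2(dim, "user_1", "Beijing", "2022-03-01")

for row in dim:
    print(row)
```

After the two calls, the table holds both the closed Hangzhou record and the open Beijing record for user_1.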
In real production environments, we may not need every historical dimension value. Instead, we often care about the latest value within a fixed period, such as the value of a dimension for each day or each hour. The actual scenario can therefore be framed as a slowly changing dimension based on a fixed granularity, or business snapshot (Granularity-Based Slowly Changing Dimension, G-SCD).
Traditional data warehouses built on Hive tables have several options for this. Two of them are listed below:
• Build the incremental data table for time T+1 from the stream, and merge it with the time-T partition of the offline table to produce the T+1 partition of the offline table, where T denotes the granularity or business snapshot. Each partition in this scheme stores a full copy of the data, which wastes a large amount of storage;
• Keep an offline base table, store the incremental data of each business moment separately, and combine the base table with the incremental tables at query time. This scheme reduces query efficiency.
Through the upgrade of DeltaLake itself, combined with the adaptation of SparkSQL and Spark Streaming, we have realized the SCD Type2 scenario. The architecture is as follows:
The Streaming side connects to the upstream Kafka data, splits each batch according to the configured business snapshot granularity, commits the parts separately, and attaches the business snapshot value to each commit. When DeltaLake receives the data, it records the relationship between the current table snapshot and the business snapshot. When the next business snapshot arrives, it creates a savepoint on the previous snapshot so that the version is kept permanently. At query time, the user specifies a business snapshot value, which identifies the corresponding table snapshot, and the query is then executed through time travel.
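The flow described above can be modeled with a toy in-memory table. This is only a sketch of the bookkeeping, not EMR's implementation: `GScdTable` and its fields are hypothetical, table versions are plain dictionaries, and a savepoint is represented as a version number in a set.

```python
class GScdTable:
    """Toy model: table versions plus a business-snapshot-to-version mapping."""

    def __init__(self):
        self.versions = []          # versions[i] is the table content at version i
        self.snapshot_version = {}  # business snapshot -> latest version committed for it
        self.savepoints = set()     # versions that are kept permanently
        self.current_snapshot = None

    def commit(self, business_snapshot, changes):
        # A new business snapshot arrived: savepoint the last version of the previous one.
        if self.current_snapshot is not None and business_snapshot != self.current_snapshot:
            self.savepoints.add(self.snapshot_version[self.current_snapshot])
        state = dict(self.versions[-1]) if self.versions else {}
        state.update(changes)
        self.versions.append(state)
        self.snapshot_version[business_snapshot] = len(self.versions) - 1
        self.current_snapshot = business_snapshot

    def query(self, business_snapshot):
        # Resolve the business snapshot to a table version, then read that version.
        return self.versions[self.snapshot_version[business_snapshot]]


table = GScdTable()
table.commit("2022-01-01", {"user_1": "Hangzhou"})
table.commit("2022-01-01", {"user_2": "Shanghai"})
table.commit("2022-01-02", {"user_1": "Beijing"})

print(table.query("2022-01-01"))  # {'user_1': 'Hangzhou', 'user_2': 'Shanghai'}
print(table.savepoints)           # {1}
```

Only the last version of each finished business snapshot needs a savepoint; intermediate versions can expire as usual.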
Advantages of the G-SCD on DeltaLake solution:
• Stream-batch integration: no separate incremental table and base table are required;
• Storage resources: Delta Lake's own data versioning manages the incremental dimension changes, so there is no need to keep a full copy of historical data for each time granularity;
• Query performance: Delta Lake's metadata checkpoints, together with its Optimize, Z-Order, and data skipping capabilities, improve query efficiency;
• Original SQL statements are preserved: users can still query the snapshot data of a given business time granularity with a partition-like field, in the same way as when snapshots are implemented with partitions.
In current data warehouse architectures, data is often organized into layers such as ODS, DWD, DWS, and ADS for easier management. If the original data is stored in MySQL or RDS, we can consume its binlog to update the ODS table incrementally. But what about the data flowing from ODS to DWD, and from DWD to DWS? Because a Hive table cannot generate binlog-like data, incremental updates of the downstream links are not possible. The ability to generate binlog-like data is therefore the key to building a real-time incremental data warehouse.
Based on DeltaLake, Alibaba Cloud EMR implements a CDC capability that lets a DeltaLake table act as a Streaming Source. Once it is enabled, ChangeData is generated and persisted for every data operation so that downstream Streaming jobs can read it. Querying ChangeData with SparkSQL syntax is also supported. As shown below:
The ODS-layer Delta table user_city_table receives the source data, performs the Merge operation, and persists the change data. The DWS-layer table city_cnt_table, which is aggregated by city, reads the ChangeData of user_city_table and updates its cnt aggregate field.
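The propagation from user_city_table to city_cnt_table can be sketched in plain Python. This is a conceptual model only: the shape of the change records (a delete for the old value followed by an insert for the new one) and the helper names `merge_with_changes` and `apply_changes` are assumptions for illustration, not the actual ChangeData format.

```python
from collections import Counter


def merge_with_changes(table, key, city):
    """Upsert one row and return the change records that the merge produced."""
    changes = []
    old = table.get(key)
    if old == city:
        return changes  # nothing changed, so no change data is emitted
    if old is not None:
        changes.append(("delete", key, old))  # old value of the updated row
    changes.append(("insert", key, city))     # new value of the row
    table[key] = city
    return changes


def apply_changes(city_cnt, changes):
    """Maintain the per-city count incrementally from change records only."""
    for op, _key, city in changes:
        city_cnt[city] += 1 if op == "insert" else -1
        if city_cnt[city] == 0:
            del city_cnt[city]


user_city_table = {}        # ODS layer: user -> city
city_cnt_table = Counter()  # DWS layer: city -> cnt

for user, city in [("u1", "Hangzhou"), ("u2", "Hangzhou"), ("u1", "Beijing")]:
    apply_changes(city_cnt_table, merge_with_changes(user_city_table, user, city))

print(dict(city_cnt_table))  # {'Hangzhou': 1, 'Beijing': 1}
```

The downstream table never rescans user_city_table; it only consumes the change records, which is what makes the update incremental.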
As the main lake format promoted by EMR, DeltaLake has earned the trust of many customers and has been deployed in their production environments across a variety of scenarios. In the future, we will continue to invest in DeltaLake, deepen its integration with DLF, enrich lake table operation and maintenance capabilities, and lower the barrier to entry for users. We will also keep optimizing read and write performance, strengthen the ecosystem around Alibaba Cloud's big data stack, and help customers build an integrated lakehouse architecture.
Knowledge Base Team