Cloud-Native Practice of an MPP-Architecture Data Warehouse
One: Introduction
Gartner predicts that by 2022, 75% of all databases will be deployed on or migrated to cloud platforms. Another authoritative firm, IDC, predicts that by 2025 more than 50% of databases will run on public clouds, and that in China the figure will reach an astonishing 70% or more. After years of development, cloud databases have moved from the Cloud-Hosted model to the Cloud-Native model.
Cloud-Hosted: driven by market and industry demand for the cloud, most vendors chose cloud hosting as the first step of the evolution. In this model users no longer need to build their own IDC; instead they migrate the data warehouse onto the cloud provider's standardized resources and receive a high degree of managed service, freeing them from the management cost of the underlying hardware and the constraints of resource planning.
Cloud-Native: however, as more workloads move to the cloud, the underlying compute and storage resources remain bundled together, so users still have to absorb unnecessary resource waste. For example, increasing compute resources forces a corresponding increase in storage, producing ineffective cost. Users now expect the cloud to decompose data warehouse resources at a finer granularity, that is, to decouple compute from storage and split them into independently sellable units that match how the business orchestrates resources. This is where the real value of cloud native shows: we no longer build a data warehouse around a fixed balance of storage and compute, but orient it to the user's business, allow compute or storage to scale heavily in either direction, and deploy and sell the resources the business needs in the smallest possible units. At this point we have truly entered the cloud-native era of data warehouses.
At the 2021 Yunqi Conference, Alibaba Cloud announced a data warehouse with a new cloud-native architecture [1]. This article introduces the evolution of the cloud-native data warehouse AnalyticDB PostgreSQL (hereafter ADB PG) from Cloud-Hosted to Cloud-Native, discusses the underlying design and thinking required to achieve true resource pooling and flexible selling, and covers the architecture design, key technologies, performance results, outcomes, and follow-up plans. (Reading time: about 10 minutes.)
Two: ADB PG Cloud-Native Architecture
To let users adopt a cloud data warehouse quickly, we initially took the MPP architecture onto the cloud largely as-is: the coordinator node and the compute nodes are deployed independently but each runs on its own ECS instance, and storage and compute are co-located on the compute node. This design is close to customers' self-built architectures, allows data warehouse workloads to be migrated to the cloud quickly and losslessly, is friendly to early cloud adoption, and satisfies the main requirement of parallel resource expansion.
As cloud native evolved further, we built a brand-new storage-compute separation architecture that splits the product into a service layer, a compute layer, and a shared storage layer. The architecture diagram is as follows:
Master coordinator node: stores global schema information and implements global transaction management;
Row store engine: stores metadata, which here mainly means the visibility information of shared-storage files and consists of two parts:
One is the mapping between files and tables;
The other is the delete bitmap of deleted data.
Building on the row store, we inherit PG's local transaction capabilities and remain fully compatible with PG transactions for inserts, deletes, updates, and queries;
Local cache: we introduce DADI (Alibaba Cloud Data Accelerator for Disaggregated Infrastructure) from the storage team to implement a high-performance local cache; compared with open-source products its performance is an order of magnitude better;
Shared storage: we borrowed several key designs from ClickHouse and implemented a MergeTree-based hybrid row-column store at the storage layer. In addition, we built a unified, file-interface-based access layer for shared storage that is highly compatible with OSS, HDFS, and other distributed file systems;
When designing the architecture, we compared it with HAWQ, which is also derived from Greenplum. HAWQ keeps metadata on the master: every write sends the modified metadata to the master for update, and every read first fetches the required metadata from the master and embeds all of it in the execution plan, so that segments can obtain the metadata they need and remain completely stateless.
But this design brings two core problems:
Metadata inflation makes the master a bottleneck.
Limited by metadata performance, it cannot support high-concurrency real-time writes.
We did not design it this way because we want to support high-concurrency workloads going forward. ADB PG spent more than two years extending Greenplum's single-master architecture to multi-master, precisely to solve high-concurrency real-time writes; keeping metadata on the master would cause problems such as:
Metadata storage and access on the master easily become a single-point bottleneck;
The ADB PG execution layer would need heavy refactoring to carry metadata in the execution plan, which would drastically inflate the query plan itself and is very unfriendly to high-concurrency small queries.
So we improved on this design and distributed the metadata to the segments, which avoids these problems and achieves the following:
Metadata storage and access on the master do not become a bottleneck;
No refactoring of the execution layer is needed, and dispersing the metadata reduces the bandwidth pressure of each individual query;
Metadata on the segments is placed in a distributed KV store, which solves metadata relocation during scale-out and scale-in.
We chose OSS as the shared storage because a single customer's business data keeps growing and needs a sustainable storage solution, and OSS's low storage cost, high availability, and data durability make it the best choice.
Another advantage of OSS is pay-as-you-go: users do not need to provision storage capacity in advance, they pay only for the data they actually store, and once data is deleted they are no longer charged for it. ESSD cloud disks, by contrast, usually have to be sized with headroom above the actual data volume, cannot truly supply storage on demand, and cannot shrink automatically; all of this conflicts with our cloud-native design philosophy. The drawback of OSS, however, is its RT:
To address OSS RT, we configure a certain proportion of local disk on the compute nodes for access acceleration. In addition, we designed a high-performance hybrid row-column store that borrows the core idea of ClickHouse's MergeTree: ordering is central, data inside a file is strictly ordered, and files are roughly ordered relative to one another; asynchronous background merges combine and sort files. On top of this ordering we designed three levels of statistics inside each file and implemented extensive IO-pruning optimizations.
Below we further introduce each technical point.
Three: Key Technologies
1. Elastic scaling
To achieve fast elastic scaling, we organize data on shared storage in hash buckets; after scaling, compute nodes and buckets are remapped via consistent hashing. To keep the bucket-to-segment assignment uniform and to reduce cache invalidation after scaling, we improved the traditional consistent-hashing algorithm to support dynamic mapping during scaling.
Data is divided into shards by hash bucket, and data on object storage is remapped at shard granularity. If compute nodes were scaled out beyond the number of shards, data would have to be redistributed; to avoid this, we support splitting and merging hash buckets in the background so that data never needs to be redistributed.
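To make the idea concrete, here is a minimal consistent-hashing sketch that maps hash buckets to segments and shows that adding a segment only remaps a small fraction of buckets. The segment names, virtual-node count, and hash function are illustrative assumptions, not the production algorithm (which additionally supports dynamic mapping and bucket split/merge).

```python
import bisect
import hashlib

def h(key: str) -> int:
    """Stable 64-bit hash used to place buckets and segments on the ring."""
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

class ConsistentHashRing:
    """Maps hash buckets to segments; adding a segment only moves a few buckets."""
    def __init__(self, segments, vnodes=64):
        self.vnodes = vnodes
        self.ring = []                      # sorted list of (hash, segment)
        for seg in segments:
            self.add_segment(seg)

    def add_segment(self, seg):
        for i in range(self.vnodes):        # virtual nodes improve uniformity
            self.ring.append((h(f"{seg}#{i}"), seg))
        self.ring.sort()

    def segment_for(self, bucket_id: int) -> str:
        pos = bisect.bisect(self.ring, (h(f"bucket-{bucket_id}"),)) % len(self.ring)
        return self.ring[pos][1]

ring = ConsistentHashRing(["seg-0", "seg-1", "seg-2", "seg-3"])
before = {b: ring.segment_for(b) for b in range(128)}
ring.add_segment("seg-4")                   # scale out by one segment
after = {b: ring.segment_for(b) for b in range(128)}
moved = sum(before[b] != after[b] for b in range(128))
print(f"buckets remapped after scale-out: {moved}/128")
```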
The above covers remapping of the data itself during scaling; the metadata describing data-file visibility is stored in the row store table. For this metadata we still use Greenplum's data redistribution strategy, the difference being that, to speed up metadata redistribution, we added optimizations such as parallel distribution.
Taking scale-out as an example, the process is refined as follows:
Combined with techniques such as ECS resource pooling, parallel NIC loading, and Docker image preheating, the end-to-end scaling time within 16 nodes is close to one minute.
2. Hierarchical storage
The implementation of hierarchical storage is as follows:
As shown in the figure above, we divide storage resources into three layers, including memory, local disk, and shared storage.
Memory: mainly accelerates row store access and caches file statistics;
Local disk: serves as persistent storage for the row store and as a local accelerator for remote shared storage;
Remote shared storage: serves as the persistent storage of the data.
3. Read and write process
The writing process is as follows:
Data written by users is batched and written directly to OSS, while a piece of metadata is recorded on the local disk at the same time. This metadata records the correspondence between files and data tables; it is implemented with a PG row store table, and we keep this information in the file metadata table.
For updates and deletes, we do not modify the data on OSS directly; instead we mark rows as deleted. The delete marks are also stored in the local row store table, in the visibility bitmap. Because marked deletes degrade read performance, a background merge applies the delete information to the files to reduce that impact.
When writing, we further divide the data on each segment by bucket, which creates a small-file problem. To solve it, we made the following optimizations:
Group flush: a batch of written data is flushed into the same OSS file; our OSS files use the ORC format, and different buckets are written to their corresponding stripes;
Asynchronous, parallel pipeline: encoding and sorting are typical CPU-intensive tasks, while uploading to OSS is a typical network-IO-intensive task. We run the two in parallel: the OSS upload executes as an asynchronous task while the next batch of data is encoded and sorted, which speeds up writes (see the sketch after this list).
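A rough sketch of this overlap between CPU work and asynchronous uploads, with placeholder encode/sort and upload functions standing in for the real ORC encoding and OSS client:

```python
from concurrent.futures import ThreadPoolExecutor

def encode_and_sort(batch):
    """CPU-bound stage: sort rows and encode them into a file buffer (placeholder)."""
    return bytes(str(sorted(batch)), "utf-8")

def upload_to_oss(buf: bytes) -> None:
    """Network-IO-bound stage: push the encoded buffer to OSS (placeholder)."""
    pass

def pipelined_write(batches):
    """Overlap encoding of batch N+1 with the asynchronous upload of batch N."""
    with ThreadPoolExecutor(max_workers=2) as uploader:
        pending = []
        for batch in batches:
            buf = encode_and_sort(batch)                         # CPU work on the current batch
            pending.append(uploader.submit(upload_to_oss, buf))  # IO runs in the background
        for fut in pending:                                      # wait for all uploads to finish
            fut.result()

pipelined_write([[3, 1, 2], [9, 7, 8], [5, 4, 6]])
```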
Because the remote persistent storage provides 12 nines of durability, only the row store holding metadata needs WAL logging and a second replica for reliability; the data itself is written to shared storage without WAL logging or extra replicas. Thanks to removing WAL writes and WAL-based primary-standby synchronization, and to asynchronous parallelism and batching, our write performance in batch-load scenarios is basically on par with the ECS-based elastic storage edition.
The reading process is as follows:
We obtain the OSS files to scan by reading the file metadata table.
We then read the corresponding files from OSS.
Deleted data is filtered out of the files read, using the visibility bitmap in the metadata table.
To reduce the latency of reading OSS, we also introduced DADI to manage the cache and encapsulate shared-file access. When a file is read, we first check whether it is in the local cache; if so, it is read directly from local disk, otherwise it is fetched from OSS and then cached locally. Writes go directly to OSS and are written back to the local disk asynchronously. Eviction of locally cached data is also managed by DADI, which automatically evicts cold data according to an LRU/LFU policy.
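A minimal read-through cache sketch of this behavior, with a small in-process LRU standing in for the DADI-managed local disk cache and a placeholder OSS fetch function:

```python
from collections import OrderedDict

class LocalCache:
    """Tiny LRU cache standing in for the DADI-managed local disk cache."""
    def __init__(self, capacity_bytes: int, fetch_from_oss):
        self.capacity = capacity_bytes
        self.fetch_from_oss = fetch_from_oss   # hypothetical remote reader
        self.entries: OrderedDict[str, bytes] = OrderedDict()
        self.used = 0

    def read(self, path: str) -> bytes:
        if path in self.entries:               # cache hit: serve from local disk
            self.entries.move_to_end(path)
            return self.entries[path]
        data = self.fetch_from_oss(path)       # cache miss: go to OSS
        self.entries[path] = data              # populate the local cache
        self.used += len(data)
        while self.used > self.capacity:       # evict cold data (LRU policy)
            _, evicted = self.entries.popitem(last=False)
            self.used -= len(evicted)
        return data

cache = LocalCache(capacity_bytes=1 << 20, fetch_from_oss=lambda p: b"x" * 4096)
cache.read("table1/file-42.orc")   # miss: fetched from OSS and cached
cache.read("table1/file-42.orc")   # hit: served locally
```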
Since transactions are implemented with PG's row store, they are fully compatible with ADB PG transactions. The cost is that this row store data must be redistributed during scale-out and scale-in, so we redesigned its redistribution mechanism; with pre-partitioning, parallel copy, point-to-point copy, and other techniques, the scaling time is greatly shortened.
To summarize the performance optimization points:
Transaction ACID is provided by the local row store table, with data-block-level concurrency;
Write throughput is improved through batching and pipeline parallelism;
Memory and local SSD form a multi-level cache, with access accelerated by DADI.
4. Visibility table
We store information about shared-storage files in File Metadata, whose structure is as follows (a minimal sketch in code follows the field list):
Hash bucket: used to scan bucket by bucket when relocating data during scaling; queries also proceed bucket by bucket;
Level: the level in the merge tree; level 0 holds data written in real time, and this data carries a higher weight during merging;
Physical file id: the id of the file. It is 64 bytes because it is no longer associated with a segment, so it no longer only needs to be unique within a segment's table but must be globally unique;
Stripe id: one OSS file can contain the files of multiple buckets, with the stripe as the unit, so that the multiple buckets written by a segment can be merged into one OSS file; this avoids the performance degradation and metadata explosion caused by many small OSS files;
Total count: the number of rows in the file; it is also a weight for background merging, and the larger it is, the lower the merge priority.
The visibility bitmap records the deletion information of the files:
Start_row: each 32K rows correspond to one delete bitmap; a bitmap covering 32,000 rows takes about 4 KB, and the 32 KB page used by the row store can hold 7 such records.
Delete count: the number of deleted rows.
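Here is the promised sketch of these two structures as plain records; the dataclass fields follow the list above, while the types, the 4 KB bitmap derived from the 32K-row granularity, and the helper methods are illustrative assumptions:

```python
from dataclasses import dataclass, field

ROWS_PER_BITMAP = 32_000                     # one delete bitmap covers 32K rows (~4 KB)

@dataclass
class FileMetadata:
    """One record of the file metadata table (one stripe of one shared-storage file)."""
    hash_bucket: int         # which bucket the data belongs to
    level: int               # merge-tree level; 0 = freshly written data
    physical_file_id: int    # globally unique file id, not segment-scoped
    stripe_id: int           # stripe inside the OSS file holding this bucket's data
    total_count: int         # number of rows; also a background-merge weight

@dataclass
class VisibilityBitmap:
    """Delete marks for one 32K-row range of a file."""
    physical_file_id: int
    start_row: int                                    # multiple of ROWS_PER_BITMAP
    bitmap: bytearray = field(default_factory=lambda: bytearray(ROWS_PER_BITMAP // 8))
    delete_count: int = 0

    def mark_deleted(self, row_no: int) -> None:
        offset = row_no - self.start_row
        self.bitmap[offset // 8] |= 1 << (offset % 8)
        self.delete_count += 1

    def is_deleted(self, row_no: int) -> bool:
        offset = row_no - self.start_row
        return bool(self.bitmap[offset // 8] & (1 << (offset % 8)))
```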
With this metadata we can determine which files need merging without accessing OSS at all, which avoids the latency of touching OSS; OSS also throttles access throughput, so this avoids triggering OSS rate limiting through frequent accesses.
5. Hybrid row-column storage
The structure of the MergeTree is shown on the left of the figure above. The core idea is to merge small files into large ordered files in the background; during a merge we can also rearrange the data, for example changing its ordering characteristics to enable further optimizations (see the order-awareness section below). The differences from LevelDB are:
Real-time writes at level 0 are merged, and the files of different buckets are combined into one large file, with each bucket landing in its corresponding stripe;
Merging combines eligible files across levels; data inside a file is strictly ordered, while files are only roughly ordered relative to one another, and the higher the level, the larger the files and the smaller the overlap between them.
Each file uses the hybrid row-column format; the layout is shown on the right of the figure. We made many optimizations on top of ORC.
ORC file: an ORC file can contain multiple stripes, each stripe contains multiple row groups, and each row group contains a fixed number of records, which are stored column by column.
Postscript: contains the file's PostScript description, the file metadata (including file-level statistics, data dictionaries, and so on), information on all stripes, and the file schema.
Stripe: a stripe is a horizontal slice of rows; rows are grouped to form a stripe, files are read in units of row groups, and each column's index and data are stored. A stripe consists of index data, row data, and a stripe footer.
File footer: stores the location of each stripe, the per-column statistics within each stripe, and the types and locations of all streams.
Index data: Saves statistics at the row group level.
Data stream: a stream represents a meaningful segment of data in the file, covering both index and data.
The index streams store the position and statistics of each row group. The data streams contain several kinds of data, determined by the column type and encoding; integer and string columns are used as examples below:
For an integer column, a bit stream and an integer stream are used: the bit stream marks whether a value is null, and the integer stream stores the integer values of the non-null records.
For a string column, the ORC writer first checks whether the number of distinct values is no more than 0.8 of the total number of non-null records. If so, it uses dictionary encoding, and the column is stored in one bit stream, one byte stream, and two integer streams: the bit stream again marks null values, the byte stream stores the dictionary entries, one integer stream stores the length of each dictionary entry, and the other integer stream records the field values as dictionary references. If the threshold is exceeded, the ORC writer concludes that the column has too few repeated values for dictionary encoding to be efficient; it then uses a byte stream to store the string values and an integer stream to store the byte length of each value.
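A simplified sketch of this encoding decision, using the 0.8 threshold described above and representing the ORC streams as plain Python lists; it is an illustration of the rule, not the ORC writer's actual code:

```python
def encode_string_column(values, dict_ratio_threshold=0.8):
    """Choose dictionary or direct encoding for a string column, as ORC writers do."""
    non_null = [v for v in values if v is not None]
    present = [v is not None for v in values]                # bit stream: null markers
    distinct = list(dict.fromkeys(non_null))                 # preserves first-seen order
    if non_null and len(distinct) / len(non_null) <= dict_ratio_threshold:
        index = {v: i for i, v in enumerate(distinct)}
        return {
            "encoding": "DICTIONARY",
            "present": present,
            "dictionary_bytes": distinct,                     # byte stream: dictionary values
            "dictionary_lengths": [len(v) for v in distinct], # integer stream: entry lengths
            "data": [index[v] for v in non_null],             # integer stream: dictionary ids
        }
    return {
        "encoding": "DIRECT",
        "present": present,
        "data_bytes": non_null,                               # byte stream: raw values
        "lengths": [len(v) for v in non_null],                # integer stream: value lengths
    }

print(encode_string_column(["cn", "cn", "us", None, "cn"])["encoding"])  # DICTIONARY
```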
An ORC file keeps three levels of statistics: file level, stripe level, and row-group level. The core of improving storage performance is reducing IO, so we implement various pushdowns based on ORC statistics and indexes to prune IO. With projection pushdown we scan only the columns that need to be materialized; with aggregate pushdown we read the required min, max, sum, and unique values directly from the statistics or index and return them, avoiding decompression of the data streams. For predicates we also support filter pushdown, filtering directly on the statistics and skipping stripes that cannot match; we support a range of operators, in/not in, and equivalent transformations of expressions.
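As a simplified illustration of statistics-based IO pruning, the sketch below keeps only stripe-level min/max and skips stripes that cannot satisfy a range predicate; the stripe statistics are made up:

```python
from dataclasses import dataclass

@dataclass
class StripeStats:
    stripe_id: int
    min_val: int
    max_val: int
    row_count: int

def prune_stripes(stats, predicate_lo, predicate_hi):
    """Keep only stripes whose [min, max] range can contain rows matching lo <= c <= hi."""
    return [s for s in stats if not (s.max_val < predicate_lo or s.min_val > predicate_hi)]

stripes = [StripeStats(0, 1, 100, 10_000),
           StripeStats(1, 101, 200, 10_000),
           StripeStats(2, 201, 300, 10_000)]

# A filter like "WHERE c BETWEEN 150 AND 180" only needs stripe 1; the others are skipped
print([s.stripe_id for s in prune_stripes(stripes, 150, 180)])   # -> [1]
```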
In addition, we have optimized the performance of the storage format as follows:
Zero-copy: when converting ORC data types to PG data types, fixed-length values are copied, while variable-length values are turned into PG datums directly as pointer references.
Batch scan: columns are scanned in batches; instead of row-by-row access, one column is scanned completely and then the next, which is friendlier to the CPU cache.
Seek read support: makes it easy to jump directly to the matching data when a filter hits.
6. Local cache
DADI gives us two capabilities: efficient cache management and unified storage access. Before looking at DADI in detail, let's compare DADI with open-source solutions along two dimensions, RT and throughput:
Compared with the open-source solution Alluxio, DADI improves RT by an order of magnitude in the memory-hit scenario and also has a clear advantage in throughput; in the disk-hit scenario it likewise shows a clear performance advantage. In some analytical scenarios we read file statistics frequently but in small amounts, and we cache these statistics locally, so this advantage translates into a large overall performance gain.
For DADI's performance advantage in the cache-hit scenario, refer to the following architecture:
DADI SDK: accesses storage through standard read/write interfaces and, depending on whether the cache hits, uses short-circuit read or IPC to access the local DADI service, or accesses the remote DADI service that corresponds to the distributed cache service. The SDK is embedded as a library in ADB PG's read and write paths;
Cache instance: manages the local cache; cached files are abstracted as virtual block devices, and the hotness of data in memory and on disk is managed in units of blocks.
The core design here is:
Short-circuit read: shared memory is read directly, avoiding a round trip through IPC;
The data structure recording whether the cache hits also lives in shared memory; reference counting combined with robust mutexes guarantees multi-thread safety of the shared-memory data;
Disk reads: roughly 100 us + 27 us, approximately the RT of the disk read itself; IPC uses shared-memory (shm) communication rather than local sockets.
Extremely low resource usage:
Memory: the DADI service uses between 100 and 200 MB of memory, because the IPC implementation is based on shared memory, data structures such as hash tables avoid memory blow-up under a multi-process architecture, the encoding is compact, and each 16 KB memory page needs only a 4-byte management structure;
CPU: with the local DADI service's disk fully loaded, it uses about 20% of a single core. CPU is spent mainly on the SDK side, and the SDK rarely needs to communicate with the local DADI service.
In addition, to make better use of DADI's advantage on memory hits, we made the following optimizations in combination with the hybrid row-column store:
Cache priorities: statistics are cached with high priority and kept resident in memory, index information is kept resident on local disk, and dimension-table data can be cached locally with high priority.
Fine-grained cache policy: to prevent cold-data scans of large tables from evicting all the local hot data, large tables use dedicated cache areas.
Asynchronous file prefetch: based on the query, parsed data files are prefetched to local disk; this runs asynchronously and does not affect current reads and writes.
7. Vectorized execution
The ADB PG cloud-native edition also supports a vectorized execution engine. The core ideas are to batch data so as to raise the CPU cache hit rate, use codegen to reduce the number of function calls and the branching of complex computations, accelerate computation with SIMD instructions, and use memory pool management to reduce memory copies between operators. For more details, see [3].
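As a purely conceptual illustration (not the engine's implementation), the contrast between row-at-a-time evaluation and operating on a whole column batch looks like this; NumPy stands in for the vectorized, SIMD-friendly kernels:

```python
import numpy as np

def row_at_a_time(prices, quantities):
    """One multiply-accumulate per loop iteration: heavy per-row call and branch overhead."""
    total = 0.0
    for p, q in zip(prices, quantities):
        total += p * q
    return total

def vectorized(prices, quantities):
    """Operate on the whole column batch at once: few calls, cache- and SIMD-friendly."""
    return float(np.dot(prices, quantities))

prices = np.random.rand(100_000)
quantities = np.random.rand(100_000)
assert np.isclose(row_at_a_time(prices, quantities), vectorized(prices, quantities), rtol=1e-6)
```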
8. Order awareness
Data ordering is used in two main ways: one is order-based IO pruning, the other is minimizing sorting during computation. IO pruning was discussed at length in the hybrid row-column storage section, so here we focus on the second point. Our main work is:
Eliminate redundant sorts: if the data is already ordered and satisfies the sort requirement, no sort operator needs to be added.
Minimize the columns that need sorting: for example, to sort by {c1, c2, ..., cn}, if there is a predicate c1 = 5 the sort order is simplified to {c2, ..., cn}, avoiding one extra sort column (see the sketch after this list).
Order pushdown: in the initialization phase, the intended sort operation is pushed down as far as possible.
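Here is the promised sketch of the sort-key simplification rule; the predicate representation and column names are illustrative:

```python
def simplify_sort_keys(sort_keys, equality_predicates):
    """Drop sort columns that an equality predicate already pins to a constant."""
    pinned = set(equality_predicates)           # e.g. {"c1"} for the predicate c1 = 5
    return [col for col in sort_keys if col not in pinned]

# Sorting by {c1, c2, c3} under the predicate c1 = 5 only needs to sort {c2, c3}
print(simplify_sort_keys(["c1", "c2", "c3"], {"c1": 5}))   # -> ['c2', 'c3']
```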
We generate the sort scan operator as follows. After the SQL is parsed into an AST, a series of heuristic rules transforms it into a physical execution plan:
First, according to the ordering requirements of operators such as join, group by, distinct, and order by, we establish each operator's interesting order (the ordered input that operator expects).
Second, the interesting orders produced while planning the sort scan are pushed down to lower operators as far as possible (sort-ahead), so that the order property is satisfied as early as possible.
If an operator has several interesting orders, we try to merge them so that a single sort satisfies multiple order properties.
Finally, there is the implementation of the sort scan operator itself: the storage layer only guarantees strict order within a file and approximate order across files, so we use a multi-way merge algorithm to produce ordered output.
The difficulty is that the multi-way merge of a sort scan reads data row by row, which conflicts with vectorized batch scans and batched file reads; we rely on the CBO to choose the optimal execution plan.
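A minimal k-way merge over per-file ordered runs using Python's heapq; this is the textbook algorithm behind such a sort scan, not the engine's exact implementation:

```python
import heapq

def sort_scan(ordered_runs):
    """Merge several internally sorted file runs into one globally ordered stream."""
    heap = []
    for run_id, run in enumerate(ordered_runs):
        it = iter(run)
        first = next(it, None)
        if first is not None:
            heapq.heappush(heap, (first, run_id, it))   # seed the heap with each run's head
    while heap:
        value, run_id, it = heapq.heappop(heap)
        yield value                                     # emit the current global minimum
        nxt = next(it, None)
        if nxt is not None:
            heapq.heappush(heap, (nxt, run_id, it))

runs = [[1, 4, 9], [2, 3, 8], [5, 6, 7]]                # strictly ordered within each file
print(list(sort_scan(runs)))                            # -> [1, 2, ..., 9]
```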
9. Fine-grained parallelism
ADB PG is an MPP architecture and fully exploits parallelism across nodes. Because the cloud-native edition divides data into buckets, it also enables finer-grained parallelism within each node. Take join as an example:
On the left is the execution plan without intra-node parallel join: two processes are started, one for the hash join build and one for the probe. With bucketed data, as in the figure above, each bucket can be processed in parallel; and since the data is partitioned by bucket, when the join key matches the distribution key, intra-node parallelism also hits the local-join optimization perfectly.
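A rough sketch of per-bucket intra-node parallelism for a hash join, assuming both sides are already partitioned by the same bucketing of the join key so that each pair of buckets can be joined independently; the thread pool and row format are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

def hash_join_bucket(build_rows, probe_rows):
    """Classic build/probe hash join over one bucket's data."""
    table = {}
    for key, payload in build_rows:                 # build phase
        table.setdefault(key, []).append(payload)
    out = []
    for key, payload in probe_rows:                 # probe phase
        for match in table.get(key, []):
            out.append((key, match, payload))
    return out

def parallel_join(build_buckets, probe_buckets, workers=4):
    """Join bucket i of the build side with bucket i of the probe side, in parallel."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parts = pool.map(hash_join_bucket, build_buckets, probe_buckets)
    return [row for part in parts for row in part]

build = [[(1, "a"), (2, "b")], [(3, "c")]]          # two buckets of the build side
probe = [[(1, "x"), (1, "y")], [(3, "z")]]          # matching buckets of the probe side
print(parallel_join(build, probe))
```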
Four: Performance Results
1. Scalability performance
2. Read and write performance
To test performance, we used an instance with a 4*4C specification and compared the new cloud-native edition of ADB PG against the storage-elastic edition.
Write performance test
The test table is the TPC-H lineitem table at scale factor 500. COPY commands are executed concurrently at different concurrency levels, the command execution time is measured, and throughput is obtained by dividing the total data volume by the execution time.
At single concurrency, the new edition performs about the same as the storage-elastic edition, mainly because resources are not saturated;
At 4-way concurrency, the new edition's throughput is twice that of the storage-elastic edition. The lineitem table defines a sort key: the new edition does not write WAL when ingesting data, and batching plus pipeline parallelism make its writes faster than the elastic storage edition, which writes first and then merges and must write additional WAL during the merge, so the new edition has a clear advantage;
At 8-way concurrency, the new edition is almost the same as at 4-way, mainly because with 4 cores the CPU is already saturated at 4-way concurrency, so increasing the concurrency brings no further improvement.
Read performance test
In order to comprehensively test the read performance, we tested three scenarios:
Fully in memory: TPC-H at sf = 10, generating a 10 GB test data set.
Fully in local disk cache: TPC-H at sf = 500, generating a 500 GB test data set.
Half cache, half OSS: TPC-H at sf = 2000, generating a 2000 GB test data set (local disk cache: 960 GB).
The test results are as follows (the vertical axis is RT in ms):
Full memory
Full local disk cache
Half local cache, half OSS
From the above test results:
Compared with the old elastic-storage edition, the cloud-native edition delivers more than a 2x performance improvement, thanks to the acceleration brought by fine-grained parallelism;
For compute-intensive workloads such as TPC-H, performance remains good even when half of the data is cached and half is on OSS: the data volume at sf 2000 is 4x that of sf 500, yet RT rises only to 2.8x, because bandwidth to OSS does not become a bottleneck and because of read-side optimizations such as prefetching.
Five: Summary
The new cloud-native edition of AnalyticDB PostgreSQL fully pools physical resources, allocates storage and compute capacity in unit-sized increments, and supports flexible deployment. This gives users excellent cost-effectiveness, allocates computing power optimally, and lowers the barrier to adoption, so users can focus on their business instead of spending effort on compute and storage capacity planning, upgrading the overall experience.
Through storage-compute separation, users can flexibly match compute-intensive or storage-intensive business load models, store and pay by actual usage, and avoid the resource waste caused by rigidly coupled storage and compute;
To adapt dynamically to business peaks and troughs, the compute side of the cloud-native MPP architecture uses a shared-nothing design with second-level elastic scaling, while shared storage makes the underlying storage independent of compute. This lowers the barrier of choosing specifications up front and preserves the flexibility to adjust dynamically as the business evolves;
Built on storage-compute separation, it provides data sharing capabilities that truly break the boundary of the physical machine and let data on the cloud flow. For example, real-time data sharing across instances supports a write-once, read-many usage pattern, breaking the data silos of traditional warehouses where data had to be exported from one instance and imported into another before it could be accessed, thereby simplifying operations, improving efficiency, and reducing cost.
Six: Follow-up Plan
Based on the storage-compute separation architecture described above, we have three main directions going forward:
Feature completion: filling in the current version's limitations, such as primary keys, indexes, materialized views, and full write capabilities;
Continuous performance optimization: mainly optimizing the cache-miss scenarios;
Continued cloud-native architecture upgrades: mainly further improving the user experience under the current storage-compute separation architecture.
Within the cloud-native upgrade, we have two key directions:
First, push storage-compute separation further toward serverless, with imperceptible scaling. Metadata and state will be stripped further from the compute nodes into the service layer so that segments become stateless. One benefit is that scaling becomes invisible to users; another is that stateless segments improve system availability. Today we still provide high availability through a primary-standby model, and after a node failure the cache misses caused by failover sharply degrade performance; once segments are stateless, we can simply remove a failed node from the cluster and continue serving in a "shrunken" mode.
Second, cross-instance data sharing. For analytical businesses the data scale is large, starting at the TB level, and traditional data warehouses use a siloed architecture with redundant data and high synchronization costs. We hope to provide cross-instance data sharing and reshape the data warehouse architecture.
Garner predicts that by 2022, 75% of all databases will be deployed or migrated to cloud platforms. Another authoritative organization, IDC, also predicts that by 2025, more than 50% of databases will be deployed on public clouds, and China will reach an astonishing 70% or more. After years of development, cloud databases have undergone a transformation from Cloud-Hosted (cloud hosting) to Cloud Native (cloud-native) models.
Cloud-Hosted: Based on market and industry cloud requirements, most vendors choose cloud hosting as the first step of evolution. This model will no longer require users to build their own IDC offline, but will rely on the cloud provider's standardized resources to migrate the data warehouse and provide a high degree of hosting, thereby liberating users from the management costs of the underlying hardware and the constraints of Lingplan resources. .
Cloud-Native: However, as more businesses are migrated to the cloud, underlying computing and storage resources are bound together, causing users to still need to consider unnecessary waste of resources during use. For example, an increase in computing resources will require storage association increase, resulting in ineffective costs. Users begin to expect that cloud resources can disassemble data warehouse resources in a more fine-grained manner, that is, decouple computing and storage capabilities and split them into salable units to meet business resource orchestration. At this point, the maximum value of cloud native is truly highlighted. We are no longer focusing on building a data warehouse with a balance of storage and calculation, but are oriented to user business, allowing large-scale computing or storage tilt, and independently deploying the resources required by the business. Deploy and sell according to the smallest unit. At this moment, we have truly entered the cloud-native era of data warehouses.
At the 2021 Yunqi Conference, Alibaba Cloud announced a data warehouse with a new cloud-native architecture [1]. This article introduces the evolution and exploration of the cloud-native data warehouse product AnalyticDB PostgreSQL (hereinafter referred to as ADB PG) from Cloud-Hosted to Cloud-Native, discusses the underlying design and thinking in order to realize real resource pooling and flexible sales, and covers the content of the product. Architecture design, key technologies, performance results, effect realization and follow-up plans. (The reading time of the full text is about 10 minutes)
Two: ADB PG cloud native architecture
In order to allow users to quickly adapt to the cloud data warehouse, we are currently adopting the design concept of the MPP architecture on the cloud, deploying the coordination node and computing node independently, but carrying them on a single ECS, realizing the integration of computing node storage and computing Deployment design, which is highly adaptable to the design architecture and self-built on the client side, can quickly and non-destructively migrate the data warehouse business to the cloud, which is very friendly to early cloud adaptation and meets the main requirement of parallel expansion of resources .
With the further evolution of cloud native, we provide a brand-new storage-computing separation architecture, which further splits the product into a service layer, computing layer, and shared storage layer. The architecture diagram is as follows:
Master coordination node: save global schema information and realize global transaction management;
Row storage engine: used to save metadata information, where metadata information mainly refers to the visibility information of shared storage files, including two parts:
One is the relationship between files and tables
The other is the delete bitmap of the deleted data
Based on line storage, we can inherit PG's local transaction capabilities, and are fully compatible with PG's transaction capabilities while adding, deleting, modifying and checking;
Local cache: By introducing DADI from the storage team to achieve high-performance local cache, the full name of DADI is Alibaba Cloud Data Accelerator for Disaggregated Infrastructure. Compared with open source products, the performance has an order of magnitude improvement;
Shared storage: We have borrowed some key designs from ClickHouse, and implemented a MergeTree-based row-column mixed storage at the storage layer. In addition, we have created a unified access interface based on the file interface for shared storage, and are highly compatible with both OSS and HDFS. Various forms of distributed file system;
When we are designing the architecture, compared with HAWQ, which is also derived from Greenplum, HAWQ saves metadata in the master. Every time we write, we bring the modified metadata to the master to update. When reading, we start with The master reads the required metadata, and then includes all the metadata in the execution plan, so that the segment can get the corresponding metadata, and the segment can be completely stateless.
But this design will bring 2 core problems:
Metadata inflation causes the master to become a bottleneck.
Limited by the performance of metadata, it cannot support high-concurrency real-time writing.
The reason why we don’t design this way is that we hope to support high-concurrency tasks in the future. ADB PG spent more than 2 years expanding Greenplum’s single-point master architecture to a multi-master. The core is to solve high-concurrency tasks. The problem of real-time writing, if the metadata is saved on the master, it will cause problems such as:
Metadata storage and access on the master can easily form a single point of bottleneck
It is necessary to do a lot of refactoring on the execution layer of ADB PG to bring metadata into the execution plan, which will drastically increase the bandwidth of the query plan itself, which is very unfriendly to small queries with high concurrency.
So we improved the architecture, distributed metadata to segments, avoided and realized:
The master's storage and reading and writing will not become a bottleneck
There is no need to refactor the execution layer, and the metadata is dispersed to reduce the bandwidth pressure of a single query.
Put the metadata on the segment on the distributed kv to solve the problem of metadata relocation for expansion and contraction.
The reason for using OSS for shared storage is that as the business data of a single user continues to grow, a sustainable storage solution is required, and OSS's low storage cost, high availability, and data persistence are the best choices.
Another advantage of using OSS is that you pay on demand. Users do not need to prefabricate the size of the storage space, how much data is stored, and how much to pay. After the data is deleted, there will be no charge. ESSD cloud disks usually need to calculate the storage water level based on the data, which cannot be done. The real on-demand supply of storage resources, and the inability to automatically shrink capacity, all of these violate our cloud-native design philosophy. But at the same time, the disadvantage of OSS is RT:
In order to solve the RT problem of OSS, we configure a certain proportion of local disks for computing nodes for access acceleration. In addition, we designed a high-performance row-column mixed storage, drawing on the core idea of ClickHouse mergetree storage, with order as the core, the files are absolutely ordered, and the files are relatively ordered. Through the asynchronous operation of merge, the file is realized. Merging and sorting, based on ordering, we designed 3 layers of statistical information in the file, and did a lot of IO clipping optimization.
Below we further introduce each technical point.
Three: key technologies
1. Elastic expansion
In order to achieve fast elastic scaling, our method is to organize data in hash buckets on shared storage. After scaling, the computing nodes and buckets are remapped through consistent hash. In order to solve the uniformity of bucket and segment allocation, and reduce the For the impact of cache failure after scaling, we have improved the traditional consistent hash algorithm to support dynamic mapping during scaling.
Divide the data into multiple shards according to the hash bucket, and remap the data on the object storage according to the sharding granularity. If the expansion of computing nodes exceeds the number of shards, the data can only be redistributed. To solve this problem, we support hash buckets that can be split and merged in the background to avoid redistribution of data.
The above is the remapping of "data" during expansion and contraction, and the metadata describing the visibility of data files is stored in the row table. We still use Greenplum's data redistribution strategy. The difference is that in order to speed up the metadata For redistribution, we have made optimizations such as parallel distribution.
Let's take capacity expansion as an example to further refine the process of capacity expansion:
Combined with technologies such as ECS resource pooling, network card parallel loading, and docker image preheating, the end-to-end time consumption within 16 nodes is close to 1 minute.
2. Hierarchical storage
The implementation of hierarchical storage is as follows:
As shown in the figure above, we divide storage resources into three layers, including memory, local disk, and shared storage.
Memory: It is mainly responsible for the acceleration of line storage access and the cache of file statistics;
Local disk: as persistent storage for row storage, and as a local accelerator for remote shared storage;
Remote shared storage: as a persistent storage of data.
3. Read and write process
The writing process is as follows:
The data written by the user is directly written to OSS through data batching, and a piece of metadata is recorded on the local disk at the same time. This piece of metadata records the correspondence between files and data tables. Metadata is implemented using PG's line storage table, and we save this information through the file metadata table.
When updating or deleting, we don’t need to directly modify the data on OSS. We do this by marking the deletion. The information marked for deletion is also stored in the local row storage table. We store this information through the visibility bitmap. Marking deletion will lead to a decrease in read performance. We use background merge to apply deletion information to files to reduce the impact of deletion on read performance.
When we write, we further divide the data on the segment according to the bucket, which will bring about the problem of small files. In order to solve the problem of small files, we have made the following optimizations:
Group flush: A batch of written data can be written to the same OSS file through group flush. Our OSS file adopts the ORC format, and different buckets are written to the corresponding strip;
The pipeline is asynchronous and parallel: coding and sorting are typical cpu-intensive tasks, and uploading to oss is a typical network IO-intensive task. We will parallel these two task types, and the task of uploading oss is executed as an asynchronous task. At the same time, the next batch of data is encoded and sorted to speed up the writing performance.
Because the remote persistent storage provides 12 nines of persistence, only the row storage that saves metadata has WAL logs and double copies to ensure reliability. The data itself is written to the shared storage without WAL logs and multiple copies. Due to the reduction of WAL logs and the master-slave synchronization of WAL logs, and through asynchronous parallelism and batch accumulation, in the batch writing scenario, our writing performance is basically equal to that of the ECS elastic storage version.
The reading process is as follows:
We get the OSS files to be scanned by reading the file metadata table.
Read the corresponding file according to the OSS file.
The read file filters out deleted data through the visibility bitmap of the metadata table.
In order to solve the delay caused by reading OSS, we also introduced DADI to help us implement cache management and encapsulate shared file access. When reading a file, it will first determine whether there is a local cache. If so, it will be read directly from the local disk. No It will go to OSS to read, and it will be cached locally after reading. When writing, it will directly write to OSS and write back to the local disk. Writing back is an asynchronous operation. We also manage the elimination of local cache data through DADI, which automatically eliminates cold data according to the LRU/LFU strategy.
Since the transaction is implemented using PG's row storage, it is fully compatible with ADB PG's transaction. The problem is that we need to redistribute this part of the data when expanding and shrinking. We redesigned the redistribution mechanism of this data , through pre-partitioning, parallel copy, point-to-point copy and other technologies, the expansion and contraction time is greatly shortened.
To summarize the performance optimization points:
Realize transaction ACID through local row storage table, support data block level concurrency;
Improve write throughput through Batch and pipeline parallelization;
Accelerated access to memory and local SSD multi-level cache based on DADI.
4. Visibility table
We store information related to shared storage files in File Metadata, and its structure is as follows:
Hash bucket: It is used to scan according to the bucket when relocating data during expansion and contraction. When querying, it is also a bucket followed by a bucket;
Level: It is the level of the merge tree. Level 0 represents the data written in real time, and this part of the data has a higher weight when merging;
Physical file id: It is the id corresponding to the file, 64 bytes because it is no longer associated with the segment, and no longer only needs to ensure the uniqueness of the table in the segment, but needs to be globally unique;
Stripe id: because an oss file can contain multiple bucket files, with stripe as the unit, it is convenient to merge multiple buckets written in a segment into one oss file. Avoid small oss files, resulting in performance degradation, and explosion of small oss files;
Total count: It is the number of file lines, which is also a weight for background merging, the larger the merging weight is, the lower it is.
Visibility bitmap records deleted file information
Start_row corresponds to 32k and corresponds to a delete bitmap. This 32000 4k, the 32k page used by the line storage can save 7 records.
Delete count is the number of deleted.
We don't need to visit oss, we can directly get the files that need to be merged, avoiding the delay caused by accessing oss, and oss also has a limited access to the throughput, so as to avoid triggering the flow limit of oss caused by frequent access.
5. Row and column mixed storage
The structure of Mergetree is shown on the left side of the figure above. The core is to merge small files into large orderly files through the background merge method, and when merging, we can rearrange the data, such as changing the orderly characteristics of the data. More optimization, refer to the follow-up ordered perception optimization. The difference with leveldb is:
The real-time writing of layer 0 will be merged, and the files of different buckets will be merged into a large file, and different buckets will fall into the corresponding stripe;
Merge will merge the files that meet the merge across layers. The files are strictly ordered, but the files are roughly ordered. The higher the number of layers, the larger the files, and the smaller the overlap between files.
We use the row-column mixed storage format for each file, and the specific storage format of the row-column mixed storage is on the right. We have made a lot of optimizations on the basis of ORC.
ORC file: An ORC file can contain multiple stripes, each stripe contains multiple row groups, each row group contains fixed records, and these records are stored independently by column.
Postscript: includes the description information PostScript of the file, file meta information (including the statistical information of the entire file, data dictionary, etc.), all stripe information and file schema information.
stripe: stripe is the segmentation of rows, grouping rows to form a stripe, and each read file is in units of row groups, saving the index and data of each column. It consists of index data, row data and stripe footer.
File footer: Save the location of the stripe, the statistics of each column in the stripe, and all stream types and locations.
Index data: Saves statistics at the row group level.
Data stream: A stream represents a valid piece of data in a file, including index and data.
The index stream saves the position and statistical information of each row group. The data stream includes multiple types of data. The specific types are determined by the column type and encoding method. The following two types of integer and string are used as examples:
For an Integer field, both a bitstream and a shapingstream are used. The bit stream is used to identify whether a certain value is null, and the shaping stream is used to save the integer value of the non-null record of the shaping field.
String type field, ORC writer will check at the beginning that the number of different contents in the field value accounts for the percentage of the total number of non-empty records is not more than 0.8, then use dictionary encoding, and the field value will be saved in a bit stream, a byte stream and in two shaping streams. Bit streams are also used to identify null values, byte streams are used to store dictionary values, a shaping stream is used to store the length of each entry in the dictionary, and another shaping stream is used to record field values. If dictionary encoding cannot be used, ORC writer will know that there are too few repeated values in this field, and the efficiency of dictionary encoding is not high. ORC writer will use a byte stream to save the value of the String field, and then use a shaping stream to save the value of each field byte length.
Three levels of statistical information are saved in the ORC file, namely file level, stripe level and row group level. The core of improving storage performance is to reduce IO. We implement various pushdowns based on ORC statistics and indexes to help us achieve IO pruning. For example, in Projection pushdown, we only scan the columns that need to be materialized. In Agg pushdown, we will directly read the required min, max, sum, and unique from the statistical information or the index and return them, avoiding the decompression of the data stream. For the predicate, we also support pushing the filter down, directly filtering through statistical information, and directly skipping stripes that do not meet the conditions. We support various operators, in/not in, and equivalent conversion of expressions.
In addition, we have optimized the performance of the storage format as follows:
Zero-copy: In order to convert the ORC data type to the PG data type, we copy the value of the fixed-length type, and directly convert the variable-length type to the PG datum as a pointer reference.
Batch Scan: Batch scan is used for columns. Instead of row-by-row access, one column is scanned first, and then the next column is scanned, which is more friendly to CPU cache.
Support Seek read: It is convenient to filter the jump in case of hit.
6. Local cache
DADI helps us achieve two capabilities, one is efficient cache management, and the other is unified storage access. Before getting to know DADI, we can first take a look at the comparison test between DADI and open source solutions from 2 dimensions of RT and throughput:
It can be seen that compared with the open source solution alluxio, DADI has an order of magnitude improvement in memory hit scene RT, and also has obvious advantages in throughput. In the scenario of hitting the disk, there are also obvious performance advantages. In some analysis scenarios, we will read file statistics frequently but in small amounts, and we will cache these statistics locally. This advantage brings a great improvement in overall performance.
For the performance advantages of DADI in the cache hit scenario, you can refer to the following architecture:
DADI SDK: access storage through standard read and write interfaces, and select short circuit read (short circuit read) or IPC process communication to access Local DADI Service, or access remote DADI Service according to whether the cache hits, corresponding to distributed cache service, as lib The library is embedded in the read and write process of ADB PG;
Cache Instance: manages local cache, cache files are abstracted into virtual block devices for access, data in memory and the current hot and cold of the disk are managed in units of blocks.
The core design here is:
Short-circuit read, read shared memory directly, avoid reading through IPC;
The data structure of whether the cache hits is also in the shared memory. Through reference count, combined with robust mutex to ensure the multi-thread safety of shared memory data;
Disk reading, 100us, + 27us is approximately equal to the disk reading itself rt, IPC uses shm communication, and does not use local socket communication.
Extremely low resource usage.
Memory: The memory used by DADI Service is between 100 and 200M. The reason is that the IPC implementation based on shared memory, data structures such as hash tables, avoid memory expansion under the multi-process architecture, streamlined coding method, and a memory page of 16k corresponds to a 4byte management structure ;
CPU: When the local DADI Service disk is full, the single-core CPU uses about 20%. The CPU is used on the SDK side, and the SDK rarely communicates with the Local DADI Service.
In addition, in order to better utilize the advantages of DADI in hitting memory, we have made the following optimizations in combination with row-column mixed storage:
Cache priority: supports high priority of statistical information, resident in memory, index information resident in local disk. Supports high-priority caching of dimension table data locally.
Fine-grained caching strategy: In order to avoid cold data access of large tables, resulting in all local hot data being replaced, large tables use dedicated cache areas.
File asynchronous prefetching: According to the query situation, the parsed data file is pre-read locally. This process does not affect the reading and writing of the current file and is asynchronous.
7. Vectorized execution
The ADB PG cloud-native version also supports the vectorized execution engine. The core is to increase the hit rate of data in the CPU cache by accumulating batches, reduce the number of function calls through codegen, reduce the jump of complex calculation instructions, and accelerate calculation through SIMD instructions. Memory pool management reduces memory copying between operators. For more information, please refer to [3].
8. Order perception
The order of data is mainly used in two aspects, based on ordered IO clipping, and the other is to minimize the sorting in the calculation process. IO clipping is mixed in rows and columns and there are many discussions. Here we mainly discuss the second point. Here The main work we do are:
Eliminate redundant sort operations. If the data itself is ordered and meets the sorting requirements, there is no need to add a sort operation.
Minimize the columns that need to be sorted. For example, if you want to sort {c1,c2,..cn}, if there is a predicate c1=5, the order is simplified to {c2,..cn}, avoiding one more field for sorting.
The order is pushed down. In the initialization phase, the descending intent sorting operation is pushed down as much as possible.
We use the following method to generate the sort scan operator. After querying SQL and parsing to generate AST, it will transform according to a series of heuristic rules to generate a physical execution plan:
First, according to the ordering requirements of different operators, such as (join/group by/distinct/order by), establish the interesting order of the operator (that is, the ordered input expected by this operator).
Secondly, the interesting order generated during the sort scan process will be pushed down to the lower operator as much as possible (sort-ahead) to meet the order attribute requirements as soon as possible.
If an operator has multiple interesting orders, it will try to merge them so that one sort can meet the requirements of multiple order attributes.
Finally, there is the implementation of the sort scan operator itself. The storage layer guarantees only that data is strictly ordered within each file and approximately ordered across files, so we use a multi-way merge algorithm to produce a globally ordered stream (sketched below).
The difficulty is that the multi-way merge of the sort scan consumes data row by row, which conflicts with vectorized batch scanning and batched file reads; we rely on the cost-based optimizer (CBO) to choose the better execution plan.
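For reference, the multi-way merge itself can be sketched as a classic min-heap merge over sorted runs, assuming each file (run) is internally sorted. The snippet below is illustrative only and merges plain integers rather than real rows.

```cpp
// Illustrative min-heap merge over sorted runs (each "file" is internally sorted);
// it merges plain integers instead of real rows to keep the sketch short.
#include <cstddef>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

std::vector<int> MergeSortedRuns(const std::vector<std::vector<int>>& runs) {
    // Heap entry: (current value, run index, offset within that run).
    using Entry = std::tuple<int, std::size_t, std::size_t>;
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;

    for (std::size_t r = 0; r < runs.size(); ++r) {
        if (!runs[r].empty()) heap.emplace(runs[r][0], r, 0);
    }

    std::vector<int> merged;
    while (!heap.empty()) {
        auto [value, run, offset] = heap.top();  // smallest head among all runs
        heap.pop();
        merged.push_back(value);
        if (offset + 1 < runs[run].size()) {
            heap.emplace(runs[run][offset + 1], run, offset + 1);  // advance that run
        }
    }
    return merged;
}
```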
9. Fine-grained parallelism
ADB PG is an MPP architecture and can fully exploit parallel computing across nodes. Because the cloud-native version divides data into buckets, it also enables finer-grained parallelism within a node. Take join as an example:
On the left is an execution plan without intra-node parallel join: two processes are started, one for the build side of the hash join and one for the probe side. With bucketing, as in the figure above, the data of each bucket can instead be processed in parallel. Since the data is partitioned by bucket, when the join key matches the distribution key, intra-node parallelism also benefits fully from the local join optimization (a per-bucket sketch follows).
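A hypothetical sketch of the per-bucket idea is shown below: because both join sides are already divided into the same buckets, each bucket can be built and probed by an independent task, so the buckets join in parallel without any cross-bucket data movement. Types and function names are assumptions for illustration, not executor code.

```cpp
// Hypothetical per-bucket parallel hash join (not the real executor): buckets are
// independent, so each bucket is built and probed by its own task. Assumes bucket i
// of the build side matches bucket i of the probe side.
#include <cstddef>
#include <functional>
#include <future>
#include <unordered_map>
#include <utility>
#include <vector>

using Row = std::pair<int, int>;  // (join key, payload)

// Join one bucket: build a hash table on one side, then probe it with the other.
std::vector<std::pair<Row, Row>> JoinBucket(const std::vector<Row>& build,
                                            const std::vector<Row>& probe) {
    std::unordered_multimap<int, Row> table;
    for (const Row& r : build) table.emplace(r.first, r);

    std::vector<std::pair<Row, Row>> out;
    for (const Row& p : probe) {
        auto [begin, end] = table.equal_range(p.first);
        for (auto it = begin; it != end; ++it) out.emplace_back(it->second, p);
    }
    return out;
}

// One task per bucket; the buckets join in parallel with no cross-bucket shuffle.
std::vector<std::pair<Row, Row>> ParallelHashJoin(
    const std::vector<std::vector<Row>>& build_buckets,
    const std::vector<std::vector<Row>>& probe_buckets) {
    std::vector<std::future<std::vector<std::pair<Row, Row>>>> tasks;
    for (std::size_t b = 0; b < build_buckets.size(); ++b) {
        tasks.push_back(std::async(std::launch::async, JoinBucket,
                                   std::cref(build_buckets[b]),
                                   std::cref(probe_buckets[b])));
    }
    std::vector<std::pair<Row, Row>> result;
    for (auto& task : tasks) {
        std::vector<std::pair<Row, Row>> part = task.get();
        result.insert(result.end(), part.begin(), part.end());
    }
    return result;
}
```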
Four: Performance Results
1. Scalability performance
2. Read and write performance
To test performance, we used an instance with a 4*4C specification and compared the new cloud-native version of ADB PG with the storage-elastic version.
Write performance test
The test table is the TPC-H lineitem table at scale factor 500. Copy commands are executed at different concurrency levels, the command execution time is measured, and throughput is computed as the total data volume divided by the execution time.
With a single concurrent copy, the new version performs similarly to the storage-elastic version, mainly because resources are not yet saturated;
With 4 concurrent copies, the throughput of the new version is twice that of the storage-elastic version. The lineitem table defines a sort key; the new version does not need to write WAL when ingesting data and also benefits from batching and pipeline parallelism, whereas the storage-elastic version first writes the data and then merges it, writing additional WAL during the merge, so the new version has a clear advantage;
With 8 concurrent copies, the new version performs almost the same as with 4, mainly because 4 concurrent copies already saturate the 4-core CPU, so raising the concurrency brings no further improvement.
Read performance test
To test read performance comprehensively, we covered three scenarios:
Fully in memory: a TPC-H dataset at sf = 10, generating about 10 GB of test data.
Fully in local disk cache: a TPC-H dataset at sf = 500, generating about 500 GB of test data.
Half in cache, half on OSS: a TPC-H dataset at sf = 2000, generating about 2000 GB of test data (the local disk cache is 960 GB).
The test results are as follows (the vertical axis is RT in ms):
Fully in memory
Fully in local disk cache
Half in local cache, half on OSS
From the above test results:
Compared with the old storage-elastic version, the cloud-native version delivers more than twice the performance, thanks to the acceleration brought by fine-grained parallelism;
For compute-intensive workloads such as TPC-H, performance remains good even with half of the data in cache and half on OSS: the data volume at sf 2000 is 4 times that at sf 500, yet RT rises only to 2.8 times, because OSS bandwidth does not become a hard bottleneck and because of read-side optimizations such as prefetching.
Five: Summary
The new cloud-native version of AnalyticDB PostgreSQL fully pools physical resources, allocates storage and computing capacity in fine-grained units, and supports flexible deployment. This gives users excellent cost-effectiveness and an optimal allocation of computing power, lowers the barrier to adoption, and lets users focus on their business instead of spending effort on compute and storage capacity planning, upgrading the overall experience.
With storage-compute separation, users can adapt to compute-intensive or storage-intensive business load patterns, pay for storage based on actual usage, and avoid the resource waste caused by rigidly coupled storage and compute;
It dynamically adapts to the peaks and troughs of business load: the compute side of the cloud-native MPP architecture remains shared-nothing and supports second-level elastic scaling, while shared storage decouples the underlying storage from compute. This lowers the burden of choosing specifications up front and preserves the flexibility to adjust dynamically as the business evolves;
Built on storage-compute separation, it provides data sharing that truly breaks the boundaries of physical machines and lets data flow on the cloud. For example, real-time data sharing across instances supports a write-once, read-many usage mode, breaking the data silos of traditional data warehouses where data must first be imported into an instance before it can be accessed, thereby simplifying operations, improving efficiency, and reducing cost.
Six: Follow-up Plan
Based on the storage-compute separation architecture above, we will focus on three main directions:
Feature completeness: mainly removing limitations of the current version, such as support for primary keys, indexes, materialized views, and full write capabilities;
Continuous performance optimization, mainly for scenarios where the cache is not hit;
Continuous upgrades of the cloud-native architecture, mainly to further improve the user experience under the current storage-compute separation architecture.
Within the cloud-native upgrade, we have two key directions:
Taking storage-compute separation a step further toward serverless, with scaling that is imperceptible to users: metadata and state will be further stripped out of the compute nodes into the service layer, making the segments stateless. One benefit is that scaling in and out becomes imperceptible to users; another is that stateless segments help improve system availability. Today we still provide high availability through an active-standby mode, and when a node fails, the cache invalidation caused by the failover makes performance drop sharply. Once segments are stateless, a failed node can simply be removed from the cluster and service continues in a "scaled-in" fashion.
Applying data sharing across instances: analytical workloads involve large data volumes, often starting at the TB level, and traditional data warehouses use siloed ("chimney") architectures with redundant data and high synchronization costs. We hope to provide cross-instance data sharing and reshape the data warehouse architecture.