Introduction to Flink Sort-Shuffle Implementation
1. Introduction to Data Shuffle
Data shuffle is an important stage of batch data processing jobs. In this stage, the output data of upstream processing nodes is persisted to external storage, and downstream computing nodes then read and process it. This persisted data is not only a form of data exchange between computing nodes but also plays an important role in error recovery.
Currently, there are two batch data shuffle models adopted by existing large-scale distributed computing systems, namely the hash-based method and the sort-based method:
The core idea of the hash-based method is to write the data destined for each downstream concurrent consumer task into a separate file, so that the files themselves form natural boundaries between data partitions;
The core idea of the sort-based method is to first write the data of all partitions together, and then use sorting to distinguish the boundaries between data partitions.
We introduced a sort-based batch shuffle implementation into Flink in version 1.12 and have continued to optimize its performance and stability in subsequent releases; as of Flink 1.13, sort-shuffle is ready for production use.
2. The significance of introducing Sort-Shuffle
One of the most important reasons for introducing sort-shuffle in Flink is that Flink's original hash-based implementation is unusable for large-scale batch jobs. This is also borne out by the experience of other existing large-scale distributed computing systems:
In terms of stability: for high-concurrency batch jobs, the hash-based implementation generates a large number of files and reads and writes them concurrently, which consumes a lot of resources and puts heavy pressure on the file system. The file system must maintain a large amount of file metadata, which creates instability risks such as file handle and inode exhaustion.
In terms of performance: for high-concurrency batch jobs, concurrently reading and writing a large number of files means a large amount of random disk IO, with each IO operation often transferring very little data. This is a huge challenge for IO performance and easily makes data shuffle the performance bottleneck of batch jobs.
By introducing a sort-based batch data shuffle implementation, the number of concurrently read and written files can be greatly reduced, which enables better sequential reading and writing and thereby improves the stability and performance of Flink's large-scale batch jobs. In addition, the new sort-shuffle implementation reduces memory buffer consumption: in the hash-based implementation each data partition requires its own read/write buffer, so memory buffer consumption grows in proportion to concurrency, whereas the sort-based implementation decouples memory buffer consumption from job concurrency (although more memory may yield higher performance).
More importantly, we implemented a new storage structure and IO optimizations for reading and writing, which give Flink's batch data shuffle an advantage over that of other large-scale distributed data processing systems. The following sections introduce Flink's sort-shuffle implementation and the results obtained in more detail.
3. Implementation of Flink Sort-Shuffle
Similar to the batch sort-shuffle implementations of other distributed systems, the entire shuffle process in Flink is divided into several stages: writing data into the memory buffer, sorting the memory buffer, writing the sorted data out to files, and reading the shuffle data back from files and sending it downstream. However, compared with other systems, Flink's implementation has some fundamental differences, including a multi-region data storage format, the omission of the data merging process, and read IO scheduling. All of these give Flink's implementation better performance.
1. Design goals
Throughout the implementation of Flink sort-shuffle, we consider the following points as the main design goals:
1.1 Reduce the number of files
As discussed above, the hash-based implementation generates a large number of files, and reducing the number of files benefits both stability and performance. The sort-spill-merge approach is widely adopted by distributed computing systems to achieve this goal: data is first written into the memory buffer; when the buffer is full, the data is sorted and the sorted data is written out to a file, so the total number of files is (total data volume / memory buffer size), which is far fewer than with the hash-based approach. After all data has been written out, the generated files are merged into one file, further reducing the number of files and increasing the size of each data partition (which is good for sequential reading).
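As a quick sanity check with hypothetical numbers, the pre-merge file count follows directly from this ratio:

```latex
\text{number of spill files} \approx
\left\lceil \frac{\text{total data volume}}{\text{memory buffer size}} \right\rceil,
\qquad \text{e.g. } \left\lceil \frac{100\ \text{GB}}{512\ \text{MB}} \right\rceil = 200
```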
Compared with the implementations of other systems, Flink's implementation has an important difference: Flink always appends data to the same file instead of writing multiple files and then merging them. The advantage is that there is only ever one file per output, so the number of files is minimized.
1.2 Open fewer files
Opening too many files at the same time consumes more resources and easily leads to file handle shortages, hurting stability. Therefore, opening fewer files helps improve system stability. For data writing, as described above, each concurrent task keeps only one file open by always appending to the same file. For data reading, although each file must be read by many downstream concurrent tasks, Flink still opens each file only once and shares the file handle among these concurrent reader tasks.
1.3 Maximize sequential read and write
Sequential file reading and writing is critical to file IO performance. By reducing the number of shuffle files, we have already reduced random file IO to some extent. In addition, Flink's batch sort-shuffle implements further IO optimizations to maximize sequential access. In the data write phase, better sequential writes are achieved by aggregating the data buffers to be written out into larger batches and writing them out through the writev system call. In the data read phase, read IO scheduling ensures that data read requests are always served in the order of file offsets, maximizing sequential reads. Experiments show that these optimizations greatly improve batch shuffle performance.
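A minimal sketch of this batching idea is shown below. It is not Flink's actual writer code (the class and method names are illustrative); it only demonstrates that in Java, a gathering write via FileChannel.write(ByteBuffer[]) maps to the writev system call on Linux:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Illustrative sketch, not Flink's actual writer: gather many small shuffle
// buffers into one large write. FileChannel.write(ByteBuffer[]) maps to the
// writev system call, turning many small writes into one sequential write.
public final class BatchedWriter {

    static long flushBatch(Path file, List<ByteBuffer> pending) throws IOException {
        if (pending.isEmpty()) {
            return 0L;
        }
        try (FileChannel channel = FileChannel.open(
                file, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            ByteBuffer[] batch = pending.toArray(new ByteBuffer[0]);
            long written = 0L;
            // One gathering write may not drain all buffers; loop until it does.
            while (batch[batch.length - 1].hasRemaining()) {
                written += channel.write(batch);
            }
            return written;
        }
    }
}
```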
1.4 Reduce read and write IO amplification
The traditional sort-spill-merge approach increases the size of readable data blocks by merging the multiple generated files into one larger file. Although this brings benefits, it also has drawbacks, the main one being read/write IO amplification: for data shuffle between computing nodes, if no error occurs, the data in principle only needs to be written once and read once, but merging causes the same data to be read and written multiple times, increasing the total IO volume and consuming extra storage space.
Flink's implementation avoids the merge process by continuously appending data to the same file, together with its unique storage structure. Although a single data block is smaller than it would be after merging, avoiding the merge overhead, combined with Flink's read IO scheduling, ultimately yields higher performance than the sort-spill-merge scheme.
1.5 Reduce memory buffer consumption
Similar to sort-shuffle implementations in other distributed computing systems, Flink uses a fixed-size memory buffer for data caching and sorting. The size of this buffer is independent of concurrency, which decouples the memory required for upstream shuffle data writing from job parallelism. Combined with another memory management optimization, FLINK-16428, the memory buffer consumption of downstream shuffle data reading also becomes concurrency-independent, reducing the memory footprint of large-scale batch jobs. (Note: FLINK-16428 applies to both batch and streaming jobs.)
2. Implementation details
2.1 Memory data sorting
In the sort-spill phase, each record is first serialized and written into the sort buffer. When the buffer is full, all binary data in the buffer is sorted by data partition and then written out to the file in partition order. Although the records themselves are not currently sorted (only grouped by partition), the sort buffer interface is generalized enough to support potentially more complex sorting requirements in the future. The interface of the sort buffer is defined as follows:
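In outline, it looks like the following. This is paraphrased from the SortBuffer interface shipped with Flink's sort-shuffle at the time of writing; exact signatures may differ between Flink versions, and the referenced types come from flink-runtime:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;

// Paraphrase of Flink's SortBuffer interface; BufferWithChannel is a small
// holder pairing a Buffer with its channel (data partition) index, declared
// alongside this interface in flink-runtime.
public interface SortBuffer {

    /** Appends data of the specified channel (data partition) to this SortBuffer. */
    boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType)
            throws IOException;

    /** Copies data in this SortBuffer to the target MemorySegment in partition order. */
    BufferWithChannel copyIntoSegment(MemorySegment target);

    long numRecords();

    long numBytes();

    /** Returns true if there is still data to be consumed from this buffer. */
    boolean hasRemaining();

    /** Finishes this SortBuffer; no more records can be appended afterwards. */
    void finish();

    boolean isFinished();

    /** Releases this SortBuffer and frees its memory. */
    void release();

    boolean isReleased();
}
```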
For the sorting algorithm, we chose low-complexity bucket sort. Specifically, 16 bytes of metadata are inserted in front of each serialized record: a 4-byte length, a 4-byte data type, and an 8-byte pointer to the next record in the same data partition. The structure is shown in the figure below:
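A minimal sketch of this record layout, assuming a plain ByteBuffer as the sort buffer (illustrative only, not Flink's actual code):

```java
import java.nio.ByteBuffer;

// Illustrative sketch of the 16-byte per-record metadata described above:
// 4-byte length, 4-byte data type, 8-byte pointer to the next record of the
// same data partition. The pointers form a per-partition linked list.
public final class RecordMetadata {

    static final long NIL = -1L; // marks the end of a partition's chain

    /** Returns the offset of the record just written, so the previous record
     *  of the same partition can have its next-pointer patched to it. */
    static int writeRecord(ByteBuffer sortBuffer, byte[] serialized, int dataType) {
        int recordOffset = sortBuffer.position();
        sortBuffer.putInt(serialized.length); // 4 bytes: record length
        sortBuffer.putInt(dataType);          // 4 bytes: data type
        sortBuffer.putLong(NIL);              // 8 bytes: next-record pointer
        sortBuffer.put(serialized);           // the serialized record itself
        return recordOffset;
    }
}
```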
When reading data from the buffer, Flink only needs to follow each partition's chained index structure to read all records belonging to that partition, and these records retain the order in which they were written. Reading all data in partition order therefore achieves the goal of sorting by data partition.
2.2 File storage structure
As mentioned earlier, the shuffle data generated by each parallel task is written to a single physical file. Each physical file contains multiple data regions, and each data region is produced by one sort-spill of the data buffer. Within each data region, all data is sorted and aggregated by the sequence number of its data partition (each partition being consumed by a different parallel task of the downstream computing node). The figure below shows the detailed structure of the shuffle data file. R1, R2, and R3 are 3 different data regions, corresponding to 3 sort-spill writes. Each region contains 3 data partitions, which will be read by 3 different parallel consumer tasks C1, C2, and C3. That is, data B1.1, B2.1, and B3.1 will be processed by C1; B1.2, B2.2, and B3.2 by C2; and B1.3, B2.3, and B3.3 by C3.
Similar to other distributed systems, each data file in Flink has a corresponding index file, used at read time to locate the data (data partition) belonging to each consumer. The index file contains the same data regions as the data file, and each region contains as many index entries as there are data partitions. Each index entry consists of two parts: the offset into the data file and the length of the data. As an optimization, Flink caches up to 4 MB of index data for each index file. The correspondence between data files and index files is as follows:
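Given that description, locating one partition's data within one region reduces to simple arithmetic over fixed-size entries. The sketch below assumes a 16-byte entry (8-byte offset plus 8-byte length); this is an assumption for illustration, not Flink's exact on-disk format:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Illustrative sketch (entry sizes are assumptions): each region of the index
// file stores one (offset, length) entry per data partition.
public final class IndexReader {

    private static final int INDEX_ENTRY_BYTES = 16; // 8-byte offset + 8-byte length

    /** Reads the (offset, length) entry of one partition within one region. */
    static long[] readIndexEntry(FileChannel indexFile, int region, int partition,
                                 int numPartitions) throws IOException {
        long position = (long) (region * numPartitions + partition) * INDEX_ENTRY_BYTES;
        ByteBuffer entry = ByteBuffer.allocate(INDEX_ENTRY_BYTES);
        indexFile.read(entry, position);
        entry.flip();
        return new long[] {entry.getLong(), entry.getLong()}; // {offset, length}
    }
}
```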
2.3 Read IO scheduling
To further improve file IO performance, Flink builds on the storage structure above and introduces a read IO scheduling mechanism similar to the elevator algorithm used in disk scheduling: IO requests are always served in the order of their file offsets. More specifically, if a data file has n data regions, and each data region has m data partitions read by m downstream computing tasks, then the following pseudocode shows the workflow of Flink's IO scheduling algorithm:
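In outline, serving requests in file-offset order means iterating region by region and, within each region, partition by partition, so the shuffle file is read sequentially. The Java rendering below is a hedged sketch of that loop; the interface and method names are illustrative, not Flink's actual classes:

```java
// Hedged sketch of the elevator-style read scheduling described above.
public final class ReadScheduler {

    interface RegionServer {
        // Read one partition's data block in the given region (located via the
        // index file) and send it to the corresponding downstream reader.
        void serve(int region, int reader);
    }

    static void schedule(int numRegions, int numReaders, RegionServer server) {
        for (int region = 0; region < numRegions; region++) {      // n data regions
            for (int reader = 0; reader < numReaders; reader++) {  // m downstream readers
                server.serve(region, reader);
            }
        }
    }
}
```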
2.4 Data broadcast optimization
Data broadcasting means sending the same data to all parallel tasks of the downstream computing node; a common application scenario is broadcast-join. Flink's sort-shuffle implementation optimizes this process so that only one copy of the broadcast data is kept in the in-memory sort buffer and in the shuffle file, which greatly improves broadcast performance. More specifically, when a piece of broadcast data is written into the sort buffer, it is serialized and copied only once, and when the data is spilled to the shuffle file, only one copy is written. In the index file, the index entries of all data partitions point to the same piece of data in the data file. The figure below shows all the details of the data broadcast optimization:
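The index-side trick can be sketched as follows, reusing the assumed (offset, length) entry layout from the index example above (again an illustration, not Flink's actual format):

```java
import java.nio.ByteBuffer;

// Illustrative sketch: the data file holds a single copy of a broadcast
// record, while the index file writes one entry per partition, all pointing
// at the same (offset, length) in the data file.
public final class BroadcastIndexWriter {

    static void writeBroadcastEntries(ByteBuffer indexBuffer, int numPartitions,
                                      long dataOffset, long dataLength) {
        for (int partition = 0; partition < numPartitions; partition++) {
            indexBuffer.putLong(dataOffset); // every partition points to the same data
            indexBuffer.putLong(dataLength);
        }
    }
}
```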
2.5 Data Compression
Data compression is a simple yet effective optimization. Test results show that compression can improve the overall TPC-DS performance by more than 30%. As in Flink's hash-based batch shuffle implementation, compression is performed per network buffer and never spans data partitions; that is, data sent to different downstream parallel tasks is compressed separately. Compression happens after the data is sorted and before it is written out, and the downstream consumer task decompresses the data after receiving it. The following figure shows the entire process of data compression:
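As a standalone illustration of per-buffer compression, the demo below uses the lz4-java library directly (Flink wraps a codec like this internally; these are not Flink's own classes):

```java
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

// Standalone per-buffer compression demo using lz4-java directly.
public final class BufferCompressionDemo {

    public static void main(String[] args) {
        LZ4Factory factory = LZ4Factory.fastestInstance();
        LZ4Compressor compressor = factory.fastCompressor();
        LZ4FastDecompressor decompressor = factory.fastDecompressor();

        byte[] networkBuffer =
                "shuffle data is compressed one network buffer at a time".getBytes();

        // Compress a single network buffer before it is written out...
        byte[] compressed = compressor.compress(networkBuffer);

        // ...and decompress it on the consumer side after it is received.
        byte[] restored = decompressor.decompress(compressed, networkBuffer.length);
        System.out.println(new String(restored));
    }
}
```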
4. Test results
1. Stability
The new sort-shuffle implementation greatly improves the stability of Flink batch jobs. Besides eliminating the potential file handle and inode exhaustion problems, it also resolves some known issues of Flink's original hash-shuffle, such as FLINK-21201 (creating too many files blocks the main thread) and FLINK-19925 (performing IO operations in the network Netty threads affects network stability).
2. Performance
We ran TPC-DS at the 10 TB data scale with a parallelism of 1000. The results show that, compared with Flink's original batch shuffle implementation, the new implementation achieves a 2-6x performance improvement; if computation time is excluded and only data shuffle time is measured, the improvement can reach up to 10x. The following table shows the detailed performance numbers:
On our test cluster, the data read/write bandwidth of each mechanical hard disk can reach 160 MB/s.
Note: Our test environment is configured as follows. Since the machines have a large amount of memory, jobs with a small volume of shuffle data effectively shuffle only through memory. Therefore, the table above lists only queries with a large shuffle volume and an obvious performance improvement:
5. Tuning parameters
In Flink, sort-shuffle is not enabled by default. To enable it, lower the value of the configuration parameter taskmanager.network.sort-shuffle.min-parallelism. This parameter means: if the number of data partitions (the number of downstream concurrent tasks a computing task sends data to) is below this value, hash-shuffle is used; at or above this value, sort-shuffle is enabled. In practice, on mechanical hard disks it can be set to 1, so that sort-shuffle is always used.
Flink does not enable data compression by default. For batch jobs, it is recommended to enable it in most scenarios unless the data compression ratio is low. The parameter to enable it is taskmanager.network.blocking-shuffle.compression.enabled.
Shuffle data writing and reading both require memory buffers. The data write buffer size is controlled by taskmanager.network.sort-shuffle.min-buffers, and the data read buffer by taskmanager.memory.framework.off-heap.batch-shuffle.size. The data write buffer is carved out of network memory; to enlarge it, you may also need to increase the total network memory size to avoid insufficient-network-memory errors. The data read buffer is carved out of the framework's off-heap memory; to enlarge it, you may also need to increase the framework off-heap memory to avoid direct memory OOM errors. Generally, larger buffers bring better performance; for large-scale batch jobs, write and read buffers of a few hundred megabytes each are usually sufficient. A sample configuration combining these settings is shown below.
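The flink-conf.yaml fragment below puts these options together. The keys are the parameters named above; the values are illustrative starting points under the assumption of 32 KB network buffers, not recommendations for every workload:

```yaml
# Enable sort-shuffle for all parallelisms (>= 1).
taskmanager.network.sort-shuffle.min-parallelism: 1

# Enable shuffle data compression for batch jobs.
taskmanager.network.blocking-shuffle.compression.enabled: true

# Data write buffer, taken from network memory:
# e.g. 2048 buffers * 32 KB = 64 MB (increase network memory accordingly).
taskmanager.network.sort-shuffle.min-buffers: 2048

# Data read buffer, taken from framework off-heap memory.
taskmanager.memory.framework.off-heap.batch-shuffle.size: 512m
# Increase framework off-heap memory accordingly to avoid direct-memory OOM.
taskmanager.memory.framework.off-heap.size: 640m
```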