Flink Remote Shuffle open source

1. Why do we need Flink Remote Shuffle?

1.1 Background

Flink Remote Shuffle was proposed and implemented in response to the growing demand we have observed for unified stream and batch processing and for cloud-native deployment.

Because real-time processing can greatly improve user experience and make products more competitive, more and more business scenarios require both real-time and offline processing. Using different frameworks for stream processing and batch processing is inconvenient for users in terms of learning the frameworks, developing code, and operating jobs in production. In addition, in many scenarios real-time results are affected by late data (for example, users may wait a long time before submitting a comment) or by business logic upgrades, so offline jobs must be used to correct the data. Writing the same logic twice in two different frameworks easily leads to inconsistent results.

To address these problems, Flink proposes a unified stream-batch data model in which a single set of APIs processes both real-time and offline data. To support this goal, Flink designed and implemented the unified DataStream API [1], Table / SQL API [2], and Connectors [3][4], and at the execution layer it supports unified stream-batch scheduling [5] and a Batch execution mode optimized for batch processing [6]. To support Batch mode, Flink also needs an efficient and stable Blocking Shuffle. With Flink's built-in Blocking Shuffle, the upstream TaskManager must keep serving data to downstream tasks after the upstream task has finished. As a result, the TaskManager cannot be released immediately, which lowers resource utilization, and the stability of the Shuffle service is tied to the stability of task execution.

On the other hand, cloud native can better support co-locating offline and online workloads to improve cluster resource utilization, provides a unified operations interface to reduce operation and maintenance costs, and supports automatic scaling of jobs through dynamic resource orchestration. For these reasons, more and more users are using K8s to manage their cluster resources. Flink actively embraces cloud native: in addition to native support for K8s [7][8], it provides an Adaptive Scheduler [9] that scales dynamically according to the available resources, and it is gradually moving toward separating State storage from computing [10]. For Batch mode to support cloud native well, one more problem must be solved. The Shuffle process is the largest user of local disks, so Blocking Shuffle also needs to separate storage from computing, reduce its use of local disks, and decouple computing resources from storage resources.

Therefore, to better support both unified stream-batch processing and cloud native, using an independent Shuffle service for data transmission between tasks is a necessary step.

1.2 Advantages of Flink Remote Shuffle

Flink Remote Shuffle is designed and implemented based on the above ideas. It supports many important features, including:

Separation of storage and computing: Computing resources and storage resources can be scaled independently, computing resources can be released as soon as computation completes, and Shuffle stability is no longer affected by computing stability.
Multiple deployment modes: Supports deployment in Kubernetes, YARN, and Standalone environments.
Flow control and memory management: Adopts a credit-based flow control mechanism similar to Flink's, implements zero-copy data transmission, and makes maximal use of managed memory to avoid OOM, improving system stability and performance.
Performance optimizations: Implements many optimizations, including load balancing, disk IO optimization, data compression, connection multiplexing, and small packet merging, to achieve good performance and stability.
Correctness and fault tolerance: Supports Shuffle data correctness verification and tolerates restarts of Shuffle processes and even physical nodes.
Dynamic execution optimization: Combined with FLIP-187: Flink Adaptive Batch Job Scheduler [11], it supports dynamic execution optimization, such as dynamically determining operator parallelism.

1.3 Production practice
Starting with Double Eleven in 2020, many core jobs within Alibaba began to adopt Flink-based unified stream-batch processing pipelines. This was also the first large-scale production deployment of unified stream-batch processing in the industry. Unified stream-batch processing resolved the inconsistency between stream and batch metric definitions in scenarios such as the Tmall marketing engine, and it increased the efficiency of data report development by 4 to 10 times. Resource cost efficiency was also doubled by shaving peaks and filling valleys at night.

As an important part of this unified stream-batch technology, Flink Remote Shuffle has been running in production since its launch. Its largest cluster has grown to more than 1,000 machines, and it has stably supported multiple business teams, including Tmall Marketing Engine and Tmall International, with job scale exceeding the PB level. This demonstrates the stability and performance of the system.

2. Design and implementation of Flink Remote Shuffle

2.1 Overall Architecture of Flink Remote Shuffle

Flink Remote Shuffle is implemented based on Flink's unified plug-in Shuffle interface. As a stream-batch integrated data processing platform, Flink can adapt to a variety of different Shuffle strategies in different scenarios, such as network-based online Pipeline Shuffle, TaskManager-based Blocking Shuffle, and remote service-based Remote Shuffle.

These shuffle strategies are quite different in terms of transmission methods and storage media, but they have many common requirements in terms of the life cycle of datasets, metadata management and notification of downstream tasks, and data distribution strategies. In order to provide unified support for different types of Shuffle and simplify the implementation of new Shuffle strategies including Flink Remote Shuffle, Flink introduces a plug-in Shuffle architecture [12].

As shown in the figure below, a Shuffle plug-in consists mainly of two parts: ShuffleMaster, which requests and releases resources on the JobMaster side, and InputGate and ResultPartition, which perform the actual data reading and writing on the TaskManager side. After the scheduler requests resources through ShuffleMaster, the resources are managed by PartitionTracker. When upstream and downstream tasks are started, the scheduler passes along the Shuffle resource descriptor, which describes where data is written and where it is read.

Based on Flink's unified plug-in Shuffle interface, Flink Remote Shuffle provides data shuffle services through a separate cluster. The cluster adopts a classic master-slave structure, in which ShuffleManager is the master node of the entire cluster, responsible for managing worker nodes, and allocating and managing Shuffle data sets. As the slave node of the cluster, ShuffleWorker is responsible for the actual reading, writing and cleaning of the data set.

When the upstream task starts, the Flink scheduler will apply for resources from the ShuffleManager through the RemoteShuffleMaster plug-in, and the ShuffleManager will select the appropriate Worker to provide services according to the type of data set and the load of each Worker. When the scheduler gets the corresponding Shuffle resource descriptor, it will carry the descriptor when starting the upstream Task. The upstream Task sends data to the corresponding ShuffleWorker for persistent storage according to the ShuffleWorker address recorded in the descriptor. In contrast, when the downstream Task is started, it will read from the corresponding ShuffleWorker according to the address recorded in the descriptor, thus completing the entire data transmission process.
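
The allocation flow above can be sketched roughly as follows. This is only an illustration and not the actual implementation: the names `ToyShuffleManager` and `ResourceDescriptor`, the load metric (number of partitions already assigned), and the least-loaded selection rule are assumptions made for this example. The text only states that the Worker is chosen according to the data set type and the load of each Worker.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ResourceDescriptor:
    """Hypothetical descriptor handed to upstream and downstream tasks."""
    partition_id: str
    worker_address: str


@dataclass
class ToyShuffleManager:
    """Toy stand-in for ShuffleManager; load = assigned partitions (assumed metric)."""
    load: dict = field(default_factory=dict)  # worker address -> assigned partitions

    def register_worker(self, address: str) -> None:
        self.load.setdefault(address, 0)

    def request_resource(self, partition_id: str) -> ResourceDescriptor:
        if not self.load:
            raise RuntimeError("no ShuffleWorker registered")
        # Assumed policy: pick the worker with the fewest assigned partitions,
        # breaking ties by address so the choice is deterministic.
        address = min(self.load, key=lambda a: (self.load[a], a))
        self.load[address] += 1
        return ResourceDescriptor(partition_id, address)


manager = ToyShuffleManager()
for addr in ("worker-a:9000", "worker-b:9000"):
    manager.register_worker(addr)

# The scheduler would attach each descriptor to the tasks it launches.
descriptors = [manager.request_resource(f"partition-{i}") for i in range(3)]
```

Both the upstream writer and the downstream reader only need the descriptor to find the right ShuffleWorker, which is what keeps the tasks independent of where the data is physically stored.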

Because Flink Remote Shuffle is a long-running service, fault tolerance and self-healing are critical. Flink Remote Shuffle monitors ShuffleWorker and ShuffleMaster through heartbeats and other mechanisms. When an abnormality such as a heartbeat timeout or an IO failure occurs, it deletes the affected data sets and synchronizes data set state, thereby keeping the state of the whole cluster eventually consistent. For more information about exception handling, please refer to the Flink Remote Shuffle documentation [13].
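
As a rough illustration of heartbeat-based monitoring, the sketch below drops a worker whose heartbeat has timed out and reports the data partitions that were lost with it. The class `HeartbeatMonitor`, its method names, and the timeout value are hypothetical; the real system's handling is described in the documentation cited above.

```python
from dataclasses import dataclass, field


@dataclass
class HeartbeatMonitor:
    """Toy monitor; the timeout value and method names are assumptions."""
    timeout_seconds: float
    last_seen: dict = field(default_factory=dict)   # worker -> last heartbeat time
    partitions: dict = field(default_factory=dict)  # worker -> set of partition ids

    def heartbeat(self, worker: str, now: float) -> None:
        self.last_seen[worker] = now
        self.partitions.setdefault(worker, set())

    def add_partition(self, worker: str, partition_id: str) -> None:
        self.partitions.setdefault(worker, set()).add(partition_id)

    def expire(self, now: float) -> set:
        """Drop timed-out workers and return the partitions lost with them."""
        lost = set()
        timed_out = [w for w, t in self.last_seen.items()
                     if now - t > self.timeout_seconds]
        for worker in timed_out:
            lost |= self.partitions.pop(worker, set())
            del self.last_seen[worker]
        return lost


monitor = HeartbeatMonitor(timeout_seconds=10.0)
monitor.heartbeat("worker-a", now=0.0)
monitor.heartbeat("worker-b", now=0.0)
monitor.add_partition("worker-a", "p1")
monitor.add_partition("worker-b", "p2")
monitor.heartbeat("worker-b", now=8.0)  # worker-b keeps reporting
lost = monitor.expire(now=12.0)         # worker-a has been silent for 12 seconds
```

The set of lost partitions is what the rest of the cluster would need to learn about so that its view of the data sets converges again.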

2.2 Data Shuffle protocol and optimization
Data remote shuffle can be divided into two stages of reading and writing. In the data writing phase, the output data of the upstream computing task is written to the remote ShuffleWorker; in the data reading phase, the downstream computing task reads the output of the upstream computing task from the remote ShuffleWorker and processes it. The data shuffle protocol defines the data type, granularity, constraints, and processes in this process. Overall, the data write-read process is as follows:

In the whole process of data reading and writing, various optimization methods are implemented, including data compression, flow control, reducing data copying, using managed memory, etc.:

Credit-based flow control: Flow control is an important concern in any producer-consumer model. Its purpose is to prevent data from piling up without bound when the consumer is slow. Flink Remote Shuffle adopts a credit-based flow control mechanism similar to Flink's: the sender transmits data only when the receiver has enough buffers to receive it. As the receiver processes data, it reports the released buffers back to the sender so that the sender can continue sending. Repeating this cycle yields streaming data transmission, similar to TCP's sliding window mechanism. Credit-based flow control avoids useless spilling to disk when the downstream receive buffers are insufficient, and when TCP connections are multiplexed it prevents congestion on one logical link from affecting the other logical links. If you are interested in this mechanism, please refer to Flink's blog [14].

Data compression: Data compression is a simple, effective, and widely proven optimization, so it is a must. In Flink Remote Shuffle, the producer compresses data before writing it to the remote ShuffleWorker, and the consumer decompresses it after reading it from the remote ShuffleWorker. This reduces both network and file IO, lowers the use of network bandwidth and disk storage space, and improves IO efficiency.

Reduce data copying: When performing network and file IO, Flink maximizes the use of direct memory, which reduces copying through Java heap memory and improves efficiency. It also reduces dynamic allocation of direct memory, which helps improve stability.

Use managed memory: For the large blocks of memory used by Shuffle data transmission and file IO, Flink Remote Shuffle uses pre-allocated managed memory. That is, it allocates memory in advance to build a memory pool, and all subsequent allocations and releases happen within that pool. This reduces the overhead of dynamic memory allocation and release (system calls and GC) and, more importantly, helps avoid OOM problems, which greatly improves the stability of the system.

TCP connection multiplexing: For data read or write connections from the same Flink computing node to the same remote ShuffleWorker, the same physical TCP connection will be reused, which helps reduce the number of network connections and improve the stability of data read and write.
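
To make the credit-based flow control and pre-allocated buffer pool from the list above more concrete, here is a minimal single-process sketch. It is not the actual protocol: the `Sender` and `Receiver` classes, the pool size, and the one-credit-per-buffer accounting are assumptions for illustration, and payloads are assumed to fit into one buffer.

```python
from collections import deque


class Receiver:
    """Consumer side with a fixed pool of pre-allocated buffers (sizes assumed)."""

    def __init__(self, num_buffers: int, buffer_size: int):
        self.free = deque(bytearray(buffer_size) for _ in range(num_buffers))
        self.filled = deque()

    def initial_credit(self) -> int:
        return len(self.free)

    def on_data(self, payload: bytes) -> None:
        buf = self.free.popleft()  # never empty if the sender respects its credit
        buf[:len(payload)] = payload
        self.filled.append((buf, len(payload)))

    def consume_one(self) -> tuple:
        """Process one buffer, recycle it, and return (data, credit granted back)."""
        buf, n = self.filled.popleft()
        data = bytes(buf[:n])
        self.free.append(buf)
        return data, 1


class Sender:
    """Producer side: data leaves the pending queue only while credit is available."""

    def __init__(self, receiver: Receiver):
        self.receiver = receiver
        self.credit = receiver.initial_credit()
        self.pending = deque()

    def write(self, payload: bytes) -> None:
        self.pending.append(payload)
        self.flush()

    def add_credit(self, n: int) -> None:
        self.credit += n
        self.flush()

    def flush(self) -> None:
        while self.credit > 0 and self.pending:
            self.receiver.on_data(self.pending.popleft())
            self.credit -= 1


receiver = Receiver(num_buffers=2, buffer_size=16)
sender = Sender(receiver)
for chunk in (b"a", b"b", b"c"):
    sender.write(chunk)
backlog_before = len(sender.pending)  # the third chunk waits for credit

data, credit = receiver.consume_one()
sender.add_credit(credit)             # a freed buffer lets the sender continue
backlog_after = len(sender.pending)
```

Because the receiver never accepts more data than it has buffers for, its memory use stays bounded by the pool size, which is the property that helps avoid OOM.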

2.3 Storage and file IO optimization

For disk shuffle, especially on mechanical hard disks, file IO will become an important bottleneck, and optimizing file IO will achieve a good acceleration effect.

In addition to the data compression mentioned above, a widely used technical solution is to merge small files or small data blocks, thereby increasing the sequential reading and writing of files, avoiding excessive random reading and writing, and finally optimizing file IO performance. For direct Shuffle between non-remote computing nodes, systems including Spark have realized the optimization of merging small pieces of data into large pieces.

As for data merging in remote Shuffle systems, according to our research it was first proposed by Microsoft, LinkedIn, and Quantcast in the Sailfish paper [15]. Later systems, including Princeton and Facebook's Riffle [16], Facebook's Cosco [17], LinkedIn's Magnet [18], and Alibaba EMR's Spark Remote Shuffle [19], all implement a similar optimization: the Shuffle data sent by different upstream computing tasks to the same downstream computing task is pushed to the same remote Shuffle service node and merged there, and the downstream computing task then pulls the merged data directly from these remote Shuffle service nodes.

In addition to this approach, we proposed another one in Flink's implementation of direct Shuffle between computing nodes: Sort-Spill plus IO scheduling. Simply put, once the output of a computing task fills the memory buffer, the data is sorted (Sort) and the sorted data is written out (Spill) to a file. Writing avoids creating multiple files and always appends to the same file. On the read side, data read requests are scheduled so that data is always read in file offset order while still satisfying the requests. In the best case, reads are fully sequential. The figure below shows the basic storage structure and the IO scheduling process. For more details, please refer to the Flink blog [20] or the Chinese version [21].
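
The Sort-Spill plus IO scheduling idea can be sketched in a few lines. This is a simplified illustration, not Flink's implementation: the class `SortSpillFile`, the in-memory `BytesIO` standing in for the on-disk file, and the flat index of `(offset, length, partition)` entries are all assumptions made for the example.

```python
import io


class SortSpillFile:
    """Toy single-file store: every spill appends one sorted region to the same file."""

    def __init__(self):
        self.file = io.BytesIO()  # stands in for one on-disk data file
        self.index = []           # (offset, length, partition) per written block

    def spill(self, records):
        """records: list of (partition, payload) that filled one memory buffer."""
        self.file.seek(0, io.SEEK_END)
        # Sort by target partition so data for one consumer is contiguous in the region.
        for partition, payload in sorted(records, key=lambda r: r[0]):
            offset = self.file.tell()
            self.file.write(payload)
            self.index.append((offset, len(payload), partition))

    def read_scheduled(self, requested_partitions):
        """Serve all pending read requests in file-offset order (the IO scheduling step)."""
        wanted = set(requested_partitions)
        plan = sorted(entry for entry in self.index if entry[2] in wanted)
        out = []
        for offset, length, partition in plan:
            self.file.seek(offset)
            out.append((partition, offset, self.file.read(length)))
        return out


store = SortSpillFile()
store.spill([(1, b"b1"), (0, b"a1"), (1, b"b2")])  # first full buffer
store.spill([(0, b"a2"), (1, b"b3")])              # second full buffer, same file
reads = store.read_scheduled([1, 0])               # requests from two consumers
```

When all consumers have outstanding requests, as in this example, the read plan walks the file from the first offset to the last, which corresponds to the fully sequential best case described above.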


Both options have some advantages and disadvantages:

In terms of fault tolerance, the data merging scheme tolerates data loss less well. Because one file contains merged data from all concurrent producer tasks, losing a file means rerunning all of those producers, which is very expensive. To avoid this overhead, backups may be needed so that data does not have to be recomputed, but backups mean more file IO (lower performance) and more storage space. With the IO scheduling scheme, only the damaged or lost data needs to be regenerated. Handling producer task failures is also more complicated with data merging, because the failed data segments must be cleaned up or marked and then skipped or deduplicated when reading. With the IO scheduling scheme, it is enough to discard the data files generated by the failed producers.

In terms of performance, both schemes generally achieve good file IO throughput. However, the IO scheduling scheme has shortcomings in some special cases. For example, IO scheduling depends on the data requests of the consumer computing tasks, so if the downstream consumers cannot all be launched at the same time, reads become less sequential and file IO performance drops. Also, if the data itself needs to be sorted, data merging is more favorable because the data to be sorted is in the same file. Similarly, if data needs to be written to an external system such as a distributed file system, data merging is more favorable, because IO scheduling optimizations are hard to apply to such systems.

In terms of the number of files, the data merging scheme produces as many files as there are consumer tasks, while the IO scheduling scheme produces as many files as there are producer tasks.

The abstraction of Flink Remote Shuffle does not exclude either optimization strategy. In fact, Flink Remote Shuffle can be regarded as an intermediate data storage service that is aware of Map-Reduce semantics. Its basic unit of data storage is the Data Partition, which comes in two basic types: MapPartition and ReducePartition. The data in a MapPartition is generated by a single upstream computing task and may be consumed by multiple downstream computing tasks. The following diagram shows the generation and consumption of MapPartition:

The ReducePartition is generated by merging the output of multiple upstream computing tasks and consumed by a single downstream computing task. The following diagram shows the production and consumption of ReducePartition:
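
The difference between the two partition types can be illustrated by grouping the same shuffle records in two ways. The record layout and the function names below are hypothetical and chosen only for this sketch; the point is that grouping by producer gives one partition per upstream task, while grouping by consumer gives one partition per downstream task, matching the file counts discussed earlier.

```python
from collections import defaultdict

# Each record: (producer task index, consumer task index, payload).
records = [
    (0, 0, "x"), (0, 1, "y"),
    (1, 0, "z"), (1, 1, "w"),
    (2, 0, "u"), (2, 1, "v"),
]


def group_as_map_partitions(records):
    """One partition per producer; each may hold data for many consumers."""
    partitions = defaultdict(list)
    for producer, consumer, payload in records:
        partitions[producer].append((consumer, payload))
    return dict(partitions)


def group_as_reduce_partitions(records):
    """One partition per consumer; each merges output from many producers."""
    partitions = defaultdict(list)
    for producer, consumer, payload in records:
        partitions[consumer].append((producer, payload))
    return dict(partitions)


map_partitions = group_as_map_partitions(records)        # 3 producers -> 3 partitions
reduce_partitions = group_as_reduce_partitions(records)  # 2 consumers -> 2 partitions
```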

3. Deployment, use and evaluation

3.1 Multi-environment deployment and operation and maintenance

Supporting deployment in multiple environments and meeting differentiated deployment requirements is an important capability. Specifically, Flink Remote Shuffle supports three deployment modes: Kubernetes, YARN, and Standalone, which can meet the deployment environment requirements of most users. In each deployment mode, some convenience scripts and templates are available for users. For more detailed information, please refer to the documents: Kubernetes mode deployment [22], YARN mode deployment [23] and Standalone mode deployment [24]. Among them, the deployment of Kubernetes mode and YARN mode realizes the high availability of the master node (ShuffleManager), and the high availability of the master node deployed in Standalone mode will be supported in future versions.

In addition, the Metric system of Flink Remote Shuffle provides several important monitoring indicators so that users can monitor the running status of the whole system, including the number of active nodes, the total number of jobs, the number of available buffers on each node, the number of data partitions, the number of network connections, network throughput, and JVM indicators. More indicators will be added in the future to make operation and maintenance easier. Users can access the Metric service of each process (ShuffleManager and ShuffleWorker) directly to query the corresponding indicator data. For details, please refer to the user documentation [25]. In the future, metric reporting will also be provided, allowing users to push indicators to external systems such as Prometheus.

Basically, the deployment and operation and maintenance of Flink Remote Shuffle are relatively simple. In the future, we will continue to improve the deployment and operation and maintenance experience, simplify information collection and problem location, improve automation, and reduce operation and maintenance costs.

3.2 Multi-version compatibility

The remote Shuffle system is divided into two parts, the client and the server. The server runs as an independent cluster, and the client runs in the Flink cluster as the agent through which Flink jobs access the remote Shuffle service. In this deployment model, many users may access the same Shuffle service from different Flink clusters, so multi-version compatibility is an issue users care about. The Shuffle service itself will keep being upgraded with new features and optimizations. If the client and the server become incompatible, the simplest fix is to have all users upgrade their clients at the same time, but this requires the users' cooperation and is not always possible.

Ideally, compatibility between versions would be fully guaranteed. To get as close to this as possible, Flink Remote Shuffle has done a lot of work, including:

Version information and reserved fields: Version information and reserved fields are added to all protocol messages, which helps maintain compatibility when protocol fields change later;
Storage format version: The storage format version is kept in the stored data, so that a new version of the Shuffle storage node can take over old data directly, avoiding the overhead of regenerating it;
Version-specific handling: By handling different versions differently, a new version can stay compatible with the logic of an old version, and the server can also use this to monitor how old client versions are being used;
Version-compatible service discovery: The client's service discovery allows multiple versions of the Shuffle service to run at the same time, and it always looks for services that are compatible with the client's own version.

Through these efforts, we hope to achieve full compatibility between different versions and avoid unnecessary "surprises". Of course, if you want to use the new features and optimizations in a new version, you still need to upgrade the client version.
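
A minimal sketch of these compatibility measures might look like the following. The message layout, the `ServiceInstance` fields, and the rule that a server accepts a range of client protocol versions are assumptions made for illustration; the actual wire format and discovery logic are not described here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    """Hypothetical protocol message carrying a version and a reserved field."""
    version: int
    payload: bytes
    reserved: bytes = b""  # room for fields added by future protocol versions


@dataclass(frozen=True)
class ServiceInstance:
    address: str
    min_client_version: int  # assumed: oldest client protocol still accepted
    max_client_version: int  # assumed: newest client protocol understood


def discover(services, client_version):
    """Return the services able to talk to this client, newest server first."""
    compatible = [s for s in services
                  if s.min_client_version <= client_version <= s.max_client_version]
    return sorted(compatible, key=lambda s: s.max_client_version, reverse=True)


def handle(message: Message) -> str:
    """Version-specific handling on the server side."""
    if message.version == 1:
        return "v1:" + message.payload.decode()
    # Newer versions may carry extra data in the reserved field.
    return f"v{message.version}:" + (message.payload + message.reserved).decode()


services = [
    ServiceInstance("shuffle-old:9000", min_client_version=1, max_client_version=1),
    ServiceInstance("shuffle-new:9000", min_client_version=1, max_client_version=2),
]
for_v1 = [s.address for s in discover(services, client_version=1)]
for_v2 = [s.address for s in discover(services, client_version=2)]
```

In this sketch an old client can still reach either service, while a new client is only directed to the service that understands its protocol version.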

3.3 Stability and Performance Evaluation

Production applications show that Flink Remote Shuffle has good stability and performance. This is mainly due to many performance and stability optimizations.

Designs and optimizations that improve stability include the following: separating storage from computing keeps Shuffle stability independent of computing stability; credit-based flow control sends data according to the consumer's processing capacity so that consumers are not overwhelmed; connection multiplexing, small packet merging, and active network connection health checks improve network stability; maximizing the use of managed memory greatly reduces the possibility of OOM; and data verification lets the system tolerate restarts of processes and even physical nodes.

In terms of performance, data compression, load balancing, and file IO optimization have all greatly improved the performance of data shuffle. In the case of a small amount of data, since most of the Shuffle data is stored in the cache of the operating system, the performance of Flink Remote Shuffle is close to that of direct Shuffle between computing nodes, and there is not much difference. In the case of large data volume, thanks to the centralized decision-making ability (load balancing of ShuffleManager nodes, and a single ShuffleWorker node uniformly manages the IO of the entire physical machine), the performance of Flink Remote Shuffle is even better. The screenshot below shows Flink Remote Shuffle Disk IO information while running the job (TPC-DS q78):

It can be seen from the figure that we used sdd, sde, sdf, sdg, sdi, and sdk disks, and the disk throughput is still relatively high, and we will continue to optimize it later.
