New developments in Flink Runtime for streaming and batch integration

1. Stream and batch integration

The goal of stream-batch integration is to provide a single set of unified processing APIs for both bounded and unbounded data, covering the DataStream API and the Table/SQL API. Processing bounded data corresponds to offline (batch) processing, while processing unbounded data corresponds to online (stream) processing.

There are two main reasons why such a stream-batch integrated processing API is needed:

First, with the continued growth of real-time computing, most enterprise data processing pipelines contain both offline and online processing. Using the same set of development APIs reduces the cost of learning and maintaining both stream and batch jobs.

Second, in many scenarios a user's stream processing job may be affected by late data or by changes in online logic. For example, a user may post a comment long after the original event, or the online processing logic may need to be upgraded. In these cases, an offline job is needed to correct the previously produced results, which is the backfill case.

If two separate APIs are used here, it becomes difficult to keep the processing results consistent, so users need a unified API to address these problems.

Flink's unified stream-batch API consists of two parts: the DataStream API and the Table/SQL API. The Table/SQL API is the higher-level API, mainly providing standard SQL and its equivalent table operations, while the DataStream API is lower-level and lets users explicitly manipulate operator time and state. The two sets of APIs can also be converted into each other and used together.
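A minimal sketch of this interoperability (assuming Flink 1.13+ with the table-api-java-bridge dependency; the data and column names are only illustrative):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamTableInterop {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Start in the low-level DataStream API ...
        DataStream<String> words = env.fromElements("flink", "batch", "flink");

        // ... expose the stream to the Table/SQL API ...
        tEnv.createTemporaryView("words", tEnv.fromDataStream(words).as("word"));
        Table counts = tEnv.sqlQuery("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word");

        // ... and convert the (updating) result back to a DataStream.
        DataStream<Row> changelog = tEnv.toChangelogStream(counts);
        changelog.print();

        env.execute("stream-table-interop");
    }
}
```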


For the two sets of APIs, Flink provides two different execution modes:

The streaming execution mode works by retaining operator state and performing incremental computation as new data arrives. It can be used to process both bounded and unbounded data sets and supports arbitrary processing logic. Take the sort operation as an example: it retains all historical data and supports retraction, so when a new record arrives it re-sorts all of the data received so far and retracts the results it previously emitted. Of course, some operators such as reduce can be further optimized to avoid actually storing unbounded historical state. In addition, during incremental computation the data arrives out of order, so state is also accessed out of order, which may lead to random I/O. Finally, the streaming mode relies on periodic checkpoints to support failover, which also incurs a certain processing overhead.

Therefore, for processing bounded data sets we also provide a dedicated batch execution mode. In this mode operators are executed stage by stage, so it can only be used for bounded data. Operator implementations can then apply specific optimizations, such as first sorting the data and then processing it key by key, which avoids the problems of unbounded state and random I/O.

Flink guarantees that, for the same bounded input, the two execution modes produce consistent results. In addition, it provides a unified pipelined-region scheduler, a unified shuffle service plug-in interface, and a unified connector interface, so that both execution modes are supported on top of the same infrastructure.
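A minimal sketch of selecting the execution mode on the DataStream API (the BATCH setting is only valid when all sources are bounded; the tiny job below is purely illustrative):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Run the same program with stage-by-stage batch execution when all sources are
        // bounded; RuntimeExecutionMode.STREAMING (the default) or AUTOMATIC select the
        // other behaviours. The mode can also be set via the `execution.runtime-mode` option.
        env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3)
           .keyBy(x -> x % 2)
           .reduce((a, b) -> a + b)
           .print();

        env.execute("execution-mode-example");
    }
}
```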

The current architecture of Flink is shown in the figure above: both at the API level and in the underlying implementation, stream and batch processing have been unified.

2. Semantic Enhancement and Improvement

Around this definition of stream-batch integration, we have continued to improve and optimize in recent versions. The first part covers enhancements to the stream-batch unification semantics.

The first is support for continuing to take checkpoints after some tasks have finished in streaming mode.

Under the current process, a job can come to an end in two situations:

If all sources are bounded, the job will eventually finish;

In the case of unbounded sources, the user can terminate the job with the stop-with-savepoint --drain command and retain a savepoint. If the --drain parameter is not specified, the drain is not performed; in that case the savepoint is generally kept in order to restart the job later, so it is not a true job termination.

Previously, Flink did not support taking checkpoints after some tasks had finished, and this limitation caused two problems:

First, two-phase-commit sinks rely on checkpoints to achieve end-to-end data consistency in streaming mode. A two-phase-commit sink first writes data to a temporary file or an external transaction; only after a Flink checkpoint succeeds, guaranteeing that the data before the checkpoint will not be replayed, can the sink safely commit the actual transaction by renaming the file or committing the external transaction. If no checkpoint can be taken after some tasks have finished, the last part of the data can never be committed, and the results produced in streaming mode would not be consistent with those produced in batch mode.

Second, for jobs that mix bounded and unbounded sources, if no checkpoint can be taken after the bounded part has finished, a failover during subsequent execution incurs a large rollback overhead.

To solve these problems, we need to support checkpoints after some tasks have finished, and modify the job-completion process so that tasks using two-phase commit wait for one final checkpoint before exiting. In the case of a normal end, a task can wait for the next checkpoint to complete and then exit; in the case of a drain, it can wait for the savepoint to complete and then exit.
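A minimal sketch of enabling this behaviour (the option was introduced around Flink 1.14 behind a feature flag and enabled by default in later releases; the checkpoint interval below is only an example):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointsAfterTasksFinish {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Allow checkpoints to continue after some tasks have finished.
        conf.setString("execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000); // periodic checkpoints every 60 seconds
        // ... build the rest of the job as usual ...
    }
}
```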

To enable checkpoints after some tasks have finished, we modified the checkpoint procedure. First, the new "source" tasks are re-identified, i.e. those tasks whose predecessor tasks have all terminated but which are still running themselves, and barriers are emitted from these tasks to perform a normal checkpoint. Since checkpoint state is recorded per JobVertex, if all tasks of a JobVertex have finished, a special fully-finished marker is recorded in its state; if only some of its tasks have finished, the state of the tasks that are still running is kept as the JobVertex state. All other JobVertices are processed exactly as in a normal checkpoint. When the job restarts after a failover, the fully terminated JobVertices are skipped, while the other tasks follow the normal recovery logic.

Based on this work, we also re-examined the process and semantics of job termination. To ensure that two-phase-commit sinks can commit the last part of data before exiting, these tasks must be able to wait for a final checkpoint before they exit. Under the current logic, when a job ends naturally it first sends the max watermark and then sends EndOfPartitionEvent. When a task receives the EndOfPartitionEvent, it calls the operator's endOfInput(), close() and dispose() methods in turn. If we want to insert a checkpoint at the end, the most suitable place is after the close() method, because at that point the job has finished all of its work.

The stop-with-savepoint scenario is slightly different, because there a savepoint is triggered first. After the savepoint succeeds, the sources end their execution and send the max watermark and EndOfPartitionEvent, and the subsequent logic is the same as in the checkpoint case. Since a savepoint has already been taken, performing another checkpoint after close() would be redundant. In this case, the more appropriate approach is to end the job first and then take the savepoint, committing the last part of data as part of that savepoint.

However, there is still a problem: if the last savepoint is to be kept, all tasks must wait for the same savepoint before they exit. In the natural-end case, different tasks may exit after different checkpoints, but in the savepoint case the EndOfPartitionEvent has already been sent before the job ends, which closes the network channels between tasks, so after that it is no longer possible to take a savepoint starting from the original sources.

To solve this, we need a way to notify all tasks to finish without closing their network connections, then trigger a savepoint after all tasks have finished, and close the network connections only after it succeeds. This way, all tasks wait for the same savepoint state before they exit.

To support this, we introduced a new EndOfDataEvent. When a task receives an EndOfDataEvent, it performs the processing that was previously attached to EndOfPartitionEvent. Afterwards, the sources immediately send a barrier to trigger the savepoint, and the operators exit only after the savepoint has completed.

In addition, we renamed the previously ambiguous close() and dispose() methods to finish() and close() respectively: finish() is only called when a task ends normally, while close() is called on both normal and abnormal termination.
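A minimal sketch of the resulting lifecycle from a user-defined operator's point of view (assuming the Flink 1.14+ operator interface; the operator itself and the audit-log path are hypothetical): finish() flushes everything that belongs to the final result, while close() only releases resources.

```java
import java.io.BufferedWriter;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class AuditingOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private transient BufferedWriter auditLog;

    @Override
    public void open() throws Exception {
        super.open();
        auditLog = Files.newBufferedWriter(Paths.get("/tmp/audit.log")); // hypothetical path
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        auditLog.write(element.getValue());
        auditLog.newLine();
        output.collect(element);
    }

    @Override
    public void finish() throws Exception {
        // Only called on a normal end: flush everything that is part of the
        // final result before the last checkpoint/savepoint commits it.
        auditLog.flush();
        super.finish();
    }

    @Override
    public void close() throws Exception {
        // Called on both normal and abnormal termination: release resources only.
        if (auditLog != null) {
            auditLog.close();
        }
        super.close();
    }
}
```

Such an operator would be wired in via DataStream#transform; the point here is only the split of responsibilities between finish() and close().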

Another piece of work on the semantics side is the Hybrid Source.

The Hybrid Source lets a job first read historical batch data and then switch over to the real-time stream, and it fits stream-batch handover scenarios where the processing logic is the same for both. In other words, when the real-time data has also been persisted to storage and the user needs to perform a backfill, and the stream and batch processing logic is identical, the Hybrid Source makes it easy to implement such a job.
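A minimal sketch of building such a source (based on the HybridSource, FileSource and KafkaSource connector APIs in recent Flink versions; the path, topic, broker address and switch timestamp are only placeholders):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        long switchTimestamp = 1_700_000_000_000L; // where the historical data ends (placeholder)

        // 1) Bounded part: read the historical data that has been persisted to files.
        FileSource<String> historical = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/history"))
                .build();

        // 2) Unbounded part: continue from Kafka after the switch timestamp.
        KafkaSource<String> realtime = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 3) Chain them: consume the bounded source first, then switch to the stream.
        HybridSource<String> source = HybridSource.builder(historical)
                .addSource(realtime)
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-source")
           .print();
        env.execute("hybrid-source-example");
    }
}
```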

3. Performance optimization

In addition to work on semantics, we also made some performance optimizations at the runtime layer.

3.1 Scheduling deployment performance optimization

The first is the performance of the scheduling phase. With all-to-all connections in Flink, two operators each with parallelism n are connected by n² edges. These n² edges used to be stored explicitly in the JobManager's memory, and much of the scheduling and deployment logic operated on them directly, so both the JM's memory footprint and the time complexity of many computations were O(n²). Because batch jobs are generally larger and scheduled at a finer granularity, this made the scheduling and deployment performance problems worse.

To solve this problem, we exploit the symmetry of all-to-all edges and restructured the in-memory data structures and computation logic: group-based structures (ConsumedPartitionGroup and ConsumerVertexGroup) replace the previous per-edge ExecutionEdge as the unified description of the connections between operators. With this description the symmetric information is no longer stored repeatedly, avoiding the O(n²) blow-up, and ExecutionEdges are no longer kept in memory at all.
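A toy illustration (not Flink's actual classes) of why the group-based description needs only O(n) objects for an all-to-all connection, while the per-edge description needs O(n²):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EdgeVsGroup {
    // Per-edge model: one object per (producer, consumer) pair -> n * n objects.
    record Edge(int producerSubtask, int consumerSubtask) {}

    // Group model: in an all-to-all connection every consumer reads the same set of
    // partitions and every partition is read by the same set of consumers, so one
    // shared group per side is enough -> O(n) objects overall.
    record PartitionGroup(List<Integer> partitions) {}
    record ConsumerGroup(List<Integer> consumerSubtasks) {}

    public static void main(String[] args) {
        int n = 4;

        List<Edge> edges = IntStream.range(0, n).boxed()
                .flatMap(p -> IntStream.range(0, n).mapToObj(c -> new Edge(p, c)))
                .collect(Collectors.toList());

        PartitionGroup partitions =
                new PartitionGroup(IntStream.range(0, n).boxed().collect(Collectors.toList()));
        ConsumerGroup consumers =
                new ConsumerGroup(IntStream.range(0, n).boxed().collect(Collectors.toList()));

        System.out.println("per-edge objects: " + edges.size());  // n * n = 16
        System.out.println("group objects:    2, holding "
                + (partitions.partitions().size() + consumers.consumerSubtasks().size())
                + " ids in total");                                // 2 * n = 8
    }
}
```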

In addition, we adjusted many scheduling algorithms, such as computing pipelined regions and computing which downstream tasks need to be scheduled after a task finishes, reducing their time complexity to O(n).

There is also some special logic in computing pipelined regions. A Flink job DAG contains two types of edges: pipelined edges and blocking edges. The former require the upstream and downstream tasks to start at the same time and transfer data over the network, while the latter let the upstream and downstream tasks start one after the other and transfer data through files. Before scheduling, the pipelined regions have to be computed. In general, the graph can be cut at blocking edges, putting all tasks connected by pipelined edges into the same region. But this logic has a problem, as shown in the figure above: if the graph is cut directly at the blocking edge, task 1 and task 2 are split into two regions, and since there is also an all-to-all shuffle between task 1 and task 2, the graph formed by the regions contains a cyclic dependency, which leads to deadlock during scheduling.

Previously, we used Tarjan's strongly connected components algorithm to detect this kind of cyclic dependency. It was run directly on the ExecutionGraph, so its time complexity was O(n²), because an all-to-all edge contributes n² connections. Further analysis showed that if the region identification is done on the JobGraph first, then whenever the graph contains an all-to-all edge there must be such a cyclic dependency, so all-to-all edges can be handled directly on the JobGraph, and only the non-all-to-all edges need to be processed on the ExecutionGraph.

In this way, the complexity of cyclic dependency detection is reduced to O(n).

3.2 Deployment performance optimization

Another part of optimization is about deployment performance.

When deploying a task, Flink ships its shuffle descriptors along with it. For the upstream side, a shuffle descriptor describes where data is produced; for the downstream side, it describes where data has to be pulled from. The number of shuffle descriptors equals the number of ExecutionEdges, so it is also O(n²). Computing shuffle descriptors and serializing them in memory consumes a lot of CPU and memory and can block the main thread, leading to heartbeat timeouts and memory exhaustion on the TaskManagers.

However, because of the symmetry between upstream and downstream, many shuffle descriptors are actually duplicates, so we can cache them and greatly reduce the amount that has to be computed and maintained.

In addition, to prevent very high parallelism from making the shuffle descriptors so large that they cause out-of-memory errors, we distribute them via the BlobServer instead of sending them directly.
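For reference, a minimal sketch of the related configuration, assuming the offloading threshold follows Flink's general blob.offload.minsize option (the 1 MiB value is only an example):

```java
import org.apache.flink.configuration.Configuration;

public class BlobOffloadConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Payloads larger than this threshold are offloaded to the BlobServer
        // instead of being sent inline in RPC messages (value is only an example).
        conf.setString("blob.offload.minsize", String.valueOf(1024 * 1024));
    }
}
```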

After implementing the above optimizations, we tested with a job of two vertices, each with 10000 parallelism, connected all-to-all. Scheduling time and memory usage were reduced by more than 90%, and deployment time by more than 65%, which greatly improves scheduling and deployment performance.

These scheduling and deployment optimizations also greatly reduce the restart time when a streaming job fails over, but a failover still requires some time to redeploy, initialize and load state. To reduce this time further, we are experimenting with restarting only the failed nodes on failover; the difficulty lies in guaranteeing data consistency, and we are still exploring it.

Another runtime optimization is Buffer Debloating in streaming mode, which dynamically adjusts the size of network buffers so that, under backpressure, less in-flight data has to be processed before checkpoint barriers can be aligned, avoiding checkpoint timeouts. When backpressure occurs and too much data is buffered in the middle of the job, the buffer sizes are reduced in time to bound the amount of in-flight data, so that barriers are not blocked behind a long backlog of data processing.
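A minimal sketch of turning it on (these options were introduced around Flink 1.14; in a real deployment they would normally go into the cluster configuration, and the target value below is only an example):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferDebloatingExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Enable automatic adjustment of network buffer sizes under backpressure.
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // Target time for a subtask to consume its in-flight data (example value).
        conf.setString("taskmanager.network.memory.buffer-debloat.target", "1 s");

        // Passing the configuration here works for local testing; on a cluster,
        // set the same keys in the TaskManager configuration instead.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the streaming job as usual ...
    }
}
```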

4. Remote Shuffle

Shuffle is a very important part of batch job execution. Because cloud-native environments provide unified operations APIs, reduce operational overhead, and offer better support for colocating online and offline workloads and for dynamic scaling, Flink has been actively embracing cloud native in recent versions, for example by providing a complete Flink-on-Kubernetes implementation and a scheduler that supports dynamic scaling. However, since Flink's shuffle relies on local disks, supporting cloud-native Flink also requires a shuffle implementation that separates storage from compute. Such an architecture lets compute and storage resources scale independently, allows the TaskManager to exit as soon as its computation is done rather than lingering to serve shuffle files, and thereby improves overall resource utilization. It also prevents a TM exit caused by a failed task from taking the shuffle file service down with it and affecting downstream execution.

In response to this need for storage-compute-separated shuffle, we developed a remote shuffle service internally. It went into production inside the company at the beginning of this year, and after a period of trial we open-sourced the system some time ago. It is introduced in detail below.

Flink supports a variety of scenarios, and the shuffles used in these scenarios differ considerably in storage medium, transmission and deployment. For example, in stream processing mode Flink generally uses network-based online transmission: data is buffered in the upstream TM's memory and sent as soon as the downstream task has free buffers. In batch processing mode, to support stage-by-stage execution of operators, Flink also needs file-based offline transmission: the data is first written to offline files and, once the downstream task starts, sent to it over the network. The offline files can live on the local TM or in a remote service.

In addition, different shuffles share many common requirements around lifecycle management, metadata management, and data distribution strategies. All shuffles need the scheduler to request and record the corresponding shuffle resources when starting upstream tasks, and to ship the shuffle resource descriptors when deploying downstream tasks so that those tasks know where to read their data. Finally, shuffles rely on the scheduler to clean up their resources at specific points in the lifecycle, for example when execution finishes or fails.

To support different shuffles uniformly, Flink introduced a pluggable shuffle architecture in version 1.9. A shuffle plug-in consists of two main parts: the shuffle master, which interacts with the scheduler on the JM side and is responsible for requesting and releasing shuffle resources; and the result partitions and input gates, which act as the write and read ends of the data and, given the shuffle resource descriptors provided by the scheduler, write data to or read data from specific locations. The parts common to all shuffle implementations are provided by Flink itself: the scheduler records requested shuffle resources via the partition tracker and manages their lifecycle according to the execution mode of the job.

Through this unified plug-in shuffle interface, Flink removes complexity that would otherwise be duplicated across shuffle implementations and lets each shuffle freely choose its own storage and transmission method.

Built on Flink's unified pluggable shuffle interface, the overall architecture of Flink remote shuffle is shown in the figure above. The shuffle service is provided by a separate cluster: the ShuffleManager is the master node of the cluster, responsible for managing worker nodes and for assigning and managing shuffle data sets; the ShuffleWorkers are the slave nodes, responsible for reading, writing and cleaning up data sets. The ShuffleManager also monitors the ShuffleWorkers and ShuffleMasters through heartbeats, and removes or re-synchronizes data when a heartbeat times out, keeping the view of the data sets in the cluster consistent.

We also made many optimizations to the transmission process. The network part is implemented on a credit-based protocol similar to Flink's existing network stack, and includes a series of optimizations such as TCP connection reuse, compression, controllable memory management and zero copy. For the I/O part, we provide a MapPartition storage format with I/O scheduling optimizations; with these optimizations its read speed on HDDs exceeds 140 MB/s.

In addition, we are developing a ReducePartition storage format based on pre-merging, which merges data for the downstream in advance and stores it on a specific worker. When the downstream tasks cannot all be started at the same time, this can achieve better performance than MapPartition.

In terms of deployment, remote shuffle supports a variety of deployment modes. We also provide protocol compatibility across versions, so that the client does not need to be upgraded when the server is. Finally, the system ships with common metrics, and more operations and maintenance tools are under active development.

5. Summary and Outlook

In general, Flink now offers stream-batch unified data processing capabilities that are ready for production. Going forward we will further improve these capabilities and support more stream-batch scenarios. For example, beyond the Hybrid Source, for backfill scenarios where the stream and batch processing logic is not identical, we are considering ways to retain the state at the end of the batch job and use it to start the stream processing job.

In addition, we will continue to improve the stability and performance of the whole system, and think more deeply about the essential differences between stream and batch modes and the deeper meaning of stream-batch unification.
