New Progress of Flink Runtime for Stream-batch Integration

1. Introduction to stream-batch integration

This article is based on a talk given at FFA 2021 by Gao Yun (Yun Qian), a technical expert at Alibaba.
Stream-batch integration means providing one unified set of processing APIs for both bounded and unbounded data, namely the DataStream API and the Table/SQL API. Processing bounded data corresponds to offline processing, while processing unbounded data corresponds to online processing.

There are two reasons such a unified set of APIs is needed:
•First, with the continuous development of real-time computing, most enterprise data pipelines now combine offline and online processing. Using the same set of APIs for both reduces the learning and maintenance costs of developing stream and batch jobs;
•Second, in many scenarios a stream processing job is affected by late data or by changes to the online logic. For example, a user may post a comment long after the original event, or the online processing logic may need to be upgraded. In these cases an offline job is needed to correct previously produced results, which is known as backfill.
If two different sets of APIs are used in this situation, it is hard to keep the processing results consistent, so users need a unified set of APIs.

Flink's stream-batch APIs consist of the DataStream API and the Table/SQL API. The Table/SQL API is the higher-level of the two and mainly provides standard SQL and the equivalent table operations. The DataStream API is lower-level and lets users work explicitly with operators, time, and state. The two APIs can be used together.

Underneath these two APIs, Flink provides two execution modes:

The streaming execution mode keeps operator state and performs incremental computation whenever new data arrives. It works for both bounded and unbounded data sets and can support arbitrary processing logic. A sort operation, for example, can be supported by retaining all historical data and using retraction: when a new record arrives, the operator updates the retained history, re-sorts everything received so far, and retracts the results it sent earlier. Some operators, such as reduce, can be optimized further so that they never actually store unbounded history. However, because data arrives out of order in incremental computation, state is also accessed out of order, which may lead to random I/O. Finally, the streaming mode relies on periodic checkpoints to support failover, which adds a certain processing overhead.

Therefore, for bounded data sets Flink also provides a dedicated batch execution mode. Operators run stage by stage, so this mode can only be used for bounded data. In exchange, operator implementations can apply batch-specific optimizations, such as sorting the data first and then processing it key by key, which avoids both unbounded state and random I/O.
Flink guarantees that the two execution modes produce consistent results for the same bounded input. It also provides a unified pipelined region scheduler, a unified shuffle service plug-in interface, and a unified connector interface that serve both modes.
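
The difference between the two modes can be illustrated with a small, self-contained Python sketch. It is not Flink code: the function names and the keyed-sum workload are assumptions chosen for illustration. The streaming-style function keeps one state entry per key and touches it in arrival order, while the batch-style function sorts the bounded input and handles one key at a time. Both produce the same result for the same bounded input.

```python
from itertools import groupby
from operator import itemgetter


def streaming_sum(records):
    """Streaming style: keep per-key state and update it as each record arrives."""
    state = {}  # key -> running sum, accessed in arrival order
    for key, value in records:
        state[key] = state.get(key, 0) + value
    return state


def batch_sum(records):
    """Batch style: sort the bounded input by key, then process one key at a time."""
    result = {}
    ordered = sorted(records, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        # Only the accumulator of the current key is live at any moment.
        result[key] = sum(value for _, value in group)
    return result


# Hypothetical bounded input with keys arriving out of order.
records = [("b", 2), ("a", 1), ("b", 5), ("c", 3), ("a", 4)]
```

Calling streaming_sum(records) and batch_sum(records) on the sample input yields the same per-key sums, which mirrors the consistency guarantee described above.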

The current architecture of Flink is shown in the figure above. In both its APIs and its implementation, Flink as a whole has reached a stream-batch integrated state.

2. Semantic enhancement and improvement

We have continued to refine and optimize stream-batch integration, as defined above, in recent versions. The first part of this work enhances and completes its semantics.

The first improvement is support for continuing to take checkpoints in streaming mode after some tasks have finished.

Currently, a streaming job can end in two ways:
•If the sources are bounded, the job eventually finishes on its own;
•If the sources are unbounded, the user can terminate the job with the stop-with-savepoint --drain command, which keeps a savepoint. If the --drain parameter is not specified, no drain is performed. In that case the savepoint is usually kept in order to restart the job, so it does not count as job termination.
Previously, Flink did not support taking checkpoints after some tasks had finished, which caused two problems:

First, in streaming mode two-phase-commit sinks rely on checkpoints to achieve end-to-end consistency. Such a sink first writes data to a temporary file or an external transaction. Only after a Flink checkpoint succeeds, which guarantees that the data before the checkpoint will not be replayed, can the sink safely commit by renaming the file or committing the transaction. If no checkpoint can be taken after some tasks have finished, the last part of the data can never be committed, and the results of streaming mode and batch mode cannot be guaranteed to be consistent.

Second, for a mixed job that contains both bounded and unbounded sources, if checkpoints cannot be taken once part of the job has finished, any later failover incurs a large overhead because the job has to roll back a long way.

To solve these problems, we need to support checkpoints after some tasks have finished, and to change the job termination process so that two-phase-commit tasks wait for a checkpoint to complete before exiting. On normal termination, a task can wait for the next checkpoint to complete and then exit; with drain, it can wait for the savepoint to complete and then exit.

To support checkpoints after some tasks have finished, we modified the checkpoint process. First, the new "source" tasks are identified, that is, tasks that are still running but whose upstream tasks have all finished. Barriers are then sent starting from these tasks, and the checkpoint proceeds as usual. Because checkpoint state is recorded per JobVertex, a JobVertex whose tasks have all finished gets a special marker recorded in its state. If only some of its tasks have finished, the states of the tasks that are still running are kept as the JobVertex state. All other JobVertices are handled exactly as in a normal checkpoint. When the job restarts after a failover, fully finished JobVertices are skipped, and the remaining tasks follow the normal processing logic.
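
The two rules in this paragraph can be sketched in a few lines of Python. This is an illustrative model only, not Flink's implementation: the task names, the FULLY_FINISHED marker, and both function names are hypothetical.

```python
# Hypothetical marker recorded for a vertex whose subtasks have all finished.
FULLY_FINISHED = "FULLY_FINISHED"


def find_barrier_start_tasks(predecessors, finished):
    """Return the running tasks from which checkpoint barriers should start.

    predecessors maps each task to its upstream tasks (empty for real sources).
    A task qualifies if it is still running and all of its upstream tasks
    have finished, so running sources qualify trivially.
    """
    return sorted(
        task
        for task, upstream in predecessors.items()
        if task not in finished and all(up in finished for up in upstream)
    )


def snapshot_vertex(subtask_states, finished):
    """Return what is recorded for one vertex in the checkpoint.

    If every subtask has finished, only the marker is recorded. Otherwise the
    states of the subtasks that are still running are kept.
    """
    running = {t: s for t, s in subtask_states.items() if t not in finished}
    return running if running else FULLY_FINISHED


# Hypothetical job: two sources, two maps, one sink; src-1 has finished.
predecessors = {
    "src-1": [],
    "src-2": [],
    "map-1": ["src-1"],
    "map-2": ["src-2"],
    "sink-1": ["map-1", "map-2"],
}
finished = {"src-1"}
```

With src-1 finished, barriers would start from src-2 (still a running source) and from map-1 (whose only upstream task is gone).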

Based on this work, we also reorganized the process and semantics of job termination. To ensure that two-phase-commit sinks can commit the last part of their data before exiting, these tasks must be able to wait for one final checkpoint. Under the current logic, when a job ends naturally, the max watermark is sent first, followed by an EndOfPartitionEvent. After a task receives the EndOfPartitionEvent, it calls the operator's endOfInput(), close(), and dispose() methods in that order. If a checkpoint is to be inserted at the end, the best place is after close(), because by then the job has completed all of its work.
The stop-with-savepoint --drain case is different, because there a savepoint is triggered first. After the savepoint succeeds, the source ends its execution and sends the max watermark and the EndOfPartitionEvent; the subsequent logic is the same as in the checkpoint case. Since a savepoint has already been taken, taking another checkpoint after close() would be redundant. A better approach in this case is to finish the job first and then take the savepoint, committing the last part of the data as part of the savepoint.

But this raises a problem. If the last savepoint is to be kept, all tasks must wait for the same savepoint before exiting, whereas on a natural end different tasks can wait for different checkpoints. In the savepoint case, however, the EndOfPartitionEvent has already been sent before the job ends, and it closes the network connections between tasks. Once the job has finished, a savepoint can therefore no longer be propagated from the original sources.

To solve this, it must be possible to notify all tasks to finish without closing the network connections, trigger the savepoint after all tasks have finished, and close the network connections only after the savepoint succeeds. That way, all tasks wait for the same final savepoint and then exit.

To support this change, we introduced a new EndOfDataEvent. When a task receives an EndOfDataEvent, it performs the processing that was previously triggered by the EndOfPartitionEvent. The source then immediately sends a barrier to trigger the savepoint, and operators run their exit logic once the savepoint completes.

In addition, we renamed the previously ambiguous close() and dispose() operations to finish() and close() respectively. finish() is called only when the task ends normally, while close() is called both when the job ends normally and when it ends abnormally.
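
The contract of the renamed methods can be shown with a small Python model. It is a sketch of the described semantics only: the BufferingSink class and the run_task driver are hypothetical and do not correspond to Flink classes.

```python
class BufferingSink:
    """Hypothetical operator that buffers records and flushes them on finish()."""

    def __init__(self):
        self.buffer = []
        self.committed = []
        self.calls = []

    def process(self, record):
        self.buffer.append(record)

    def finish(self):
        # Called only on a normal end: flush the remaining data.
        self.calls.append("finish")
        self.committed.extend(self.buffer)
        self.buffer.clear()

    def close(self):
        # Called on both normal and abnormal ends: release resources only.
        self.calls.append("close")
        self.buffer.clear()


def run_task(operator, records, fail_at=None):
    """Drive an operator through its lifecycle, optionally failing midway."""
    try:
        for index, record in enumerate(records):
            if index == fail_at:
                raise RuntimeError("simulated task failure")
            operator.process(record)
        operator.finish()  # reached only if all input was processed
    except RuntimeError:
        pass  # abnormal end: finish() is skipped
    finally:
        operator.close()  # always runs
    return operator
```

In this model a normal run records the calls finish and then close and commits all records, while a failed run records only close and commits nothing.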

Another piece of semantic work is the hybrid source.

The hybrid source lets users first read historical batch data and then switch to unbounded stream data. It suits jobs that move from batch to streaming with the same processing logic. That is, when real-time data has already been persisted to storage, the user needs to backfill, and the stream and batch processing logic are the same, the user can implement the job easily with a hybrid source.
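
The idea of reading history first and then switching to the live stream can be sketched with a Python generator. This is a conceptual model, not Flink's HybridSource API. The function name, the (timestamp, value) record shape, and the rule of skipping live records that the history already covers are assumptions made for the example.

```python
from itertools import count, islice


def hybrid_source(history, live_stream):
    """Yield all bounded historical records, then continue with the live stream.

    Records are (timestamp, value) pairs. Live records whose timestamp is not
    newer than the last historical record are skipped, so nothing is read twice.
    """
    last_timestamp = None
    for timestamp, value in history:
        last_timestamp = timestamp
        yield timestamp, value
    for timestamp, value in live_stream:
        if last_timestamp is None or timestamp > last_timestamp:
            yield timestamp, value


# Hypothetical data: persisted history plus an unbounded live stream that
# overlaps the history at timestamps 2 and 3.
history = [(1, "a"), (2, "b"), (3, "c")]
live = ((t, f"live-{t}") for t in count(2))
first_five = list(islice(hybrid_source(history, live), 5))
```

Because the live stream is unbounded, the example takes only the first five records with islice.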

3. Performance optimization

In addition to the semantic work, we also made some performance optimizations at the runtime layer.

3.1 Scheduling and deployment performance optimization

The first optimization concerns scheduling. Because of Flink's all-to-all connections, two operators with parallelism n are connected by n² edges. These n² edges are stored explicitly in the JobManager (JM) memory, and much of the scheduling and deployment logic operates on them directly, so both the JM memory footprint and the time and space complexity of many computations are O(n²). Batch jobs are generally larger and are scheduled at a finer granularity, which makes these scheduling and deployment performance problems worse.

To solve this problem, we exploited the symmetry of all-to-all edges to restructure the in-memory data structures and computation logic. A ConsumerGroup data structure replaces the previous ExecutionEdge and describes the connections between operators in a unified way. Symmetric information is then no longer stored repeatedly, which avoids the O(n²) complexity. With this new representation, ExecutionEdges are no longer kept in memory.
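
The saving can be made concrete with a short Python comparison. The sketch below is an illustration of the idea, not Flink's data structure: the class layout and method names are assumptions. Explicit edges need one object per (producer, consumer) pair, while a shared group stores each side once and hands the same consumer tuple to every producer.

```python
def explicit_edges(n_upstream, n_downstream):
    """Materialize one edge object per (producer, consumer) pair: O(n^2)."""
    return [(u, d) for u in range(n_upstream) for d in range(n_downstream)]


class ConsumerGroup:
    """Hypothetical shared description of one all-to-all connection."""

    def __init__(self, producers, consumers):
        self.producers = tuple(producers)
        self.consumers = tuple(consumers)

    def consumers_of(self, producer):
        # In an all-to-all connection every producer has the same consumers,
        # so the same tuple is returned instead of a per-producer copy.
        return self.consumers

    def stored_references(self):
        """Number of task references held by the group: O(n)."""
        return len(self.producers) + len(self.consumers)


n = 100
edges = explicit_edges(n, n)
group = ConsumerGroup(range(n), range(n))
```

For n = 100 the explicit representation holds 10,000 edges, while the group holds 200 task references.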

In addition, we adjusted many scheduling algorithms, such as computing pipelined regions and determining which downstream tasks to schedule after a task finishes, reducing their time complexity to O(n).

Computing pipelined regions also involves some special logic. A Flink job DAG contains two kinds of edges: pipelined edges and blocking edges. A pipelined edge requires the upstream and downstream tasks to be running at the same time and to exchange data over the network, while a blocking edge requires them to run one after the other and to exchange data through files. Before scheduling, the pipelined regions must be computed. In general, the graph can be cut at blocking edges, and all tasks connected by pipelined edges are placed in the same region. But this logic has a problem, as shown in the figure above. The tasks of parallel instance 1 and the tasks of parallel instance 2 are separated by blocking edges, so cutting at blocking edges splits them into two regions. Yet because there is an all-to-all shuffle between task 1 and task 2, the graph formed by the regions contains a circular dependency, and scheduling deadlocks.

Previously, we used Tarjan's strongly connected components algorithm to identify such circular dependencies. The identification ran directly on the ExecutionGraph, so its time complexity was O(n²), because an all-to-all edge contributes n² connections. Further analysis showed that if pipelined regions are identified on the JobGraph, any all-to-all edge in the graph necessarily implies a circular dependency. We can therefore identify all the all-to-all edges directly on the JobGraph and process only the remaining edges on the ExecutionGraph.
In this way, the complexity of identifying circular dependencies is reduced to O(n).
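
The circular dependency between regions can be reproduced in a small Python model. The figure is not available here, so the topology below is a hypothetical one chosen to trigger the problem: three vertices A, B, and C with parallelism 2, pointwise pipelined edges A to B and B to C, and an all-to-all blocking edge A to C. The sketch shows the naive region construction and a cycle check over the resulting regions. It does not implement the JobGraph-level optimization or Tarjan's algorithm.

```python
def build_regions(tasks, pipelined_edges):
    """Naive construction: tasks linked by pipelined edges share a region."""
    parent = {task: task for task in tasks}

    def find(task):
        while parent[task] != task:
            parent[task] = parent[parent[task]]  # path halving
            task = parent[task]
        return task

    for upstream, downstream in pipelined_edges:
        parent[find(upstream)] = find(downstream)
    return {task: find(task) for task in tasks}


def has_region_cycle(region_of, blocking_edges):
    """Depth-first search for a cycle among regions linked by blocking edges."""
    graph = {}
    for upstream, downstream in blocking_edges:
        src, dst = region_of[upstream], region_of[downstream]
        if src != dst:
            graph.setdefault(src, set()).add(dst)

    white, gray, black = 0, 1, 2
    color = {}

    def visit(region):
        color[region] = gray
        for nxt in graph.get(region, ()):
            state = color.get(nxt, white)
            if state == gray or (state == white and visit(nxt)):
                return True
        color[region] = black
        return False

    return any(color.get(r, white) == white and visit(r) for r in list(graph))


tasks = ["A1", "A2", "B1", "B2", "C1", "C2"]
pipelined = [("A1", "B1"), ("B1", "C1"), ("A2", "B2"), ("B2", "C2")]
blocking = [(a, c) for a in ("A1", "A2") for c in ("C1", "C2")]
region_of = build_regions(tasks, pipelined)
```

In this model the naive construction yields two regions, and the all-to-all blocking edge makes each region depend on the other, which is the deadlock situation described above.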

3.2 Deployment performance optimization

The other part of the optimization concerns deployment performance.
When Flink deploys a task, it ships the task's shuffle descriptors along with it. For the upstream side, a shuffle descriptor describes where data is output; for the downstream side, it describes where data must be pulled from. The number of shuffle descriptors equals the number of ExecutionEdges, so it is also O(n²). Computing, serializing, and storing these descriptors in memory consumes a lot of CPU and memory, blocks the main thread, and can lead to TM problems and memory exhaustion.

However, because of the symmetry between upstream and downstream, many shuffle descriptors are duplicates. Caching shuffle descriptors reduces the number that must be maintained.

In addition, to prevent shuffle descriptors from becoming so large at high parallelism that they cause out-of-memory (OOM) errors, we transfer them through the BlobServer.
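
The effect of caching can be sketched in Python. This is an illustration of the caching idea only: the DescriptorCache class, the group identifier, and the use of pickle as the serializer are assumptions, and the BlobServer transfer is not modeled.

```python
import pickle


class DescriptorCache:
    """Hypothetical cache that serializes each group of descriptors once."""

    def __init__(self):
        self._cache = {}
        self.serializations = 0  # counts how often real serialization ran

    def serialized(self, group_id, descriptors):
        """Return the serialized descriptors, serializing only on first use."""
        if group_id not in self._cache:
            self.serializations += 1
            self._cache[group_id] = pickle.dumps(descriptors)
        return self._cache[group_id]


# Hypothetical all-to-all stage: 1,000 downstream tasks all consume the same
# 1,000 upstream partitions, so they all need the same descriptor list.
descriptors = [(f"tm-{i}", i) for i in range(1000)]
cache = DescriptorCache()
payloads = [cache.serialized("group-0", descriptors) for _ in range(1000)]
```

Without the cache, each of the 1,000 deployments would serialize 1,000 descriptors again. With it, serialization runs once and every deployment reuses the same bytes.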

After implementing these optimizations, we tested with a two-stage job connected by a 10,000 × 10,000 all-to-all edge. Scheduling time and memory usage dropped by more than 90%, and deployment time dropped by more than 65%, which greatly improves scheduling and deployment performance.

In streaming execution mode, these scheduling and deployment optimizations greatly reduce the restart time after a failover. Even so, once a failover occurs, redeploying, initializing, and loading state still take time. To reduce this further, we are experimenting with restarting only the failed node when a job fails over. The difficulty lies in guaranteeing data consistency, which we are still exploring.

Another runtime optimization is buffer debloating, which dynamically adjusts buffer sizes in streaming mode. It reduces the time needed for checkpoint barrier alignment under backpressure and so avoids checkpoint timeouts. When backpressure builds up and too much data is buffered in the middle of the job, the buffer size is reduced promptly to limit the amount of in-flight data, so that barriers are not blocked behind data waiting to be processed.
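
One way to think about the adjustment is as a simple sizing rule: keep only as much in-flight data as the consumer can process within a target time. The Python sketch below illustrates that rule under stated assumptions. The formula, the function name, and the default limits are illustrative choices, not a description of Flink's actual algorithm or configuration.

```python
def debloated_buffer_size(throughput_bytes_per_s, target_seconds, num_buffers,
                          min_size=256, max_size=32 * 1024):
    """Pick a buffer size so that all buffers together hold about
    target_seconds worth of data at the observed throughput.

    min_size and max_size are assumed bounds that keep the result usable.
    """
    desired_in_flight = throughput_bytes_per_s * target_seconds
    size = int(desired_in_flight / num_buffers)
    return max(min_size, min(max_size, size))
```

Under backpressure the observed throughput falls, so the computed size shrinks and less data sits in front of a checkpoint barrier. When throughput is high, the size is capped at the assumed maximum.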

4. Remote Shuffle

Shuffle is a very important part of batch job execution. Cloud native offers a unified operations API, lowers operations overhead, and gives better support for online-offline co-location and dynamic scaling, so Flink has been actively embracing cloud native in recent versions, for example by providing a complete Flink-on-Kubernetes implementation and a scheduler that supports dynamic scaling. However, Flink's shuffle needs local disks, so a cloud-native Flink also requires a shuffle that separates storage from compute. Such an architecture lets compute and storage resources scale independently and lets a TaskManager exit as soon as its computation completes, which improves overall resource utilization. It also prevents a task failure that brings down a TM from affecting the stability of the shuffle file service and, through it, downstream execution.

To meet this need for a shuffle that separates storage from compute, we developed a remote shuffle service internally and put it into internal production at the beginning of this year. After a period of trial use, we recently open sourced the system, and we introduce it in detail below.

Flink supports many different scenarios, and shuffle differs considerably between them in storage medium, transmission method, and deployment. In stream processing mode, for example, Flink generally uses network-based online transmission: data is cached in the memory of the upstream TM and sent as soon as the downstream task has free buffers. In batch processing mode, to let operators run stage by stage, Flink also needs file-based offline transmission: data is first written to files and then sent to the downstream task over the network once that task has started. The files can reside on the local TM or in a remote service.
At the same time, different shuffles share many requirements for lifecycle management, metadata management, and data distribution strategy. Every shuffle needs the scheduler to request the corresponding shuffle resources and record them when upstream tasks start. The scheduler must also ship the shuffle resource descriptors when it deploys downstream tasks, so that they can read the right data. Finally, every shuffle relies on the scheduler to clean up its resources at specific lifecycle points, such as when the job finishes or execution fails.

To support different shuffles in a unified way, Flink has had a pluggable shuffle architecture since version 1.9. A shuffle plug-in consists of two main parts. The shuffle master interacts with the scheduler on the JM side to request and release shuffle resources. The result partition and the input gate are the write side and the read side of the data: using the shuffle resource descriptor provided by the scheduler, they write data to, or read data from, a specific location. The parts common to all shuffle implementations are implemented once by Flink itself: the scheduler records the shuffle resources that have been requested in the partition tracker and manages their lifecycle according to the job's execution mode.
With this unified pluggable interface, Flink reduces the complexity of implementing different shuffles and leaves each one free to choose how it actually stores and transmits data.
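
The division of labor between the pluggable part and the common part can be sketched in Python. The sketch is a simplified model and does not reproduce the signatures of Flink's Java interfaces. The method names, the LocalFileShuffleMaster implementation, and the descriptor format are hypothetical.

```python
from abc import ABC, abstractmethod


class ShuffleMaster(ABC):
    """Pluggable part: requests and releases shuffle resources."""

    @abstractmethod
    def register_partition(self, partition_id):
        """Allocate resources and return a descriptor of where the data lives."""

    @abstractmethod
    def release_partition(self, descriptor):
        """Free the resources behind a descriptor."""


class LocalFileShuffleMaster(ShuffleMaster):
    """Hypothetical plug-in that stores partitions as local files."""

    def __init__(self):
        self.live = set()

    def register_partition(self, partition_id):
        descriptor = f"file:///tmp/shuffle/{partition_id}"
        self.live.add(descriptor)
        return descriptor

    def release_partition(self, descriptor):
        self.live.discard(descriptor)


class PartitionTracker:
    """Common part: records requested resources and drives their lifecycle."""

    def __init__(self, shuffle_master):
        self.shuffle_master = shuffle_master
        self.tracked = {}

    def start_tracking(self, partition_id):
        # Called when an upstream task starts; the descriptor is later shipped
        # to downstream tasks so they know where to read from.
        descriptor = self.shuffle_master.register_partition(partition_id)
        self.tracked[partition_id] = descriptor
        return descriptor

    def stop_tracking_and_release(self, partition_id):
        # Called at the end of the partition's lifecycle.
        descriptor = self.tracked.pop(partition_id)
        self.shuffle_master.release_partition(descriptor)
```

Swapping LocalFileShuffleMaster for a remote implementation would leave PartitionTracker unchanged, which is the point of the plug-in boundary.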

Built on Flink's unified pluggable shuffle interface, the overall architecture of Flink remote shuffle is shown in the figure above. The shuffle service runs as a separate cluster. The shuffle manager is the master node of the cluster and is responsible for managing worker nodes and for allocating and managing shuffle data sets. The shuffle workers are the slave nodes of the cluster and are responsible for reading, writing, and cleaning up data sets. The shuffle manager also monitors the shuffle workers and the shuffle master through heartbeats, and deletes and synchronizes data when a heartbeat times out, so that the state of the data sets in the cluster stays consistent.

We have also made many optimizations to the transmission path. The network layer uses a credit-based protocol similar to Flink's existing network transmission protocol, and adds optimizations such as TCP connection multiplexing, compression, controllable memory management, and zero copy. On the I/O side, we provide a MapPartition storage format that supports I/O scheduling optimization. With I/O scheduling optimization, its access speed on HDDs exceeds 140 MB/s.
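
The core rule of credit-based flow control is that a sender transmits a buffer only when the receiver has announced a free buffer for it. The Python sketch below models just that rule. The Sender class and its method names are hypothetical, and the real protocol's framing, backlog announcements, and networking are left out.

```python
from collections import deque


class Sender:
    """Hypothetical sender that transmits only while it holds credit."""

    def __init__(self, data):
        self.backlog = deque(data)  # buffers waiting to be sent
        self.credit = 0             # free receiver buffers announced so far

    def add_credit(self, credit):
        """The receiver announces that it has freed this many buffers."""
        self.credit += credit

    def send_available(self):
        """Send as many buffers as credit allows, one credit per buffer."""
        sent = []
        while self.credit > 0 and self.backlog:
            sent.append(self.backlog.popleft())
            self.credit -= 1
        return sent


sender = Sender(range(5))
before_credit = sender.send_available()   # nothing can be sent yet
sender.add_credit(2)
first_batch = sender.send_available()     # exactly two buffers go out
sender.add_credit(10)
second_batch = sender.send_available()    # the remaining three go out
```

Because data leaves the sender only against announced credit, the receiver is never handed more buffers than it has room for.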

In addition, we are developing a ReducePartition storage format based on pre-merging, which merges data in advance according to its downstream consumer and stores it on a specific worker. When the downstream tasks cannot all start at the same time, it can perform better than MapPartition.

In terms of deployment, remote shuffle supports several deployment methods. We also maintain protocol compatibility between versions, so the client does not need to be upgraded when the server is. Finally, we expose common metrics for operating the system, and more operations tools are under active development.

5. Summary and Outlook

Overall, Flink's stream-batch integrated data processing capability is now ready for production use. We will keep improving it and will support more scenarios that combine stream and batch. For example, for hybrid source backfill scenarios in which the stream and batch processing logic are inconsistent, we are considering a way to retain the state produced by batch processing and use it to start the stream processing job.

In addition, we will further improve the stability and performance of the whole system, and continue to think about the essential differences between streaming mode and batch mode and about the deeper meaning of stream-batch integration.
