Flink's new generation of stream computing and fault tolerance - stage summary and outlook

Introduction: Apache Flink engine architect and Alibaba storage engine team leader Mei Yuan's speech at FFA 2021

Abstract: This article is compiled from the speech given by Mei Yuan, Apache Flink engine architect and Alibaba storage engine team leader, at the core technology session of Flink Forward Asia 2021. The speech centers on Flink's high availability and discusses the core issues and technology choices of Flink's new generation of stream computing, including:
1. Critical Path of Flink High Availability Stream Computing
2. Fault Tolerance 2.0 and Key Issues
3. Data Recovery Process
4. Stable, Fast and Efficient Checkpointing
5. Fault Tolerance and Elastic Scaling under Cloud Native

1. 【Flink's new generation of stream computing and fault tolerance】Critical Path of High Availability Stream Computing



The two-way axis in the above figure maps big data applications by latency. The further to the right, the shorter the latency an application requires; the further to the left, the looser the latency requirement. When Flink was born, it sat roughly in the middle of this figure. The right side can be understood as stream computing and the left side as batch computing. In the past year or two, Flink's application map has expanded greatly to the left, which is what we often call the unification of stream and batch; at the same time, we have never stopped pushing the map in a more real-time direction.
Flink started with stream computing, so what does it mean to move in a more real-time direction? What is more real-time, more extreme stream computing?
During normal processing, the Flink engine framework itself has almost no additional overhead apart from periodically taking checkpoint snapshots, and a large part of each checkpoint snapshot is asynchronous. Flink is therefore very efficient during normal processing, with an end-to-end latency of around 100 ms. Precisely because it is built for efficient processing, Flink pays a high price for fault-tolerant recovery and rescaling: the entire job has to be stopped and then restored as a whole from a past checkpoint snapshot. This process takes a few seconds, and reaches the minute level when the state is relatively large. It takes even longer if warm-up is needed or other service processes have to be started.
Therefore, the key to extreme stream computing in Flink is the fault-tolerant recovery part. Extreme stream computing here refers to scenarios with certain requirements for latency, stability, and consistency, such as risk control and security. This is also the problem that Fault Tolerance 2.0 sets out to solve.

2. 【Flink's new generation of stream computing and fault tolerance】Fault Tolerance 2.0 and key issues



Fault-tolerant recovery is a whole-link problem that includes failure detection, job cancellation, requesting and scheduling new resources, and state restoration and rebuilding. At the same time, to recover from existing state, Checkpoints must be taken during normal processing, and they must be lightweight enough not to affect that processing.
Fault tolerance is also a multi-dimensional problem. Different users and different scenarios have different requirements for fault tolerance, including the following aspects:
•Data consistency: some applications, such as online machine learning, can tolerate partial data loss;
•Latency: in some scenarios the end-to-end latency requirements are not so strict, so the work done during normal processing and the work done during fault-tolerant recovery can be balanced against each other;
•Recovery behavior: in scenarios such as large-screen dashboards or real-time report updates, a fast full recovery may not be required; what matters more is recovering the first piece of data quickly;
•Cost: users are willing to pay different prices for fault tolerance according to their own needs.
In summary, we need to consider this issue from different perspectives.
In addition, fault tolerance is not just an issue on the Flink engine side. Combining Flink with cloud native is an important future direction for Flink, and the way we depend on cloud native also shapes the design and direction of fault tolerance. We expect to take advantage of the conveniences that cloud native brings, such as cross-region durability, through very simple weak dependencies, and ultimately to deploy stateful Flink applications as elastically as native stateless applications.
Based on the above considerations, our work in Flink Fault Tolerance 2.0 also has different focuses and directions.
First, from the perspective of scheduling: instead of rolling back all task nodes covered by the global snapshot on every failure, only the single failed node or the subset of failed nodes is recovered. This is necessary for scenarios that need warm-up or where the initialization time of a single node is very long, such as online machine learning. Some related work, such as Approximate Task-local Recovery, has already been launched in VVP; for Exactly-once Task-local Recovery, we have also achieved some results.
Next, let's focus on Checkpoint and the parts related to cloud native.

3. 【Flink's new generation of stream computing and fault tolerance】Data recovery process in Flink



So, what exactly does fault tolerance solve? In my opinion, its essence is to solve the problem of data recovery.

Flink data can be roughly divided into the following three categories. The first is meta information, which is the minimum set of information required to run a Flink job, including the Checkpoint address, Job Manager, Dispatcher, Resource Manager, and so on. The fault tolerance of this information is guaranteed by the high availability of systems such as Kubernetes/ZooKeeper and is outside the scope of the fault tolerance discussed here. Once a Flink job is running, it reads data from the data source and writes it to the sink. The data flowing in between is the in-flight intermediate data, or Inflight Data (the second category). For stateful operators such as aggregation operators, processing the input data produces operator state data (the third category).
Flink periodically takes snapshots of the state data of all operators and uploads them to durable, stable bulk storage (Durable Bulk Store). This process is called Checkpointing. When an error occurs in a Flink job, the job rolls back to a past Checkpoint snapshot and recovers from it.
We are currently doing a lot of work to improve the efficiency of Checkpointing, because in practice most on-call issues and support tickets at the engine layer are related to Checkpoints, and a variety of causes can make Checkpointing time out.
The following is a brief review of the Checkpointing process. Readers who are familiar with this part can skip it. The process of Checkpointing is divided into the following steps:

Step 1: The Checkpoint Coordinator inserts a Checkpoint Barrier (the yellow vertical bar in the picture above) at the Source.

Step 2: The Barrier flows downstream together with the intermediate data. When it passes through an operator, the system takes a synchronous snapshot of the operator's current state and asynchronously uploads the snapshot data to remote storage. In this way, the effect of all input data that arrived before the Barrier is already reflected in the operator's state. If the operator state is very large, it affects the time needed to complete Checkpointing.

Step 3: When an operator has multiple inputs, it needs to receive the Barrier from every input before it starts taking the snapshot, which is the part in the blue box in the above figure. If there is back pressure during this alignment, the intermediate data flows slowly, the channels without back pressure are blocked as well, and the Checkpoint becomes very slow or may not complete at all.

Step 4: After the state data of all operators has been successfully uploaded to remote stable storage, a complete Checkpoint is truly finished.
As these four steps show, two main factors affect whether Checkpointing is fast and stable. One is that the in-flight intermediate data flows slowly, and the other is that the operator state data is too large, which makes uploading slow. Let's discuss how to address these two factors one by one.
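The alignment in Step 3 can be illustrated with a toy simulation. This is only a minimal sketch under simplifying assumptions, not Flink's implementation: the `BARRIER` marker, the function name, and the running-sum "state" are all hypothetical, and every channel is assumed to contain exactly one barrier.

```python
from collections import deque

BARRIER = "BARRIER"  # hypothetical marker standing in for a Checkpoint Barrier


def run_aligned_operator(channels):
    """Read several input channels round-robin until all barriers have arrived.

    `channels` is a list of lists; each is assumed to contain exactly one BARRIER.
    Returns (state_at_snapshot, records_left_waiting_behind_the_barriers).
    """
    queues = [deque(ch) for ch in channels]
    blocked = [False] * len(queues)  # True once a channel's barrier has arrived
    state = 0                        # toy operator state: a running sum

    while not all(blocked):
        for i, q in enumerate(queues):
            if blocked[i] or not q:
                continue
            item = q.popleft()
            if item == BARRIER:
                blocked[i] = True    # stop reading this channel until aligned
            else:
                state += item        # pre-barrier record: fold into the state

    # All barriers received: the state reflects exactly the pre-barrier input.
    waiting = sum(len(q) for q in queues)
    return state, waiting


state, waiting = run_aligned_operator([[1, 2, BARRIER, 100], [10, BARRIER, 200, 300]])
print(state, waiting)
```

In this example the second channel delivers its barrier first and is then blocked until the first channel catches up; the records behind the barriers (100, 200, 300) are left waiting. With back pressure on one channel, that waiting period is what stretches the Checkpoint.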

4. 【Flink's new generation of stream computing and fault tolerance】Stable, fast and efficient Checkpointing



For the slow flow of intermediate data, you can:
1. Find a way not to be blocked by the intermediate data: Unaligned Checkpoint - skip the blocked intermediate data directly;
2. Or keep the amount of intermediate data small enough: Buffer Debloating.
For state data that is too large, we need to make the state data uploaded at each Checkpoint small enough: Generalized Log-Based Incremental Checkpoint.
Each solution is described in detail below.

4.1 Unaligned Checkpoint

The principle of Unaligned Checkpoint is that the Barrier inserted at the Source is pushed through to the Sink immediately, overtaking the intermediate data in its way, and the data it overtakes is stored in the snapshot as well. For Unaligned Checkpoint, the state data therefore includes not only the operators' state data but also the in-flight intermediate data, which can be understood as a complete instantaneous snapshot of the entire Flink pipeline, as shown in the yellow box in the above figure. Although Unaligned Checkpoint can complete a Checkpoint very quickly, it has to store the additional intermediate data in the pipeline channels, so the state that needs to be stored is larger. Unaligned Checkpoint was released in Flink 1.11 last year. Flink 1.12 and 1.13 added support for rescaling with Unaligned Checkpoint and for dynamically switching from Aligned Checkpoint to Unaligned Checkpoint.
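To make concrete what "the skipped data is placed in the snapshot" means, here is a minimal sketch for a single operator. It is a simplified model rather than Flink's actual mechanism: the function name, the `BARRIER` marker, and the dictionary layout are hypothetical, and output buffers are ignored.

```python
BARRIER = "BARRIER"  # hypothetical marker standing in for a Checkpoint Barrier


def unaligned_snapshot(operator_state, input_buffers):
    """Snapshot an operator as soon as the first barrier is seen.

    `input_buffers` maps a channel name to the items currently buffered on
    that channel (oldest first). Records that the barrier overtakes are kept
    in the snapshot as in-flight channel state.
    """
    channel_state = {}
    for name, buffered in input_buffers.items():
        if BARRIER in buffered:
            # Records queued ahead of the barrier are overtaken by it.
            overtaken = buffered[:buffered.index(BARRIER)]
        else:
            # The barrier has not reached this channel yet, so everything
            # buffered here still belongs to the pre-barrier input.
            overtaken = list(buffered)
        channel_state[name] = overtaken
    return {"operator_state": operator_state, "channel_state": channel_state}


snap = unaligned_snapshot(13, {"ch0": [3, 4, BARRIER, 5], "ch1": [20, 30]})
print(snap)
```

The snapshot is taken without waiting for `ch1`, but it now carries four buffered records in addition to the operator state, which is the extra storage cost described above.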
4.2 Buffer Debloating
The principle of Buffer Debloating is to reduce the data cached between upstream and downstream without affecting throughput and latency. We observed that operators do not need large input/output buffers. Caching more data does not help much; it only lets in-flight data fill up the entire pipeline when the data flow is slow, pushing up memory usage until the job runs out of memory (OOM).

A simple estimate can be made here. For each task, on both the output and the input side, the total number of buffers is roughly the number of exclusive buffers per channel multiplied by the number of channels, plus the number of shared floating buffers. Multiplying this total by the size of each buffer gives the size of the local buffer pool. Plugging in the system defaults, we find that once the parallelism is somewhat large and the data is shuffled several times, the data in flight across the whole job can easily reach several gigabytes.
In practice, we don't need to cache that much data; we only need enough to keep the operators from idling, which is exactly what Buffer Debloating does. Buffer Debloating dynamically adjusts the total size of the upstream and downstream buffers to minimize the buffer size the job needs without affecting performance. The current strategy is that the upstream dynamically caches the amount of data that the downstream can process in about one second. In addition, Buffer Debloating also benefits Unaligned Checkpoint: because it reduces the data in flight, less intermediate data has to be stored additionally when an Unaligned Checkpoint takes its snapshot.
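The estimate can be written out as a small calculation. The default values below (2 exclusive buffers per channel, 8 floating buffers per gate, 32 KiB per buffer) are assumptions based on commonly documented Flink network defaults and should be checked against the version in use; the parallelism of 200 is a made-up example, and the function name is hypothetical.

```python
def local_buffer_pool_bytes(num_channels, exclusive_per_channel=2,
                            floating_buffers=8, buffer_size_bytes=32 * 1024):
    """Approximate buffer pool size for one input (or output) side of a task."""
    total_buffers = exclusive_per_channel * num_channels + floating_buffers
    return total_buffers * buffer_size_bytes


# Hypothetical all-to-all shuffle: each of 200 downstream tasks has 200 channels.
parallelism = 200
per_task = local_buffer_pool_bytes(num_channels=parallelism)
per_shuffle = per_task * parallelism

print(per_task)                       # bytes on the input side of one task
print(round(per_shuffle / 2**30, 2))  # GiB on the input side of one shuffle
```

Under these assumptions a single shuffle already accounts for roughly 2.5 GiB of input buffers, so a job with several shuffles reaches the "several gigabytes" mentioned above.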

The figure above compares how Checkpointing time varies with the Debloat Target when Buffer Debloating is enabled under back pressure. The Debloat Target is the "expected time": the upstream caches as much data as the downstream can process within that time. In this experiment the Flink job has 5 network exchanges in total, so the total time required for Checkpointing is approximately 5 times the Debloat Target, which is basically consistent with the experimental results.
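The rule of thumb from this experiment can be expressed as a one-line estimate. This is a rough sketch that assumes the job is fully back-pressured and that each network exchange holds about one Debloat Target's worth of data ahead of the Barrier; the function name and the target values are illustrative only.

```python
def estimate_checkpoint_seconds(network_exchanges, debloat_target_seconds):
    """Rough aligned-Checkpoint duration under back pressure.

    The Barrier waits behind roughly `debloat_target_seconds` of buffered
    data at every network exchange it has to cross.
    """
    return network_exchanges * debloat_target_seconds


for target in (0.5, 1.0, 2.0):  # hypothetical Debloat Targets in seconds
    print(target, estimate_checkpoint_seconds(5, target))
```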
4.3 Generalized Log-Based Incremental Checkpoint
As mentioned earlier, the state size also affects the time needed to complete Checkpointing, because Flink's Checkpointing process consists of two parts: a synchronous snapshot and an asynchronous upload. The synchronous part is usually very fast: the state data in memory is simply flushed to disk. The asynchronous upload, however, depends on the amount of data uploaded, so we introduced Generalized Log-Based Incremental Checkpoint to control the amount of data that needs to be uploaded for each snapshot.

For a stateful operator, every change to its internal state is recorded in the State Table, as shown in the figure above. When Checkpointing occurs, taking RocksDB as an example, the State Table is flushed to disk and the disk files are asynchronously uploaded to remote storage. Depending on the Checkpoint mode, the uploaded part can be a full Checkpoint or an incremental Checkpoint. Either way, the size of the uploaded files is tightly coupled to the State Backend's storage implementation. For example, although RocksDB supports incremental Checkpoints, once a multi-level compaction is triggered, many new files are generated. In that case the incremental part can be even larger than a full Checkpoint, so the upload time is still uncontrollable.

Since it is the upload process that causes the Checkpointing to time out, separating the upload process from the Checkpointing process can solve the problem. This is actually what Generalized Log-Based Incremental Checkpoint wants to do: in essence, it completely separates the Checkpointing process from the State Backend Storage Compaction.
The specific implementation is as follows: for a stateful operator, in addition to recording the state update in the State Table, we also write the increment to a State Changelog and flush both asynchronously to remote storage. A Checkpoint then consists of two parts: the State Table that has already been materialized on remote storage, and the increments that have not yet been materialized. As a result, the amount of data that has to be uploaded at Checkpoint time becomes smaller and more stable, which makes Checkpoints not only more stable but also possible at a higher frequency. End-to-end latency can be greatly reduced, especially for Exactly-Once Sinks, which can only finish the two-phase commit after a complete Checkpoint has finished.
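The two-part structure of such a Checkpoint can be sketched with a toy in-memory model. This is an illustration of the idea only, not Flink's implementation: the class, its method names, and the plain dictionaries standing in for remote storage are all hypothetical, and everything that would be asynchronous in a real system runs synchronously here.

```python
class ChangelogState:
    """Toy keyed state that logs every update next to the State Table."""

    def __init__(self):
        self.state_table = {}       # current local state
        self.changelog = []         # updates not yet shipped to remote storage
        self.materialized = {}      # last State Table copy on "remote storage"
        self.uploaded_changes = []  # changelog entries already on "remote storage"

    def put(self, key, value):
        self.state_table[key] = value
        self.changelog.append((key, value))

    def flush_changelog(self):
        """Ship pending changes (continuous and asynchronous in a real system)."""
        self.uploaded_changes.extend(self.changelog)
        self.changelog.clear()

    def materialize(self):
        """Upload the full State Table on its own schedule, outside Checkpointing."""
        self.materialized = dict(self.state_table)
        self.changelog.clear()
        self.uploaded_changes.clear()

    def checkpoint(self):
        """A Checkpoint = materialized table + changes logged after it."""
        self.flush_changelog()
        return {"materialized": dict(self.materialized),
                "changes": list(self.uploaded_changes)}

    @staticmethod
    def restore(checkpoint):
        """Rebuild state by replaying the changes on top of the materialized table."""
        state = dict(checkpoint["materialized"])
        for key, value in checkpoint["changes"]:
            state[key] = value
        return state


s = ChangelogState()
s.put("a", 1)
s.put("b", 2)
s.materialize()
s.put("a", 5)
cp = s.checkpoint()
print(cp["changes"], ChangelogState.restore(cp))
```

At Checkpoint time only the small tail of changes since the last materialization has to be made durable, regardless of how large the State Table is or when the backend compacts it.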

5. 【Flink's new generation of stream computing and fault tolerance】Fault tolerance and elastic scaling under cloud native


In the context of cloud native, rapid scaling is a major challenge for Flink. Especially since the Re-active Scaling mode was introduced in Flink 1.13, Flink jobs need to scale in and out frequently, so Rescaling has become the main bottleneck of Re-active Scaling. Rescaling and fault tolerance (Failover) largely solve similar problems: for example, after a machine is removed, the system needs to detect it quickly, reschedule, and restore the state. There are differences, of course. In Failover, the state only has to be restored and pulled back into the operators; in Rescaling, the parallelism of the topology changes, so the state also has to be redistributed.

When restoring state, we first need to read the state data from remote storage to local storage and then redistribute the state based on the data read. As shown in the figure above, if the state is even slightly large, this whole process can take more than 30 minutes for a single parallel instance. In practice, we also found that state redistribution takes much longer than reading the state data from remote storage.

So how is the state redistributed? Flink's state is partitioned with the Key Group as the smallest unit, which can be understood as mapping the key space of the state onto a set of non-negative integers starting from 0. This integer set is the Key Group Range, and it is determined by the maximum parallelism allowed for the operator. As shown in the figure above, when we change the operator parallelism from 3 to 4, the state of the reassigned Task1 is spliced together from parts of two of the original tasks' states. The spliced pieces are contiguous and do not overlap, so we can use this property for some optimizations.
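The splicing can be reproduced with a short calculation. The range formula below mirrors, to the best of my understanding, the integer arithmetic Flink uses to assign contiguous Key Group ranges to operator instances, but it should be treated as an assumption; the maximum parallelism of 12 is a made-up value chosen to keep the output small, and both function names are hypothetical.

```python
def key_group_range(max_parallelism, parallelism, operator_index):
    """Contiguous range of Key Groups owned by one operator instance."""
    start = (operator_index * max_parallelism + parallelism - 1) // parallelism
    end = ((operator_index + 1) * max_parallelism - 1) // parallelism
    return range(start, end + 1)


def sources_for_new_task(max_parallelism, old_parallelism, new_parallelism, new_index):
    """Map each old task index to the Key Groups the new task takes over from it."""
    wanted = set(key_group_range(max_parallelism, new_parallelism, new_index))
    sources = {}
    for old_index in range(old_parallelism):
        owned = set(key_group_range(max_parallelism, old_parallelism, old_index))
        overlap = sorted(wanted & owned)
        if overlap:
            sources[old_index] = overlap
    return sources


# Rescale from parallelism 3 to 4 with a hypothetical max parallelism of 12.
print([list(key_group_range(12, 3, i)) for i in range(3)])
print([list(key_group_range(12, 4, i)) for i in range(4)])
print(sources_for_new_task(12, 3, 4, new_index=1))
```

With these numbers the new task at index 1 owns Key Groups 3 to 5, taking group 3 from old task 0 and groups 4 and 5 from old task 1: two contiguous, non-overlapping pieces, which is the property the text proposes to exploit.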

As the figure above shows, the optimization improves DB Rebuild very noticeably. However, this work is still in the exploratory stage and many problems remain unsolved, so there is no concrete community plan for the time being.
Finally, a brief review of this article. We first discussed why fault tolerance matters: it is on the critical path of Flink stream computing. We then analyzed the factors that affect fault tolerance. Fault tolerance is a whole-link problem that includes failure detection, job cancellation, requesting and scheduling new resources, and state restoration and rebuilding, and it needs to be weighed from multiple dimensions. At present our focus is mainly on making Checkpointing stable and fast, because many practical problems today are related to Checkpointing. Finally, we discussed some exploratory work on combining fault tolerance with elastic scaling in the context of cloud native.


