Flink's new generation of stream computing and fault tolerance

1. The critical path of highly available stream computing

The horizontal axis in the figure above maps big data applications by latency requirement: the farther to the right, the tighter the latency requirement; the farther to the left, the looser it is. When Flink was born, it sat roughly in the middle of this figure; stream computing corresponds to the right side and batch computing to the left. Over the past year or two, Flink's application range has expanded considerably to the left, which is what we usually call stream-batch unification; at the same time, we have never stopped pushing it toward the more real-time, right-hand side.

Flink started as a stream computing engine, so what does it mean to move further in the real-time direction? What does more real-time, more extreme stream computing look like?

Under normal processing, the Flink engine itself adds almost no overhead beyond periodically taking Checkpoint snapshots, and a large part of the snapshotting is asynchronous, so Flink is very efficient in the normal case, with end-to-end latency around 100 milliseconds. Precisely because it is optimized for efficient normal processing, Flink pays a relatively high price for fault-tolerant recovery and rescaling: the entire job has to be stopped and then restored as a whole from a previous Checkpoint snapshot. This process takes roughly a few seconds, reaches the minute level when the job state is relatively large, and takes even longer if warm-up or starting other service processes is required.

Therefore, the key to extreme stream computing in Flink is the fault-tolerant recovery part. Extreme stream computing here refers to scenarios with strict requirements on latency, stability, and consistency, such as risk control and security. This is also the problem that Fault Tolerance 2.0 aims to solve.

2. Fault Tolerance 2.0 and key issues

Fault-tolerant recovery is a full-link problem: failure detection, job cancellation, scheduling and acquiring new resources, state recovery and reconstruction, and so on. At the same time, recovering from existing state requires taking Checkpoints during normal processing, and they must be lightweight enough not to affect normal processing.

Fault tolerance is also a multi-dimensional problem. Different users and scenarios have different requirements, mainly along the following dimensions:

Data Consistency: some applications, such as online machine learning, can tolerate partial data loss;

Latency: in some scenarios the end-to-end latency requirement is not that strict, so the work done during normal processing and during fault-tolerant recovery can be balanced against each other;

Recovery Behavior: scenarios such as real-time dashboards or continuously updated reports may not need the full state to be restored quickly; what matters more is that the first piece of data is produced again quickly;

Cost: users are willing to pay different prices for fault tolerance depending on their needs. In summary, we need to consider fault tolerance from all of these angles.

In addition, fault tolerance is not only a problem on the Flink engine side. Combining Flink with cloud native is an important direction for Flink going forward, and the way we depend on cloud-native infrastructure also shapes the design and direction of fault tolerance. We expect to take advantage of the convenience cloud native brings through very lightweight, weak dependencies, such as cross-region durability, and ultimately to be able to elastically deploy stateful Flink applications just like native stateless applications.

Based on the above considerations, our work on Flink Fault Tolerance 2.0 focuses on several different directions.

First, from the scheduling perspective, each error recovery no longer rolls back all task nodes covered by the global snapshot, but restores only the single node or subset of nodes that actually failed. This is necessary for scenarios where warm-up or initialization of a single node takes a long time, such as online machine learning. Some related work, such as Approximate Task-local Recovery, has already been launched in VVP; on Exactly-once Task-local Recovery we have also achieved some results.

Next, we will focus on Checkpoint and the parts related to cloud native.

3. Data recovery process in Flink

So, what exactly does fault tolerance solve? In my opinion, its essence is the problem of data recovery.

Flink's data can be roughly divided into three categories. The first is meta information: the minimum set of information a Flink job needs in order to run, such as the Checkpoint addresses and the addresses of the JobManager, Dispatcher, and ResourceManager. Its fault tolerance is guaranteed by the high availability of systems such as Kubernetes/ZooKeeper and is outside the scope of the fault tolerance discussed here. Once a Flink job is running, it reads data from sources and writes to sinks; the data flowing in between is the second category, the in-flight intermediate data (Inflight Data). For stateful operators such as aggregations, processing input data produces operator state data, which is the third category.

Flink periodically takes snapshots of all operators' state data and uploads them to durable, stable bulk storage (Durable Bulk Store); this process is called Checkpointing. When a Flink job fails, it rolls back to a previous snapshot and recovers from that Checkpoint.
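To make the discussion concrete, here is a minimal sketch of how periodic checkpointing is typically enabled on the application side, assuming Flink 1.13+; the interval, storage path, and failure-tolerance values are placeholders, not recommendations:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take an exactly-once checkpoint every 10 seconds (placeholder interval).
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        // Durable bulk storage for completed checkpoints (placeholder path).
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

        // Tolerate a few consecutive checkpoint failures before failing the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // ... define sources, operators, and sinks here, then:
        // env.execute("checkpointed-job");
    }
}
```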

We are currently putting a lot of work into making Checkpointing more efficient, because in practice most on-call and ticket issues at the engine layer are related to Checkpointing in one way or another, and all kinds of reasons can cause Checkpointing to time out.

Let's briefly review the Checkpointing process; readers already familiar with it can skip ahead. Checkpointing proceeds in the following steps:

Step 1: The Checkpoint Coordinator inserts a Checkpoint Barrier at the Source side (the yellow vertical bars in the figure above).

Step 2: The barrier flows downstream together with the intermediate data. When it passes through an operator, the operator's current state is snapshotted synchronously and the snapshot data is uploaded to remote storage asynchronously. In this way, the effect of all input data before the barrier is already reflected in the operator's state. If the operator state is large, it prolongs the time needed to complete Checkpointing.

Step 3: When an operator has multiple inputs, it must receive the barrier from all of them before it can start its snapshot; this is the alignment shown in the blue box in the figure above. If there is backpressure during alignment, the in-flight data flows slowly, the input lines without backpressure are blocked as well, and the Checkpoint becomes very slow or even impossible to complete (see the sketch after this list).

Step 4: Only after the intermediate state data of all operators has been successfully uploaded to remote, stable storage is a complete Checkpoint truly finished.
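To illustrate the alignment in Step 3, here is a simplified sketch, not Flink's actual implementation, of how an operator with several input channels might track barriers, decide when it can snapshot, and which channels it must buffer:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Simplified sketch of aligned-barrier handling for an operator with several
 * input channels (not Flink's actual implementation): once a channel has
 * delivered the barrier, further records from it are buffered; the snapshot
 * is taken only when the barrier has arrived on every channel.
 */
class AlignedBarrierSketch {
    private final int numChannels;
    private final Set<Integer> channelsWithBarrier = new HashSet<>();

    AlignedBarrierSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A barrier arrived on this channel; returns true when alignment is complete. */
    boolean onBarrier(int channel) {
        channelsWithBarrier.add(channel);
        return channelsWithBarrier.size() == numChannels;
    }

    /**
     * Records on an already-aligned channel must wait, which is why a single
     * backpressured channel stalls the whole alignment.
     */
    boolean mustBuffer(int channel) {
        return channelsWithBarrier.contains(channel);
    }
}
```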

From these four steps we can see that two main factors prevent fast and stable Checkpointing: in-flight intermediate data flowing too slowly, and operator state data being so large that uploading it is slow. Let's look at how to address each of them.

4. Stable, fast and efficient Checkpointing

For slow-flowing intermediate data, we can either:

find a way not to be blocked by the in-flight data: Unaligned Checkpoint, which lets the barrier skip over the blocked intermediate data directly;
or keep the amount of in-flight data small enough: Buffer Debloating.

For large state data, we need to keep the amount of state uploaded at each Checkpoint small enough: Generalized Log-Based Incremental Checkpoint.

Each solution is described in detail below.

4.1 Unaligned Checkpoint

The principle of Unaligned Checkpoint is to push the barrier inserted at the Source to the Sink as fast as possible, skipping over the in-flight intermediate data, and to include the skipped data in the snapshot. For an Unaligned Checkpoint, the state therefore consists not only of the operator state but also of the in-flight intermediate data; it can be understood as a complete instantaneous snapshot of the whole Flink pipeline, as shown in the yellow box in the figure above. Although Unaligned Checkpoint can complete a Checkpoint very quickly, it has to store the additional in-flight channel data, so the persisted state becomes larger. Unaligned Checkpoint was released last year in Flink 1.11; Flink 1.12 and 1.13 added support for rescaling from Unaligned Checkpoints and for dynamically switching from Aligned to Unaligned Checkpoints.
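A minimal sketch of enabling this on the application side, assuming Flink 1.14+ (earlier releases expose the aligned-to-unaligned switch under a different method name); the interval and timeout values are placeholders:

```java
import java.time.Duration;

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Let barriers overtake in-flight data; the skipped data is persisted
        // together with the operator state.
        checkpointConfig.enableUnalignedCheckpoints();
        // Start aligned and fall back to unaligned only if alignment takes too long
        // (method name as of Flink 1.14; older releases call it setAlignmentTimeout).
        checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(30));
    }
}
```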

4.2 Buffer Debloating

The principle of Buffer Debloating is to reduce the data buffered between upstream and downstream without affecting throughput or latency. From observation, we found that operators do not actually need large input/output buffers. Caching too much data helps little; when data flows slowly it merely fills up the whole pipeline and inflates the job's memory footprint, risking OOM.

We can make a simple estimate. For each task, whether on the input or the output side, the total number of buffers is roughly the number of exclusive buffers per channel multiplied by the number of channels, plus the number of shared floating buffers. Multiplying this total by the size of each buffer gives the total size of the local buffer pool. Plugging in the system defaults, we find that with slightly higher parallelism and a few more data shuffles, the in-flight data across the whole job can easily reach several gigabytes.
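A back-of-the-envelope version of this estimate, assuming the commonly cited defaults of 2 exclusive buffers per channel, 8 floating buffers per gate, and 32 KB network buffers (check the defaults of your Flink version), with a hypothetical 1000-channel shuffle:

```java
public class BufferEstimateSketch {
    public static void main(String[] args) {
        // Assumed defaults (taskmanager.network.memory.* options in flink-conf.yaml).
        int exclusiveBuffersPerChannel = 2;
        int floatingBuffersPerGate = 8;
        int bufferSizeBytes = 32 * 1024; // 32 KB network buffer

        // Example: a keyed shuffle with 1000 channels, counted once for the
        // input side and once for the output side of a task.
        int channels = 1000;
        long buffersPerGate = (long) exclusiveBuffersPerChannel * channels + floatingBuffersPerGate;
        long bytesPerTask = 2 * buffersPerGate * bufferSizeBytes;

        System.out.printf("~%.1f MB of in-flight buffers per task%n", bytesPerTask / 1e6);
        // With hundreds of such tasks and several shuffles, the in-flight data
        // across the whole job easily adds up to multiple gigabytes.
    }
}
```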

In practice we do not need to cache that much data; we only need enough to keep the operators from idling, which is exactly what Buffer Debloating does. Buffer Debloating dynamically adjusts the total size of the upstream and downstream buffers, minimizing the buffer space the job needs without affecting performance. The current strategy is for the upstream to buffer roughly the amount of data the downstream can process in one second. In addition, Buffer Debloating also benefits Unaligned Checkpoint: because it reduces the amount of in-flight data, there is less intermediate data to store additionally when an Unaligned Checkpoint takes its snapshot.

The figure above compares Checkpointing time against the Debloat Target when Buffer Debloating is enabled under backpressure. The Debloat Target is the amount of data the upstream buffers, expressed as the "expected time" the downstream needs to process it. In this experiment the Flink job has 5 network exchanges in total, so the total Checkpointing time is roughly 5 times the Debloat Target, which matches the experimental results.
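A minimal sketch of how Buffer Debloating could be switched on, assuming the option keys documented for Flink 1.14+; verify them against your version, and note that in a real deployment these would normally go into flink-conf.yaml rather than the application code:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferDebloatSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed option keys (Flink 1.14+); verify against your version's docs.
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // Buffer roughly the amount of data the consumer can process in 1 second.
        conf.setString("taskmanager.network.memory.buffer-debloat.target", "1s");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline, then env.execute(...);
    }
}
```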

4.3 Generalized Log-Based Incremental Checkpoint

As mentioned earlier, the size of the state also affects the time needed to complete Checkpointing, because Flink's Checkpointing consists of two parts: a synchronous snapshot and an asynchronous upload. The synchronous part is usually very fast; it only needs to flush the in-memory state data to disk. The asynchronous upload, however, takes time proportional to the amount of state data to upload, so we introduced Generalized Log-Based Incremental Checkpoint to control the amount of data that each snapshot needs to upload.

For a stateful operator, whenever its internal state changes, the update is recorded in the State Table, as shown in the figure above. During Checkpointing, taking RocksDB as an example, the State Table is flushed to disk and the resulting files are uploaded to remote storage asynchronously. Depending on the Checkpoint mode, the uploaded part can be the full Checkpoint or only its incremental part. In either case, however, the size of the uploaded files is tightly coupled to the state backend's storage: for example, although RocksDB supports incremental Checkpoints, once a multi-level compaction is triggered, many new files are generated, and the incremental part can then be even larger than a full Checkpoint, so the upload time remains uncontrollable.
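For reference, a hedged sketch of enabling the RocksDB state backend with incremental Checkpoints; the option keys (state.backend, state.backend.incremental, state.checkpoints.dir) are the standard ones, the path is a placeholder, and in a cluster they would normally live in flink-conf.yaml:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalRocksDbSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Use the embedded RocksDB state backend and upload only newly created
        // files at each checkpoint.
        conf.setString("state.backend", "rocksdb");
        conf.setString("state.backend.incremental", "true");
        conf.setString("state.checkpoints.dir", "hdfs:///flink/checkpoints"); // placeholder

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline, then env.execute(...);
    }
}
```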

Since it is the upload process that causes Checkpointing to time out, decoupling the upload from the Checkpointing process solves the problem. This is exactly what Generalized Log-Based Incremental Checkpoint does: in essence, it completely separates the Checkpointing process from the state backend's compaction.

The concrete approach is as follows. For a stateful operator, in addition to recording state updates in the State Table, we also write each increment to a State Changelog and flush it to remote storage asynchronously. A Checkpoint then consists of two parts: the State Table that has already been materialized to remote storage, and the incremental changelog that has not yet been materialized. As a result, the amount of data each Checkpoint actually has to upload becomes smaller and more stable; Checkpoints not only become more stable but can also be taken much more frequently. This greatly shortens end-to-end latency, especially for Exactly-Once sinks, whose two-phase commit can only finish once a complete Checkpoint has completed.
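In recent releases this mechanism is surfaced as the changelog state backend. A hedged sketch of enabling it, assuming the option keys documented for Flink 1.15+; verify them against your version, and treat the base path as a placeholder:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChangelogCheckpointSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Continuously write state updates to a changelog on durable storage,
        // so each checkpoint mostly just references already-uploaded increments.
        // Assumed option keys (Flink 1.15+); the base path is a placeholder.
        conf.setString("state.backend.changelog.enabled", "true");
        conf.setString("state.backend.changelog.storage", "filesystem");
        conf.setString("dstl.dfs.base-path", "hdfs:///flink/changelog");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline, then env.execute(...);
    }
}
```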

5. Cloud Native Fault Tolerance and Elastic Scaling

In the cloud-native context, rapid scale-out and scale-in is a major challenge for Flink, especially since Flink 1.13 introduced Reactive Mode, in which jobs scale in and out frequently; rescaling has therefore become the main bottleneck of Reactive Mode. The problems that rescaling and failover need to solve overlap to a large extent: for example, after a machine goes away, the system has to detect it quickly, reschedule, restore state, and so on. There are also differences, of course: on failover, only the state needs to be restored and pulled back into the operators; on rescaling, because the topology's parallelism changes, the state also needs to be redistributed.


When restoring state, we first read the state data from remote storage to the local machine and then redistribute it. As the figure above shows, for a moderately large state this whole process takes more than 30 minutes for a single parallel instance. In practice we also found that the time spent redistributing the state is far longer than the time spent reading it from remote storage.

So how is state redistributed? Flink's state is partitioned with the Key Group as the smallest unit; you can think of it as mapping the state's key space onto a range of integers starting from 0, the Key Group Range, whose size is determined by the operator's maximum parallelism. As the figure above shows, when we change the operator's parallelism from 3 to 4, the state of the reassigned Task1 is spliced together from parts of two of the original tasks' states, and each spliced range is contiguous and non-overlapping, a property we can exploit for optimization.
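A simplified sketch of this key-to-key-group-to-subtask mapping, mirroring the scheme described above rather than Flink's exact implementation (Flink additionally applies a murmur hash and exposes this logic through its KeyGroupRangeAssignment utility):

```java
public class KeyGroupSketch {

    /**
     * Map a key to a key group in [0, maxParallelism). Flink additionally
     * applies a murmur hash; a plain modulo is enough to show the idea.
     */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /**
     * Map a key group to its owning subtask: each subtask owns one contiguous,
     * non-overlapping slice of the key group range.
     */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // Rescaling from parallelism 3 to 4 only re-cuts the contiguous slices;
        // keys never move to a different key group.
        for (int parallelism : new int[] {3, 4}) {
            System.out.printf("parallelism=%d -> key group 42 is owned by subtask %d%n",
                    parallelism, subtaskFor(42, maxParallelism, parallelism));
        }
    }
}
```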

As the figure above shows, the optimization of the DB Rebuild part is quite significant after this change, but the work is still exploratory, with many open problems, so there is no concrete community plan for it yet.

Finally, a brief recap. We first discussed why fault tolerance matters: it is on the critical path of Flink stream computing. We then analyzed the factors that affect it: fault tolerance is a full-link problem, covering failure detection, job canceling, scheduling and acquiring new resources, state recovery and reconstruction, and so on, and it has to be weighed from multiple dimensions. Our current focus is mainly on how to take Checkpoints stably and quickly, because many real-world problems come down to Checkpointing. We closed with some exploratory work that combines fault tolerance with elastic scaling in the cloud-native context.
