Fault-tolerance in Flink

1. Stateful stream computing

Stream Computing

Stream computing means that there is a data source that continuously emits messages, together with a resident program that, upon receiving each message from the source, processes it and outputs the result downstream.

Distributed Stream Computing

Distributed stream computing refers to partitioning the input stream in some way and processing the partitions with multiple distributed instances of the program.

State in Stream Computing

Computing can be divided into stateful and stateless. Stateless computing processes each event independently, while stateful computing must record information across events and use it when processing later ones.

Take a simple example. Suppose an event consists of two parts: an event ID and an event value. If the processing logic parses and outputs the event value of every event it receives, this is a stateless computation. If, instead, after parsing each event's value it compares it with the previous event's value and outputs it only when it is greater, this is a stateful computation.
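
To make the distinction concrete, here is a minimal sketch in plain Java (not Flink code; the Event type and the emit-if-greater rule are just the example above, and all names are illustrative):

```java
import java.util.List;

public class StatefulVsStateless {
    // An event with an ID and a value, as in the example (records need Java 16+).
    record Event(String id, long value) {}

    // Stateless: each event is handled independently.
    static void statelessProcess(Event e) {
        System.out.println(e.value());
    }

    // Stateful: remembers the previous event's value across calls.
    static class StatefulProcessor {
        private Long lastValue; // the state

        void process(Event e) {
            // Emit only when the value exceeds the previous event's value.
            if (lastValue != null && e.value() > lastValue) {
                System.out.println(e.value());
            }
            lastValue = e.value(); // update the state for the next event
        }
    }

    public static void main(String[] args) {
        List<Event> events =
            List.of(new Event("a", 3), new Event("b", 5), new Event("c", 2));
        StatefulProcessor p = new StatefulProcessor();
        for (Event e : events) {
            statelessProcess(e); // prints every value: 3, 5, 2
            p.process(e);        // prints only 5 (greater than the preceding 3)
        }
    }
}
```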

There are many kinds of state in stream computing. For example, in a deduplication scenario, all primary keys seen so far are recorded; in window computation, the data that has entered a window but has not yet been triggered is also part of the stream's state; in machine learning / deep learning scenarios, the trained model and its parameters are the state of the stream computation.

2. Globally consistent snapshots

Globally consistent snapshots are a mechanism that can be used for backup and failure recovery of distributed systems.

1. Global snapshot

What is a global snapshot

Consider a distributed application: first, it has multiple processes distributed across multiple servers; second, each process has its own processing logic and internal state; third, the processes can communicate with each other. For such an application, which has internal state and whose processes exchange messages, the global state at a given moment is called a global snapshot.

Why global snapshots are needed

First, it can be used as a checkpoint: the global state is backed up regularly, and when the application fails, the backup can be used for recovery;

Second, it can be used for deadlock detection: after the snapshot is taken the program continues to run, and the snapshot can then be analyzed to determine whether the application is deadlocked; if so, it can be handled accordingly.

Examples of global snapshots

The figure below is an example of a global snapshot in a distributed system.

P1 and P2 are two processes, and there are channels for sending messages between them, C12 and C21 respectively. For the P1 process, C12 is the channel through which it sends messages, called its output channel; C21 is the channel through which it receives messages, called its input channel.

In addition to the channels, each process has a local state; for example, the memory of each of P1 and P2 holds three variables X, Y, and Z with their values. The local states of P1 and P2, together with the states of the channels between them, form an initial global state, which can also be called a global snapshot.

Suppose P1 sends a message to P2 asking it to change the value of X from 4 to 7, but the message is still in the channel and has not yet reached P2. This state is also a global snapshot.

Next, P2 has received P1's message but has not yet processed it. This state is also a global snapshot.

Finally, P2 processes the message and changes its local value of X from 4 to 7. This, too, is a global snapshot.

So whenever an event occurs, the global state changes. Events include a process sending a message, a process receiving a message, and a process modifying its own state.

2. Globally consistent snapshots

Suppose there are two events a and b. If, in absolute time, a occurs before b, and b is included in the snapshot, then a must also be included in the snapshot. A global snapshot that satisfies this condition is called a globally consistent snapshot.
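
Stated formally (a restatement of the condition above, where a → b means a occurs before b in absolute time and S is the set of events captured by the snapshot):

```latex
\forall a, b :\; (a \to b) \land (b \in S) \;\Rightarrow\; a \in S
```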

2.1 Implementation method of globally consistent snapshot

Clock synchronization cannot achieve globally consistent snapshots. Global synchronization can, but its drawback is obvious: it stops all applications and hurts global performance.

3. Asynchronous Global Consistency Snapshot Algorithm – Chandy-Lamport

The asynchronous globally consistent snapshot algorithm Chandy-Lamport can achieve globally consistent snapshots without affecting the running of applications.

The system requirements for Chandy-Lamport are as follows:

First, it does not affect the operation of the application, that is, it does not affect the sending and receiving of messages, and there is no need to stop the application;

Second, each process can record local state;

Third, the recorded state can be collected in a distributed manner;

Fourth, any process can initiate the snapshot.

At the same time, the Chandy-Lamport algorithm has a prerequisite: message delivery must be reliable, ordered, and free of duplicates.

3.1 Chandy-Lamport algorithm process

The Chandy-Lamport algorithm consists of three main parts: initiating the snapshot, executing the snapshot in a distributed fashion, and terminating the snapshot.

Initiate snapshot

Any process can initiate a snapshot. As shown in the figure below, when P1 initiates the snapshot, it first records its local state, that is, it takes a local snapshot, and then immediately, with no gap in between, sends marker messages to all of its output channels. The marker is a special message, distinct from the messages the application itself passes around.

After the marker messages are sent, P1 starts recording the messages on all of its input channels, i.e., the messages on channel C21 shown in the figure.

Distributed Execution Snapshots

As shown in the figure below, suppose Pi receives a marker message on channel Cki, that is, the marker sent by Pk to Pi. Two cases can be distinguished:

Case 1: this is the first marker message Pi has received from any channel. It first records its local state, then records channel Cki as empty, meaning that any message Pk sends later will not be included in this snapshot. At the same time, it immediately sends marker messages to all of its output channels, and finally starts recording the messages on all input channels except Cki.

Case 2: Pi has already received a marker message before. In that case, it stops recording the messages on Cki and saves all the Cki messages recorded so far as the final state of Cki in this snapshot. (As noted above, messages arriving on Cki after its marker are not included in the snapshot, even though such messages continue to flow in real time.)

Terminate snapshot

There are two conditions for terminating a snapshot:

First, all processes have received marker messages and recorded them in local snapshots;

Second, all processes have received marker messages from each of their n-1 input channels and recorded the channel states.


When the snapshot terminates, the snapshot collector (Central Server) collects the partial snapshots from each process to form the globally consistent snapshot.
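
The per-process logic described above can be summarized in the following single-JVM sketch (illustrative only; names such as Node and MARKER and the channel-id convention are assumptions, and a real deployment would use reliable FIFO network channels rather than direct method calls):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChandyLamportSketch {
    static final Object MARKER = new Object(); // the special marker message

    static class Node {
        final String id;
        Object localState = "initial";    // application state
        Object snapshotState;             // recorded local state
        boolean snapshotTaken = false;
        final List<Node> outputs = new ArrayList<>();                // output channels
        final Map<String, List<Object>> recording = new HashMap<>(); // channels being recorded
        final Map<String, List<Object>> channelSnapshots = new HashMap<>(); // finished channel states

        Node(String id) { this.id = id; }

        // Initiation: any node may record its local state, then send markers
        // on all output channels; afterwards it records all input channels.
        void initiateSnapshot() {
            snapshotState = localState;
            snapshotTaken = true;
            for (Node p : outputs) p.receive(id, MARKER);
        }

        void receive(String fromChannel, Object msg) {
            if (msg == MARKER) {
                if (!snapshotTaken) {
                    // Case 1: first marker seen. Snapshot local state, record the
                    // incoming channel as empty, forward markers, record the rest.
                    snapshotState = localState;
                    snapshotTaken = true;
                    channelSnapshots.put(fromChannel, List.of());
                    for (Node p : outputs) p.receive(id, MARKER);
                } else {
                    // Case 2: marker while already snapshotting. Freeze what was
                    // recorded on this channel as its final state.
                    channelSnapshots.put(fromChannel,
                            recording.getOrDefault(fromChannel, List.of()));
                    recording.remove(fromChannel);
                }
            } else {
                // Ordinary message: record it only if we are snapshotting and
                // this channel's state has not been finalized yet.
                if (snapshotTaken && !channelSnapshots.containsKey(fromChannel)) {
                    recording.computeIfAbsent(fromChannel, k -> new ArrayList<>()).add(msg);
                }
                // ... normal application processing of msg would happen here
            }
        }
    }
}
```

Termination is exactly as described above: once every node has snapshotTaken set and has a channelSnapshots entry for each of its input channels, a collector can gather all snapshotState and channelSnapshots values into the globally consistent snapshot.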

3.2 Example demonstration

In the example in the figure below, some events, such as A, occur inside a process and involve no interaction with other processes. An internal event can be modeled as a message the process sends to itself, so A can be regarded as C11 = [A→].

How is the Chandy-Lamport globally consistent snapshot algorithm implemented?

Suppose the snapshot is initiated by P1. When it initiates the snapshot, it first takes a snapshot of its local state, denoted S1, then immediately sends marker messages to all of its output channels, i.e., to P2 and P3, and then starts recording the messages on all of its input channels, i.e., the messages from P2, P3, and itself.

As shown in the legend, the vertical axis is absolute time. Why is there a time difference between when P3 and P2 receive the marker message? Because in a real physical environment, network conditions differ between nodes, so message delivery times differ as well.

P3 receives the marker message first, and it is the first marker it has received. It therefore takes a snapshot of its local state, marks channel C13 as closed (empty), immediately sends marker messages to all of its output channels, and finally starts recording the messages on all input channels except C13.

Next, P1 receives the marker sent by P3, but this is not the first marker it has seen (it initiated the snapshot). It therefore immediately closes channel C31 and takes the messages recorded so far as that channel's snapshot; any message that subsequently arrives from P3 is not reflected in this snapshot state.

Next, P2 receives the marker from P3, which is the first marker it has received. It takes a snapshot of its local state, marks channel C32 as closed, immediately sends marker messages to all of its output channels, and finally starts recording the messages on all input channels except C32.

Now consider the marker P2 receives from P1. This is not the first marker P2 has received, so it closes channel C12 and records its state; having now received markers on all of its input channels, P2's part of the snapshot is complete.

Next, P1 receives the marker from P2, which is likewise not the first it has received. It closes its remaining input channels and takes the recorded messages as their states. Its snapshot then contains two channel states: C11, the message it sent to itself, and C21, the message sent from P2's event H to P1's event D.

At the last point in time, P3 receives the marker from P2. This is not the first marker it has received, so the handling is the same as described above. During this interval, P3 also had a local event J, which it records as part of its state.

When every process has recorded its local state and all input channels of every process have been closed, the globally consistent snapshot is finished: the recording of the global state at a past point in time is complete.

3.3 The relationship between Chandy-Lamport and Flink

Flink is a distributed system, so it uses globally consistent snapshots to form checkpoints that support failure recovery. The main differences between Flink's asynchronous globally consistent snapshot algorithm and the Chandy-Lamport algorithm are as follows:

First, Chandy-Lamport supports strongly connected graphs, while Flink supports weakly connected graphs;

Second, Flink uses a tailored Chandy-Lamport asynchronous snapshot algorithm;

Third, Flink's asynchronous snapshot algorithm does not need to store channel state in DAG scenarios, which greatly reduces the storage space a snapshot requires.

3. Flink's fault tolerance mechanism

Fault tolerance means restoring to the state before the error occurred. Stream computing offers three kinds of fault-tolerance consistency guarantees: Exactly once, At least once, and At most once.

Exactly once means that each event affects the state exactly once. The "once" here is not strict end-to-end once; it means an event is processed only once inside Flink, excluding processing at the source and sink.

At least once means that each event affects the state at least once, i.e., repeated processing is possible.

At most once means that each event affects the state at most once, i.e., state may be lost when an error occurs.

End-to-end Exactly once

Exactly once means that the result of the job is always correct, but it may be produced multiple times; its requirement is therefore a replayable source.

End-to-end Exactly once means that the job result is correct and is produced only once. It requires not only a replayable source but also a transactional sink and idempotent output of results.

Flink's state fault tolerance

Many scenarios require Exactly once semantics, i.e., each event is processed once and only once. How can this semantics be guaranteed?

An Exactly Once Fault-Tolerant Approach for Simple Scenarios

The method for the simple scenario is shown in the figure below: record the local state together with the offset of the source, i.e., the position in the event log.
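
A hedged sketch of that single-process approach (the names here are illustrative, not Flink APIs): persist the source offset together with the local state atomically; on failure, restore both and replay the event log from the saved offset.

```java
import java.util.HashMap;
import java.util.Map;

public class SimpleExactlyOnce {
    long offset = 0;                                  // position in the event log
    final Map<String, Long> state = new HashMap<>();  // local state, e.g. per-key counts

    // A snapshot is the pair (offset, state), written atomically to durable storage.
    record Snapshot(long offset, Map<String, Long> state) {}

    Snapshot takeSnapshot() {
        return new Snapshot(offset, new HashMap<>(state));
    }

    void restore(Snapshot s) {
        offset = s.offset();      // rewind the source to this position and replay
        state.clear();
        state.putAll(s.state());  // reset the local state to the snapshot
    }

    void processEvent(String key) {
        state.merge(key, 1L, Long::sum); // example processing: count events per key
        offset++;
    }
}
```

Because the offset and the state are saved together, replaying from the offset after a restore affects the state exactly once per event.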

State Fault Tolerance in Distributed Scenarios

In a distributed scenario, we need to produce globally consistent snapshots for multiple operators with local state, without interrupting the computation. The job topology of Flink in distributed scenarios is special: it is a directed acyclic, weakly connected graph, so a tailored Chandy-Lamport can be used. That is, only the input offsets and the state of each operator are recorded, relying on a rewindable source (a source that can be rewound, i.e., re-read from an earlier point in time via an offset). There is then no need to store channel state, which saves a great deal of storage space when aggregation logic is present.

Finally, recovery: reset the data source to the recorded position, then restore the state of each operator from the checkpoint.

3. Flink's distributed snapshot method

First, a Checkpoint barrier is inserted into the source data stream; it plays the role of the marker message in the Chandy-Lamport algorithm described above. Successive Checkpoint barriers naturally divide the stream into segments, each containing the data of one Checkpoint.

Flink has a global coordinator. Unlike Chandy-Lamport, where any process can initiate the snapshot, this centralized coordinator injects Checkpoint barriers into every source and thereby starts the snapshot. When a node receives a barrier, it only needs to store its local state, since Flink does not store channel state.

After the Checkpoint is completed, every parallel instance of every operator sends a confirmation message to the coordinator. When the Checkpoint Coordinator has received the confirmations of all tasks, the snapshot is finished.
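
In practice, this mechanism is switched on through the checkpoint configuration. A plausible minimal setup (using the long-standing Flink streaming API; the 10-second interval is an arbitrary choice):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Ask the coordinator to inject a checkpoint barrier into the sources
        // every 10 seconds.
        env.enableCheckpointing(10_000L);

        // Exactly-once is the default mode; stated explicitly for clarity.
        env.getCheckpointConfig()
           .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // ... build the job topology here, then:
        // env.execute("checkpointed-job");
    }
}
```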

4. Process demonstration

As shown in the figure below, suppose Checkpoint N is injected into the source. The source first records the offset of the partition it is processing.

As time goes by, the source sends the Checkpoint barrier downstream to the two parallel instances. When the barrier reaches them, each records its local state in the Checkpoint:

Finally, the barrier reaches the final subtask, and the snapshot is complete.

This is a relatively simple demonstration in which each operator has only a single input stream. Now consider the more complex scenario in the figure below, where an operator has multiple input streams.

When an operator has multiple inputs, the barriers must be aligned. How is barrier alignment done? As shown in the figure below, starting from the original state on the left: when the barrier of one stream has arrived while the barrier of the other stream is still in the channel, the stream whose barrier has arrived is blocked, and the operator keeps processing the data of the other stream. When the other stream's barrier arrives, the first stream is unblocked and the barrier is sent on to the downstream operator.

In this process, blocking one of the streams has the effect of creating back pressure on it. Barrier alignment therefore causes back pressure and pauses the operator's data processing.

If the channel that has already received the barrier were not blocked during alignment and its data kept flowing in, data belonging to the next Checkpoint would be included in the current one. If a failure then occurred and the source were rewound, some of that data would be processed again; this is at-least-once semantics. If at-least-once is acceptable, barrier alignment can be skipped, avoiding its side effects. In addition, asynchronous snapshots can be used to minimize task pauses, and multiple concurrent Checkpoints can be supported.
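
These trade-offs correspond to configuration knobs in Flink. A sketch (exact APIs vary slightly across Flink versions; unaligned checkpoints exist since Flink 1.11):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignmentTradeoffs {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L);

        // Option 1: accept at-least-once and skip barrier alignment entirely.
        env.getCheckpointConfig()
           .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

        // Option 2 (instead of option 1): keep exactly-once but avoid blocking
        // on alignment by checkpointing in-flight data as well.
        // env.getCheckpointConfig().enableUnalignedCheckpoints();

        // Allow several checkpoints to be in flight at the same time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    }
}
```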

5. Snapshot trigger

Uploading local snapshots to the system while processing continues requires a copy-on-write mechanism for the state.

If data processing resumes as soon as the snapshot's metadata has been captured, how do we ensure that the resumed application logic does not modify the data while it is being uploaded? Different state backends handle this differently: the Heap backend triggers copy-on-write of the data, while for the RocksDB backend the LSM characteristics guarantee that snapshotted data is not modified.
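
The following toy illustrates the copy-on-write idea (a deliberate simplification, not Flink's actual Heap-backend implementation, which copies lazily and at finer granularity): the snapshot freezes the current map, and subsequent writes go to a fresh copy so the frozen view can be uploaded undisturbed.

```java
import java.util.HashMap;
import java.util.Map;

public class CopyOnWriteState {
    private Map<String, Long> current = new HashMap<>();
    private Map<String, Long> frozen; // the view being uploaded asynchronously

    // Freeze the current state; processing continues on a copy.
    synchronized Map<String, Long> takeSnapshot() {
        frozen = current;
        current = new HashMap<>(current); // eager copy; real backends copy lazily
        return frozen;
    }

    // Writes after the snapshot never touch the frozen view.
    synchronized void put(String key, long value) {
        current.put(key, value);
    }
}
```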

4. State management of Flink

1. Flink state management

First, a state needs to be defined. In the example below, a Value state is defined first.

When defining the state, the following information needs to be given (see the sketch after this list):

* State identifier (ID)
* State data type
* Registration of the state with the local state backend
* Reads and writes of the state through the local state backend
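
A sketch of these four steps using Flink's ValueState in a keyed function (modeled on the standard pattern from Flink's documentation; the open(Configuration) signature may differ in very recent versions). It reuses the "emit only when greater" example from the beginning of this article:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MaxSeenSoFar extends RichFlatMapFunction<Long, Long> {
    private transient ValueState<Long> maxSeen;

    @Override
    public void open(Configuration parameters) {
        // Steps 1-3: identifier, data type, and registration with the state backend.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("max-seen", Long.class);
        maxSeen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Long value, Collector<Long> out) throws Exception {
        // Step 4: read and write the state through the backend.
        Long current = maxSeen.value();
        if (current == null || value > current) {
            maxSeen.update(value);
            out.collect(value); // emit only values larger than anything seen before
        }
    }
}
```

Note that keyed state like this must be used on a keyed stream, e.g. stream.keyBy(...).flatMap(new MaxSeenSoFar()).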

2. Flink state backend

Also called state backends; Flink has two types of state backend:


The first type is JVM Heap: the data in it is stored as Java objects, and reads and writes also operate on objects, so it is very fast. But it has two disadvantages. The first is that object storage requires many times more space than the serialized, compressed form of the same data on disk, so it consumes a lot of memory. The second is that although reads and writes need no serialization, forming a snapshot does, so its asynchronous snapshot process is slower.

The second type is RocksDB: reads and writes require serialization, so access is comparatively slow. But it has an advantage: its LSM-based data structure produces sst files when a snapshot is taken, so its asynchronous checkpoint process is just a file copy, and CPU consumption is relatively low.
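
Choosing between the two is a one-line configuration. A sketch using the Flink 1.13+ backend classes (older versions used MemoryStateBackend, FsStateBackend, and RocksDBStateBackend instead):

```java
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendChoice {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // JVM Heap backend: objects on the heap, fast access, higher memory use.
        env.setStateBackend(new HashMapStateBackend());

        // RocksDB backend: serialized state, slower access, cheap file-copy
        // snapshots (needs the flink-statebackend-rocksdb dependency):
        // env.setStateBackend(new EmbeddedRocksDBStateBackend());
    }
}
```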
