Efficient and Stable Generic Incremental Checkpoints

1. Overview

The original design intention of Generic Log-Based Incremental Checkpointing is to decouple the full state snapshot from the incremental checkpointing mechanism, and to ensure that each checkpoint completes quickly and stably by continuously uploading the incremental Changelog. This allows the checkpointing interval to be reduced, which in turn improves the end-to-end latency of a Flink system. Specifically, there are three main improvements:

Shorter end-to-end latency: this is especially important for transactional sinks. A transactional sink can only complete its two-phase commit when a checkpoint completes, so a shorter checkpointing interval means more frequent commits and lower end-to-end latency.

More stable checkpoint completion time: currently, checkpoint completion time depends largely on the size of the (incremental) state that needs to be persisted during checkpointing. In the new design, increments are uploaded continuously, so only a small amount of data remains to be flushed when a checkpoint is triggered, which keeps checkpoint completion time stable.

Less data to roll back during fault-tolerant recovery: the shorter the checkpointing interval, the less data needs to be reprocessed after each recovery.

How is this achieved? The main factors affecting Flink checkpointing time are as follows:

The speed of Checkpoint Barrier propagation and alignment;
The time required to persist state snapshots to non-volatile, highly available storage (such as S3).
Readers who are not familiar with the Flink checkpoint mechanism can refer to [1].

Unaligned Checkpoint [2], introduced in Flink 1.12, and Buffer Debloating [3], introduced in 1.14, mainly address the first factor above, especially under back pressure. Incremental Checkpoint [4], introduced earlier, reduces the size of the state that has to be persisted for each checkpoint, addressing the second factor, but in practice it cannot do so completely: the existing Incremental Checkpoint is based on RocksDB, and RocksDB performs compaction periodically to control space amplification and maintain read performance. Compaction produces new, relatively large files, which increases the time required for uploading. Since each physical node (Task) executing a Flink job has at least one RocksDB instance, the probability of a checkpoint being delayed grows with the number of physical nodes. As a result, in large-scale Flink jobs, almost every checkpoint ends up being delayed by some node, as shown in the figure below.

It is also worth mentioning that under the existing checkpointing mechanism, a Task takes a state snapshot and starts persisting it to highly available storage only after receiving at least one Checkpoint Barrier, which further increases checkpoint completion time, as shown in the figure below.


In the new design, we avoid this limitation and speed up checkpoint completion by continuously uploading incremental changelogs. Let's take a look at the detailed design.

2. Design

The core idea of Generic Log-Based Incremental Checkpointing is to introduce a State Changelog (a log of state changes), which allows state to be persisted at a finer granularity:

When an operator updates its state, it writes the update twice: once to the State Table, and once, incrementally, to the State Changelog.

A checkpoint is then composed of two parts: the State Table currently persisted on remote storage, and the incremental State Changelog.

Persisting the State Table is decoupled from the checkpointing process; it is persisted periodically by a background thread, a process we call Materialization.

When taking a checkpoint, we only need to ensure that the newly appended part of the State Changelog is persisted.

In the new design, the amount of data that needs to be uploaded at checkpoint time becomes very small, which not only makes checkpoints more stable but also allows them to be taken much more frequently. The whole workflow is shown in the figure below:
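From a user's point of view, opting in is lightweight. The following is a minimal sketch of a job that enables the Changelog State Backend; it assumes the enableChangelogStateBackend() method on StreamExecutionEnvironment and the state.backend.changelog.enabled configuration key that, to my understanding, ship with the 1.15 MVP, so please verify the exact API against your Flink version.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChangelogEnabledJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Opt in to the Changelog State Backend: state updates are double-written to the
            // configured state backend (e.g. RocksDB) and to the State Changelog (DSTL).
            // Equivalent to setting state.backend.changelog.enabled: true in flink-conf.yaml.
            env.enableChangelogStateBackend(true);

            // With the changelog uploaded continuously, a much shorter checkpoint interval is feasible.
            env.enableCheckpointing(1000);

            // Toy keyed pipeline so the sketch runs end to end.
            env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3))
               .keyBy(t -> t.f0)
               .sum(1)
               .print();

            env.execute("changelog-enabled-job");
        }
    }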

Generic Log-Based Incremental Checkpointing is similar to the WAL mechanism of traditional database systems:

Incremental changes to the data (insert/update/delete) are written to the Transaction Log. Once this part of the log has been synced to persistent storage, the transaction is considered complete. This is analogous to the checkpointing process in the approach described above.

At the same time, to serve queries, the changes are also applied to the data table asynchronously. Once the corresponding part of the Transaction Log has been reflected in the data table, that part of the log can be deleted. This is analogous to the State Table persistence (materialization) process in our approach.
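To make the analogy concrete, here is a deliberately simplified sketch of the double-write pattern; it is illustrative only and does not reflect Flink's actual internal classes. Every update goes to an in-memory table and is appended to a short-term log; a checkpoint only needs the log to be durable, and a background materialization later persists the table and truncates the log.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch of the WAL-like double write; not Flink's real implementation.
    class ChangelogStateSketch {
        private final Map<String, String> stateTable = new HashMap<>(); // fast local table (e.g. RocksDB)
        private final List<String> changelog = new ArrayList<>();       // durable short-term log (DSTL)

        void put(String key, String value) {
            stateTable.put(key, value);                // write 1: the State Table
            changelog.add("PUT " + key + "=" + value); // write 2: append the change to the log
        }

        // Checkpoint: make sure the appended log is durable. In the real system only the
        // not-yet-persisted tail is flushed, so the cost is small and size-independent.
        void checkpoint() {
            persistToDfs(changelog);
        }

        // Materialization runs in the background, independently of checkpointing.
        void materialize() {
            persistToDfs(stateTable); // upload the full/compacted table (e.g. SST files)
            changelog.clear();        // the materialized prefix of the log can then be truncated
        }

        private void persistToDfs(Object payload) {
            // placeholder for writing to durable storage such as S3 or HDFS
        }
    }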

This WAL-like mechanism can effectively speed up checkpoint completion, but it also introduces some additional overhead:

Additional network I/O and Changelog persistent storage overhead;
Additional memory usage for buffering the Changelog;
Replaying the Changelog during fault-tolerant recovery, which can potentially increase recovery time.
We analyze the impact of these three aspects in the Benchmark comparison below. Regarding point 3 in particular, the increase in recovery time caused by replaying the Changelog is compensated to some extent by more frequent checkpoints, because more frequent checkpoints mean less processed data needs to be replayed after recovery.

3. Changelog storage (DSTL)

A key component of Generic Log-Based Incremental Checkpointing is the storage for the State Changelog, which we call the Durable Short-term Log (DSTL). DSTL needs to meet the following requirements:

Short-term persistence

The State Changelog is part of the checkpoint, so it must be stored durably. At the same time, it only needs to retain the changes between the most recent State Table materialization and the current checkpoint, so data only needs to be kept for a short period of time (a few minutes).

Write frequency is much higher than read frequency

The Changelog is read only on restore or rescale. In normal operation there are only append operations, and once written, data is never modified.

Very short write latency

The State Changelog is introduced to make checkpoints fast (within 1 s), so a single write request must complete at least within the expected checkpoint time.

Ensure consistency

If there are multiple replicas of the State Changelog, consistency problems can arise among them. Once one replica of the State Changelog has been persisted and acknowledged by the JobManager (JM), that replica must serve as the source of truth to guarantee consistent semantics on restore.

These characteristics also explain why we named the Changelog storage DSTL, a durable short-term log.

3.1 Selection of DSTL scheme

DSTL can be implemented in a variety of ways, for example on distributed logs (Kafka), distributed file systems (DFS), or even databases. In the Generic Log-Based Incremental Checkpointing MVP released in Flink 1.15, we chose DFS to implement DSTL based on the following considerations:

No additional external dependencies: Currently, Flink Checkpoint is persisted in DFS, so implementing DSTL with DFS does not introduce additional external components.
No additional state management: In the current design, the state management of DSTL is integrated with the Flink Checkpointing mechanism, so no additional state management is required.
DFS natively provides persistence and consistency guarantees: implementing a replicated distributed log ourselves would make these additional costs that need to be considered.

On the other hand, using DFS has the following disadvantages:

Higher latency: DFS generally has higher latency than distributed log systems that write to local disks.
Network I/O limitations: for cost reasons, most DFS providers throttle the request rate and bandwidth of single-user writes, which may lead to network overload in extreme cases.
After some preliminary experiments, we believe that the performance of most current DFS implementations (such as S3, HDFS, etc.) can satisfy 80% of use cases; more data is provided in the Benchmark section below.

3.2 DSTL architecture
The following figure shows the DFS-based DSTL architecture, using RocksDB as an example. State updates are double-written through the Changelog State Backend: one copy goes to RocksDB and the other to DSTL. RocksDB is materialized periodically, i.e., its current SST files are uploaded to DFS, while DSTL continuously writes state changes to DFS and flushes them during checkpointing, so that checkpoint completion time depends only on the small amount of data that still needs to be flushed. Note that materialization is completely independent of the checkpointing process and can run much less frequently than checkpointing; the system default interval is 10 minutes.
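For reference, a hedged sketch of how these pieces could be configured in the 1.15 MVP is shown below. The option names (state.backend.changelog.*, dstl.dfs.*) are given to the best of my knowledge and the values are placeholders, so check them against the documentation of your Flink version.

    import org.apache.flink.configuration.Configuration;

    class DstlConfigSketch {
        static Configuration dstlOnDfs() {
            Configuration conf = new Configuration();
            conf.setString("state.backend", "rocksdb");                       // the materialized State Table
            conf.setString("state.backend.changelog.enabled", "true");        // double-write state changes
            conf.setString("state.backend.changelog.storage", "filesystem");  // keep the DSTL on DFS
            conf.setString("dstl.dfs.base-path", "hdfs:///flink/changelog");  // placeholder base path
            // Materialization is independent of checkpointing; 10 minutes is the default mentioned above.
            conf.setString("state.backend.changelog.periodic-materialize.interval", "10min");
            return conf;
        }
    }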

Here are a few more issues worth discussing:

State cleanup problem

As mentioned earlier, in the new architecture a checkpoint consists of two parts: 1) the State Table and 2) the State Change Log, and both parts need to be cleaned up when no longer needed. Cleaning up part 1) reuses Flink's existing checkpoint cleanup mechanism. Cleaning up part 2) is more complicated, especially because in the current design the State Change Log is written at Task Manager (TM) granularity to avoid the small-file problem. In the current design, we clean up the State Change Log in two steps: first, the data in the Changelog itself must be truncated once the State Table has been materialized; second, Changelog data that has become part of a checkpoint is cleaned up through the existing Flink checkpoint cleanup mechanism [4].

DFS-related issues

Small file problem
One problem with DFS is that each checkpoint creates many small files, and because the Changelog State Backend enables much more frequent checkpoints, the small-file problem can become a bottleneck. To alleviate this, we write all state changes of the same job on the same Task Manager to the same file, so all tasks on a Task Manager share the same State Change Log file.

Long-tail latency problem

To address the high long-tail latency of DFS, write requests are retried if they cannot complete within the allowed timeout (1 second by default).
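Both the batching described under the small file problem and this retry behavior are configurable. A hedged sketch of the relevant knobs follows; the dstl.dfs.* option names and values are my best recollection of the defaults and should be double-checked against your Flink version.

    import org.apache.flink.configuration.Configuration;

    class DstlUploadTuningSketch {
        static void tune(Configuration conf) {
            // Group small changelog writes before uploading, easing the small-file problem.
            conf.setString("dstl.dfs.batch.persist-delay", "10ms");
            conf.setString("dstl.dfs.batch.persist-size-threshold", "10mb");
            // Retry uploads that exceed the timeout, cutting off the DFS long tail.
            conf.setString("dstl.dfs.upload.timeout", "1s");
            conf.setString("dstl.dfs.upload.max-attempts", "3");
        }
    }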

4. Analysis of Benchmark test results
The improvement that Generic Log-Based Incremental Checkpointing brings to checkpoint speed and stability depends on the following factors:

The ratio of the State Change Log increment to the full state size; the smaller the increment, the better.
The ability to upload state increments continuously. This depends on the state access pattern; in the extreme case where an operator only updates the Flink State Table right before checkpointing, the Changelog does not help much.
The ability to group and batch changelog uploads across multiple Tasks. Batching changelog writes to DFS reduces the number of files created and the load on DFS, thereby improving stability.
The ability of the underlying State Backend to deduplicate updates to the same key before flushing to disk. Because the State Change Log stores every update rather than only the final value, this deduplication capability of the underlying State Backend increases the ratio of the Changelog increment to the full State Table size.
The write speed of the persistent storage (DFS); the faster the write speed, the less pronounced the improvement brought by the Changelog.

4.1 Benchmark configuration

In the Benchmark experiment, we use the following configuration:

4.2 ValueState Workload

The first part of our experiments targets a workload in which each update touches a different key. For this kind of workload, due to points 2 and 4 above, the improvement from the Changelog is especially pronounced: checkpoint completion time is shortened by 10x (at the 99.9th percentile), while checkpoint size increases by 30% and recovery time increases by 66% - 225%, as shown in the table below.

Let's take a closer look at the Checkpoint Size section:

Table 2: Comparison of checkpoint-related metrics for the ValueState workload with Changelog on/off

Checkpointed Data Size refers to the size of the data uploaded after the Checkpoint Barrier is received and the checkpointing process starts. With Changelog enabled, most of the data has already been uploaded before the checkpointing process starts, which is why this metric is much smaller when Changelog is on than when it is off.
Full Checkpoint Data Size is the total size of all files that make up the checkpoint, including files shared with previous checkpoints. Compared with an ordinary checkpoint, the Changelog format is uncompressed and less compact, so it takes up more space.

4.3 Window Workload

This workload uses a sliding window. As shown in the table below, Changelog speeds up checkpoint completion by about 3x, but the storage amplification is much higher (the consumed space is close to 45x):

Table 3: Comparison of checkpoint-related metrics for the Window workload with Changelog on/off

The main reasons for the Full Checkpoint Data Size amplification are:

With the sliding window operator, each record is added to multiple sliding windows, causing multiple state updates, so the write amplification of the Changelog is even larger (a short sketch after this list makes the multiplier concrete).
As mentioned earlier, if the underlying State Backend (such as RocksDB) is better at deduplicating updates to the same key before flushing to disk, the snapshot will be smaller relative to the Changelog. In the extreme case of the sliding window operator, the window contents are purged when the window fires; if the updates and the purge happen within the same checkpoint interval, the data of that window is likely not included in the snapshot at all. This also means that the faster a window is purged, the smaller the snapshot is likely to be.
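To make the first point concrete, here is a small illustrative sketch (the window sizes are made up, not taken from the benchmark, and an input stream events of type DataStream<Tuple2<String, Long>> is assumed): with a window size of 10 seconds and a slide of 1 second, every record is assigned to 10 overlapping windows, so roughly 10 state updates per record are appended to the Changelog, while the materialized State Table only holds the latest accumulator per window.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // The size/slide ratio of a sliding window determines how many windows each record
    // joins, and hence the write amplification seen by the Changelog.
    DataStream<Tuple2<String, Long>> counts = events
        .keyBy(t -> t.f0)
        // size 10 s, slide 1 s: each element lands in 10 windows, i.e. ~10 changelog entries per element
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(1)))
        .sum(1);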

5. Conclusion

Flink 1.15 ships the MVP version of Generic Log-Based Incremental Checkpointing. Built on DFS, this version can bring checkpoint times down to seconds and greatly improves checkpoint stability, at the cost of some extra storage space, essentially trading space for time. Version 1.16 will improve it further toward production readiness, for example by speeding up recovery through Local Recovery and file caching. Beyond that, the Changelog State Backend interface is generic, and the same interface can be used to plug in faster storage, such as Apache BookKeeper, to achieve even lower latency. In addition, we are studying other applications of the Changelog, for example applying it to sinks to achieve generic end-to-end exactly-once semantics.
