An Analysis of Flink 1.15's New Feature: Efficient and Stable Generic Incremental Checkpoints

1. Overview

The motivation behind Generic Log-Based Incremental Checkpointing is to decouple full state snapshots from the incremental checkpoint mechanism. By continuously uploading the incremental Changelog, every checkpoint can complete quickly and stably, which allows shorter checkpoint intervals and lower end-to-end latency for Flink. Broadly speaking, this brings three major improvements:

1: Shorter end-to-end latency: especially for Transactional Sinks, which can only finish their two-phase commit when a checkpoint completes. A shorter checkpoint interval therefore means more frequent commits and shorter end-to-end latency.

2: More stable checkpoint completion time: at present, checkpoint completion time depends largely on the size of the (incremental) state that must be persisted during the checkpoint. The new design keeps completion time stable by continuously uploading incremental data, which reduces the amount of data left to flush when the checkpoint is taken.

3: Less data to roll back on fault-tolerant recovery: the shorter the checkpoint interval, the less data has to be reprocessed after each recovery.

How is this achieved? The main factors affecting Flink checkpointing time are as follows:

1: The speed at which Checkpoint Barriers flow and align;

2: The time required to persist state snapshots to non-volatile, highly available storage (such as S3).

Readers unfamiliar with the Flink checkpoint mechanism can refer to [1].

The Unaligned Checkpoint [2] introduced in Flink 1.12 and Buffer Debloating [3] introduced in Flink 1.14 mainly address the first factor, especially under backpressure. The incremental checkpoint [4] introduced earlier reduces the size of the state persisted per checkpoint, targeting the second factor. In practice, however, this is not enough: the existing incremental checkpoint is based on RocksDB, which periodically runs compaction to control space amplification and read performance. Compaction produces new, relatively large files, which increases upload time. Since every physical node (Task) executing a Flink job has at least one RocksDB instance, the probability of a delayed checkpoint grows with the number of nodes. As a result, in large Flink jobs almost every checkpoint may be delayed by some node, as shown in the figure below.

It is also worth mentioning that under the existing checkpoint mechanism, a task takes a state snapshot and starts persisting it to highly available storage only after receiving at least one checkpoint barrier, which lengthens the checkpoint completion time, as shown in the figure below.

The new design avoids this limitation: by continuously uploading incremental changelogs, it speeds up checkpoint completion. Let us now look at the design in detail.

2. Design

The core idea of Generic Log-Based Incremental Checkpointing is to introduce the State Changelog, which allows state to be persisted at a finer granularity:

1: On every state update, the operator performs a double write: the update is applied to the State Table, and is also appended incrementally to the State Changelog.

2: A checkpoint consists of two parts: the State Table currently persisted on remote storage, and the incremental State Changelog.

3: State Table persistence is independent of the checkpoint process; it is persisted periodically by a background thread. We call this the materialization process.

4: When taking a checkpoint, only the new part of the State Changelog needs to be persisted.

With this design, the amount of data uploaded at checkpoint time becomes small, which makes checkpoints not only more stable but also more frequent. The overall workflow is shown in the figure below:
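The four design points above can be sketched as a toy model. This is not Flink's actual API; all class and method names are made up for illustration:

```python
# Toy model of the Changelog design: every update is double-written to a
# state table and an append-only changelog; a checkpoint is the last
# materialized table plus the changelog suffix written since then.

class ChangelogStateBackend:
    def __init__(self):
        self.state_table = {}        # working state (e.g. RocksDB)
        self.changelog = []          # incremental state changes
        self.materialized = {}       # last persisted State Table snapshot
        self.materialized_upto = 0   # changelog position it covers

    def put(self, key, value):
        # design point 1: double write on every state update
        self.state_table[key] = value
        self.changelog.append(("PUT", key, value))

    def materialize(self):
        # design point 3: background persistence, independent of checkpoints
        self.materialized = dict(self.state_table)
        self.materialized_upto = len(self.changelog)

    def checkpoint(self):
        # design points 2 and 4: persisted table + new changelog suffix
        suffix = list(self.changelog[self.materialized_upto:])
        return dict(self.materialized), suffix

    @staticmethod
    def restore(table, changelog):
        # recovery: start from the materialized table, replay the suffix
        state = dict(table)
        for op, key, value in changelog:
            if op == "PUT":
                state[key] = value
        return state


backend = ChangelogStateBackend()
backend.put("a", 1)
backend.materialize()            # State Table snapshot covers a=1
backend.put("a", 2)
backend.put("b", 3)
table, suffix = backend.checkpoint()
assert ChangelogStateBackend.restore(table, suffix) == {"a": 2, "b": 3}
```

Note how only the two entries written after materialization need uploading at checkpoint time, which is what keeps checkpoints small and fast.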

Generic Log-Based Incremental Checkpointing is similar to the WAL mechanism of traditional database systems:

1: Incremental data changes (insert/update/delete) are written to the Transaction Log. Once these change logs are synchronized to persistent storage, the transaction is considered complete. This is analogous to the checkpointing process in the approach above.

2: At the same time, to make queries efficient, the changes are also asynchronously materialized into the data table. Once the relevant part of the Transaction Log has been persisted in the data table, it can be deleted from the log. This is analogous to the State Table persistence process in our approach.

This WAL-like mechanism can effectively speed up checkpoint completion, but it also introduces some overhead:

1: Additional network I/O and additional persistent storage for the Changelog;

2: Additional memory usage for caching the Changelog;

3: Fault-tolerant recovery must additionally replay the Changelog, which may increase recovery time.

In the benchmark comparison we also analyze the impact of these three aspects. Regarding the third point in particular, the extra recovery time from replaying the Changelog is offset to some extent by more frequent checkpoints, since more frequent checkpoints mean less data has to be reprocessed after recovery.
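As a back-of-the-envelope illustration of this offset (the numbers are illustrative, not taken from the benchmark):

```python
# On average a failure lands mid-interval, so the input that must be
# reprocessed after recovery is roughly input_rate * interval / 2.
# Shrinking the checkpoint interval shrinks this reprocessing linearly,
# which partly pays for the extra time spent replaying the Changelog.

def reprocessed_events(input_rate_per_s, checkpoint_interval_s):
    return input_rate_per_s * checkpoint_interval_s / 2

# at 50K events/s: a 10 s interval vs a 1 s interval
assert reprocessed_events(50_000, 10) == 250_000
assert reprocessed_events(50_000, 1) == 25_000
```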

3. Changelog Storage (DSTL)

An important component of Generic Log-Based Incremental Checkpointing is the storage for the State Changelog, which we call the Durable Short-term Log (DSTL). DSTL needs to have the following properties:

Short-term persistence

The State Changelog is part of the checkpoint, so it must be durable. At the same time, it only needs to hold the changes between the last persisted State Table and the current checkpoint, so it only needs to retain data for a short time (a few minutes).

Write frequency much higher than read frequency

The Changelog is read only on restore or rescale. In the common case it is append-only, and data, once written, is never modified.

Very short write latency

The State Changelog was introduced to make checkpoints fast (within 1 s), so a single write request must complete within the expected checkpoint time at the latest.

Consistency guarantees

If there are multiple replicas of the State Changelog, consistency between replicas becomes an issue. Once one replica's changelog has been persisted and acknowledged by the JM, recovery must use that replica as the source of truth to guarantee semantic consistency.

These properties also explain why we named the Changelog storage DSTL: Durable Short-term Log.

3.1 Selection of DSTL scheme

DSTL can be implemented in many ways: distributed logs (Kafka), distributed file systems (DFS), or even databases. In the Generic Log-Based Incremental Checkpointing MVP released with Flink 1.15, we chose DFS to implement DSTL, based on the following considerations:

No additional external dependencies: Flink checkpoints are already persisted to DFS, so building DSTL on DFS introduces no additional external components.

No additional state management: in the current design, DSTL's state management is integrated with Flink's checkpointing mechanism, so no extra state management is needed.

DFS natively provides persistence and consistency guarantees: a replicated distributed log implementation would have to provide these itself, at additional cost.

On the other hand, using DFS has the following disadvantages:

Higher latency: DFS generally has higher latency than distributed log systems that write to local disks.

Network I/O limits: most DFS providers throttle the write rate and traffic of a single user for cost reasons, and in extreme cases network overload may occur.

After some preliminary experiments, we believe the performance of most current DFS implementations (such as S3, HDFS, etc.) can cover 80% of the use cases. The benchmark section below provides more data.

3.2 DSTL architecture

The figure below shows the architecture of DFS-based DSTL, using RocksDB as an example. State updates are written through the Changelog State Backend to both RocksDB and DSTL. RocksDB periodically performs materialization, i.e., uploads its current SST files to DFS; DSTL continuously writes state changes to DFS and flushes them on checkpoint. This way, the checkpoint completion time depends only on how much data remains to flush. Note that materialization is completely independent of checkpointing, and its frequency can be much lower than the checkpoint frequency; the system default is 10 minutes.
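For reference, this architecture ships in Flink 1.15 as the Changelog State Backend and is enabled via configuration. The sketch below lists the relevant flink-conf.yaml entries as a Python dict; the option names follow our reading of the Flink 1.15 documentation, and the values (bucket path in particular) are purely illustrative:

```python
# Sketch of a Flink 1.15 configuration enabling DFS-based DSTL.
# Option names per the Flink 1.15 docs; values are illustrative.
flink_conf = {
    "state.backend": "rocksdb",                       # underlying State Backend
    "state.backend.incremental": "true",              # RocksDB incremental snapshots
    "state.backend.changelog.enabled": "true",        # turn on the Changelog (DSTL)
    "state.backend.changelog.storage": "filesystem",  # the DFS-based DSTL of the 1.15 MVP
    "dstl.dfs.base-path": "s3://my-bucket/changelog", # illustrative changelog location
    # materialization runs in the background, far less often than checkpoints
    "state.backend.changelog.periodic-materialize.interval": "10min",
}

assert flink_conf["state.backend.changelog.enabled"] == "true"
```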

Here are a few questions that are worth discussing:

State cleanup

As mentioned above, in the new architecture a checkpoint consists of two parts: 1) the State Table and 2) the State Change Log. Both parts need to be cleaned up as appropriate. Cleaning up part 1) reuses Flink's existing checkpoint mechanism. Cleaning up part 2) is more complex, especially because in the current design the State Change Log is kept at Task Manager granularity to avoid the small-file problem. We clean up the State Change Log in two steps: first, after the State Table is materialized, the corresponding prefix of the Change Log can be deleted; second, the cleanup of Change Log segments that have become part of a checkpoint is integrated into Flink's existing checkpoint cleanup mechanism [4].
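A minimal sketch of the first cleanup step, assuming a logical position per changelog entry (the real implementation works at coarser per-TM file granularity):

```python
# Toy changelog with logical positions: once the State Table has been
# materialized up to some position, no future checkpoint needs the
# entries below it, so they can be truncated.

class Changelog:
    def __init__(self):
        self.base = 0        # logical position of entries[0]
        self.entries = []

    def append(self, change):
        self.entries.append(change)
        return self.base + len(self.entries)  # position after this entry

    def truncate(self, materialized_upto):
        # drop everything already covered by the materialized State Table
        drop = max(0, materialized_upto - self.base)
        self.entries = self.entries[drop:]
        self.base += drop


log = Changelog()
log.append("put a=1")
pos = log.append("put a=2")
log.append("put b=3")
log.truncate(pos)            # State Table materialized through `pos`
assert log.entries == ["put b=3"]
assert log.base == 2
```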

DFS-related issues

Small file problem

One problem with DFS is that every checkpoint creates many small files. Because the Changelog State Backend enables much more frequent checkpoints, the small-file problem can become a bottleneck. To alleviate it, we write all state changes of the same job on the same Task Manager into a single file; that is, tasks on the same Task Manager share one State Change Log.
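A toy sketch of this sharing scheme, with hypothetical names: tasks append to one shared per-TM log, and each task's checkpoint stores an (offset, length) handle into that file rather than owning a file of its own:

```python
# One shared log segment per Task Manager instead of one file per task:
# appends from all tasks interleave in a single DFS file, and handles
# locate each task's changes within it.

class SharedTaskManagerLog:
    def __init__(self):
        self.segment = bytearray()   # stands in for one DFS file per TM

    def append(self, task_id, payload: bytes):
        offset = len(self.segment)
        self.segment += payload
        # the handle, not the file, goes into the task's checkpoint metadata
        return (task_id, offset, len(payload))

    def read(self, handle):
        _, offset, length = handle
        return bytes(self.segment[offset:offset + length])


log = SharedTaskManagerLog()
h1 = log.append("task-1", b"change-a")
h2 = log.append("task-2", b"change-b")
assert log.read(h1) == b"change-a"
assert log.read(h2) == b"change-b"
```

The design choice trades file count for a coarser cleanup granularity, which is exactly why the cleanup discussion above works per Task Manager.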

Long-tail latency problem

To mitigate high long-tail latency on DFS, write requests are retried when they cannot complete within the allowed timeout (1 second by default).
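A minimal sketch of such timeout-plus-retry logic (the function names are illustrative, not Flink internals):

```python
# Retry a DFS write that fails to finish within the timeout: a slow
# long-tail write is abandoned and re-issued rather than waited out.

def write_with_retry(do_write, timeout_s=1.0, max_attempts=3):
    for attempt in range(1, max_attempts + 1):
        try:
            return do_write(timeout_s)
        except TimeoutError:
            if attempt == max_attempts:
                raise  # give up after the last attempt

attempts = []
def flaky_write(timeout_s):
    attempts.append(timeout_s)
    if len(attempts) < 3:
        raise TimeoutError  # simulated long-tail write
    return "ok"

assert write_with_retry(flaky_write) == "ok"
assert len(attempts) == 3
```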

4. Benchmark Results and Analysis

The improvement in checkpoint speed and stability from Generic Log-Based Incremental Checkpointing depends on the following factors:

The ratio of the state change log increment to the full state size. The smaller the increment, the better.

The ability to upload state increments continuously. This depends on the state access pattern: in the extreme case where an operator only updates the Flink State Table right before checkpointing, the Changelog does not help much.

The ability to batch changelog uploads across multiple Tasks. Writing changelogs to DFS in grouped batches reduces the number of files created and lowers DFS load, improving stability.

The ability of the underlying State Backend to deduplicate updates to the same key before flushing to disk. Because the changelog stores state updates rather than final values, stronger deduplication in the State Backend increases the ratio of the changelog increment to the full State Table size.

The speed of writing to the persistent DFS storage. The faster the write speed, the less pronounced the improvement from the Changelog.
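The fourth factor can be illustrated with a toy model: a state table overwrites repeated updates to the same key before flushing, while the changelog appends every update, so hot keys inflate the changelog-to-state ratio:

```python
# Deduplication vs. append-only logging: the state table keeps one entry
# per key, the changelog keeps one entry per update.

def changelog_to_state_ratio(updates):
    state_table = {}
    changelog = []
    for key, value in updates:
        state_table[key] = value        # overwrite: deduplicated on flush
        changelog.append((key, value))  # append-only: every update kept
    return len(changelog) / len(state_table)

# 10 updates to one hot key plus 2 cold keys:
# 12 changelog entries vs. 3 state-table keys
updates = [("hot", i) for i in range(10)] + [("a", 0), ("b", 0)]
assert changelog_to_state_ratio(updates) == 4.0
```

This is exactly the effect that makes the sliding-window workload below so much worse for the changelog than the ValueState workload.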

4.1 Benchmark Configuration

In the Benchmark experiment, we used the following configuration:

Operator parallelism: 50

Running time: 21h

State Backend: RocksDB (incremental checkpoint enabled)

Persistent storage: S3 (Presto plugin)

Machine type: AWS m5.xlarge (4 slots per TM)

Checkpoint interval: 10 ms

State Table materialization interval: 3 min

Input rate: 50K events/s

4.2 ValueState Workload

In the first part of the experiment, we focus on a workload where each update touches a different key. Due to factors 2 and 4 above, the Changelog brings a significant improvement: checkpoint completion time drops by a factor of 10 (99.9 pct), while checkpoint size grows by 30% and recovery time by 66%–225%, as shown in the table below.

Table 1: Comparison of indicators of Changelog based on ValueState Workload

Let's take a closer look at Checkpoint Size:

Table 2: Comparison of checkpoint-related metrics with the Changelog enabled/disabled, ValueState workload

Checkpointed Data Size refers to the size of the data uploaded after the checkpoint barrier is received and the checkpointing process starts. With the Changelog enabled, most data has already been uploaded before checkpointing starts, which is why this metric is much smaller than with the Changelog disabled.

Full Checkpoint Data Size is the total size of all files making up the checkpoint, including files shared with previous checkpoints. Compared with an ordinary checkpoint, the changelog format is uncompressed and less compact, so it takes up more space.

4.3 Window Workload

A sliding window is used here. As shown in the table below, the Changelog speeds up checkpoint completion by about 3x, but storage amplification is much higher (consuming nearly 45x the space):

The main reasons for the Full Checkpoint Data Size amplification are:

With the sliding window operator, every record is added to multiple sliding windows and therefore causes multiple state updates, making the changelog's write amplification even larger.

As mentioned earlier, if the underlying State Backend (such as RocksDB) can aggressively deduplicate updates to the same key before flushing, the snapshot will be small relative to the changelog. In the extreme case of the sliding window operator, windows are purged when they expire; if the updates and the purge happen within the same checkpoint, that window's data is likely not included in the snapshot at all. This also means that the faster windows are purged, the smaller the snapshot can be.
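The per-record amplification can be estimated with simple arithmetic (window sizes here are illustrative):

```python
# With a sliding window of size W sliding every S, each record falls
# into W/S overlapping windows, so the changelog records roughly W/S
# state updates per input record.

def windows_per_event(window_size_s, slide_s):
    return window_size_s // slide_s

# e.g. a 60 s window sliding every 5 s touches 12 windows per record
assert windows_per_event(60, 5) == 12
```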

5. Conclusion

Flink 1.15 implements the MVP of Generic Log-Based Incremental Checkpointing. Built on DFS, it brings checkpoint times down to around one second and greatly improves checkpoint stability, at the cost of some additional space: in essence, it trades space for time. Version 1.16 will improve it further toward production readiness, for example by speeding up recovery through Local Recovery and file caching. Moreover, the Changelog State Backend interface is generic: the same interface can plug in faster storage for lower latency, such as Apache Bookkeeper. We are also exploring other applications of the Changelog, such as applying it to sinks to achieve generic end-to-end exactly-once.
