Regional Checkpoint optimization practice

1. Single point recovery mechanism

In ByteDance's real-time recommendation scenario, we use Flink to join user features with user behaviors in real time, and the joined samples feed the real-time model. The latency and stability of this joining service directly affect the recommendation quality that online products deliver to users. In Flink, this kind of joining service is essentially a dual-stream Join, and a failure of any Task or node in the job triggers a Failover of the entire job, which degrades the real-time recommendation quality of the corresponding business.

Before introducing single point recovery, let's review Flink's failover strategy.


Individual-Task-Failover: restart only the failed task. This is suitable only when there are no connections between tasks, so its application scenarios are limited.


Region-Failover: this strategy divides all tasks in the job into several regions. When a Task fails, it tries to find the minimal set of regions that must be restarted for recovery. Compared with the global-restart strategy, this strategy restarts fewer tasks in some scenarios.

If we use the Region-Failover strategy on a job with a fully connected topology, the whole job forms one large region, so restarting that region is equivalent to restarting the entire job. Could we use the Individual-Task-Failover strategy instead? Unfortunately, that strategy is completely inapplicable to this topology. Therefore, we needed to design and develop a new Failover strategy for scenarios with the following characteristics:

Multi-Stream Join Topology

Large traffic (30M QPS), high parallelism (16K*16K)

A small amount of partial data loss is allowed for a short period of time

High requirements for continuous data output

Before describing the technical solution, let's take a look at Flink's existing data transmission mechanism.

Looking from left to right (SubTask A):

When data flows in, it is first received by the RecordWriter

Based on information in the record, such as its key, the RecordWriter shuffles the data and selects the corresponding channel

The data is serialized into a buffer, which is put into the buffer queue of that channel

The buffer is sent downstream through the Netty Server

The downstream Netty Client receives the data

Based on the partition information in the buffer, it is forwarded to the corresponding downstream channel

The InputProcessor takes the data out of the buffer and executes the operator logic
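As a minimal sketch of the shuffle step above — choosing an output channel from a record's key — the class and method names here are illustrative, not Flink's actual internals:

```java
public class ChannelSelectorSketch {
    // Pick the output channel for a record by hashing its key,
    // mirroring the "shuffle by key" step of the RecordWriter.
    static int selectChannel(String key, int numChannels) {
        // Mask the sign bit instead of Math.abs, which can stay
        // negative for Integer.MIN_VALUE.
        return (key.hashCode() & 0x7fffffff) % numChannels;
    }

    public static void main(String[] args) {
        System.out.println("channel for user-42: " + selectChannel("user-42", 4));
    }
}
```

The same key always maps to the same channel, which is what keeps keyed state consistent on the downstream SubTask.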
To implement single point recovery on top of this mechanism, we need to solve the following problems:

How to make upstream Task aware of downstream Failure

After the downstream Task fails, how to make the upstream Task send data to the normal Task

After the upstream Task fails, how to make the downstream Task continue to consume the data in the buffer

How to deal with incomplete data in upstream and downstream

How to establish a new connection

We propose solutions for each of these problems.

■ How to make upstream Task aware of downstream Failure

The downstream SubTask actively notifies the upstream of its failure, or the upstream Netty Server perceives that the TaskManager has shut down. In the figure, an X indicates an unavailable SubPartition.

First, make SubPartition1 and the corresponding view (a structure used by Netty Server to fetch SubPartition data) unavailable.

Afterwards, when the RecordWriter receives new data destined for SubPartition1, it first makes an availability check: if the SubPartition is available, the data is sent normally; if not, the data is discarded directly.
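The availability check on the write path can be sketched as follows (hypothetical names and a simplified queue; Flink's real ResultSubpartition machinery differs):

```java
public class SubPartitionWriteSketch {
    static class SubPartition {
        final java.util.ArrayDeque<byte[]> bufferQueue = new java.util.ArrayDeque<>();
        volatile boolean available = true; // flipped off when downstream fails
    }

    // Write only if the SubPartition is still available; otherwise drop
    // the record, since single point recovery tolerates brief data loss.
    static boolean write(SubPartition p, byte[] record) {
        if (!p.available) {
            return false; // downstream failed: discard directly
        }
        p.bufferQueue.add(record);
        return true;
    }

    public static void main(String[] args) {
        SubPartition p = new SubPartition();
        write(p, new byte[]{1});
        p.available = false;                       // downstream failure detected
        boolean accepted = write(p, new byte[]{2}); // this record is dropped
        System.out.println("second write accepted: " + accepted);
    }
}
```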

■ The upstream Task receives a new connection from the downstream Task

After the downstream subTask is rescheduled and started, it sends a Partition Request to the upstream. After receiving the Partition Request, the upstream Netty Server re-creates a corresponding View for the downstream SubTask. At this time, the upstream Record Writer can write data normally.

■ The downstream task perceives the failure of the upstream task

Similarly, the downstream Netty Client can perceive that an upstream SubTask has failed. It finds the corresponding channel and inserts an "unavailable event" at the end of its buffer queue (marked with an exclamation point in the figure). Since our goal is to lose as little data as possible, the buffers already in the channel are consumed normally by the InputProcessor until the unavailable event is read; the channel is then marked unavailable and its buffer queue is cleaned up.
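A sketch of draining a channel up to the unavailable event (the sentinel object stands in for the exclamation-mark event in the figure; all names are illustrative):

```java
public class ChannelDrainSketch {
    // Sentinel standing in for the "unavailable event" appended to the queue.
    static final byte[] UNAVAILABLE_EVENT = new byte[0];

    // Consume the remaining buffers until the unavailable event is read,
    // then clean up the queue. Returns how many buffers were salvaged.
    static int drainUntilUnavailable(java.util.Queue<byte[]> channel) {
        int consumed = 0;
        byte[] buf;
        while ((buf = channel.poll()) != null) {
            if (buf == UNAVAILABLE_EVENT) {
                break; // mark the channel unavailable from here on
            }
            consumed++; // hand the buffer to the InputProcessor / operator
        }
        channel.clear(); // clean up anything left after the event
        return consumed;
    }

    public static void main(String[] args) {
        java.util.Queue<byte[]> ch = new java.util.ArrayDeque<>();
        ch.add(new byte[]{1});
        ch.add(new byte[]{2});
        ch.add(UNAVAILABLE_EVENT);
        System.out.println("salvaged " + drainUntilUnavailable(ch) + " buffers");
    }
}
```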

■ Incomplete data in Buffer

First of all, we need to know where incomplete data lives: inside the InputProcessor, which maintains a small buffer queue for each channel. When a received buffer contains an incomplete record, the InputProcessor waits for the next buffer, splices the fragments into a complete record, and sends it to the operator.
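The splicing of an incomplete record can be sketched as plain byte concatenation (a stand-in for Flink's record deserializer; the names are hypothetical):

```java
public class PartialRecordSketch {
    // Per-channel leftover bytes of an incomplete record.
    static byte[] pending = new byte[0];

    // Stitch a newly arrived buffer onto the pending fragment and return
    // the combined bytes; a real deserializer would then cut complete
    // records out of this and forward them to the operator.
    static byte[] append(byte[] buffer) {
        byte[] joined = new byte[pending.length + buffer.length];
        System.arraycopy(pending, 0, joined, 0, pending.length);
        System.arraycopy(buffer, 0, joined, pending.length, buffer.length);
        pending = joined;
        return joined;
    }

    public static void main(String[] args) {
        append(new byte[]{1, 2});             // first half of a record arrives
        byte[] full = append(new byte[]{3});  // second half completes it
        System.out.println("record length after stitch: " + full.length);
    }
}
```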

■ Downstream Task and upstream Task reconnect

When the problematic upstream Task is rescheduled, the downstream is notified through a TaskManager API call. After receiving the notification, the downstream Shuffle Environment checks the status of the corresponding channel. If the channel is unavailable, it directly creates a new channel and releases the old one. If the channel is still available, its buffers have not yet been fully consumed, so the replacement is deferred until they are.

Business benefits

The figure above shows a comparative test on a job with a parallelism of 4,000. The business joins a user impression stream with a user behavior stream; the whole job has 12,000 tasks.

Single-point recovery (reserved resources) in the figure is a feature built on the scheduling group: when applying for resources, some extra resources can be requested in advance, so that when a failover occurs, the time cost of requesting resources from YARN is saved.

In the end, job output dropped by only about one thousandth, and recovery took about 5 seconds. Because the whole recovery process is so short, the downstream is essentially unaware of it.

2. Regional Checkpoint

In the classic data integration scenario, data is imported and exported, for example from Kafka to Hive. This scenario has the following characteristics:

There are no All-to-All connections in the topology

Checkpoints are strongly relied upon to achieve exactly-once data output

The checkpoint interval is long, and a high success rate is required

In this case, the data is not shuffled at all.

What problems are encountered in the data integration scenario?

A single Task's checkpoint failure affects the global checkpoint output

Network jitter, write timeouts/failures, and storage environment jitter have too obvious an impact on jobs

The success rate of jobs with parallelism above 2,000 drops significantly, below business expectations
Recall that the region-failover strategy divides the job topology into multiple regions. Can Checkpoint borrow the same idea and manage checkpoints in units of regions? The answer is yes.

In this case, there is no need to wait until all Task checkpoints complete before performing the partition archiving operation (such as an HDFS file rename); instead, the archiving can be performed at region granularity as soon as a region completes.

Before introducing the scheme, let’s briefly review Flink’s existing checkpoint mechanism. I believe everyone is familiar with it.

(Figure: Flink's existing checkpoint mechanism)

The figure above is an example of a Kafka source and Hive sink operator topology with a parallelism of 4.

First, the checkpoint coordinator triggers the triggerCheckpoint operation and sends it to every source task. After a Task receives the request, it triggers the operators inside the Task to perform snapshots. In the example there are 8 operator states.

(Figure: existing checkpoint mechanism, acknowledgement phase)

After each operator completes the snapshot, the Task sends an ACK message to the checkpoint coordinator to indicate that the current Task has completed the Checkpoint.

Then, when the coordinator has received successful ACK messages from all Tasks, the checkpoint is considered successful. Finally, the finalize operation is triggered to save the corresponding metadata, and all Tasks are notified that the checkpoint is complete.

What problems will we encounter when we use the Region method to manage checkpoints?

How to divide Checkpoint Region

Each set of tasks that are connected to each other (and to no others) forms a region. There are obviously four regions in the example.
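Region division is essentially finding the connected components of the task graph, which can be sketched with a union-find structure (illustrative code, not Flink's implementation):

```java
public class RegionDivisionSketch {
    // Union-find over task ids; tasks connected by any edge end up in one region.
    static int[] parent;

    static int find(int x) {
        return parent[x] == x ? x : (parent[x] = find(parent[x])); // path compression
    }

    static void union(int a, int b) {
        parent[find(a)] = find(b);
    }

    // Count regions given the task connections (pairs of task ids).
    static int countRegions(int numTasks, int[][] edges) {
        parent = new int[numTasks];
        for (int i = 0; i < numTasks; i++) parent[i] = i;
        for (int[] e : edges) union(e[0], e[1]);
        java.util.Set<Integer> roots = new java.util.HashSet<>();
        for (int i = 0; i < numTasks; i++) roots.add(find(i));
        return roots.size();
    }

    public static void main(String[] args) {
        // 4 Kafka sources (tasks 0-3), each forwarding to one Hive sink (tasks 4-7):
        int[][] edges = {{0, 4}, {1, 5}, {2, 6}, {3, 7}};
        System.out.println("regions: " + countRegions(8, edges)); // 4, as in the example
    }
}
```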

How to deal with the Checkpoint result of the failed Region
Assume the first checkpoint completes normally: the state of each operator is successfully written into the HDFS checkpoint1 directory, and the 8 operator states are mapped to 4 checkpoint regions through a logical mapping. Note that the mapping is only logical; no physical files are moved or modified.

(Figure: region checkpoint logical mapping)

During the second checkpoint, the checkpoint of region-4-data (Kafka-4, Hive-4) fails: there are no Kafka-4-state and Hive-4-state files in the checkpoint2 (job/chk_2) directory, so checkpoint2 is incomplete. To keep it complete, the successful state files of region-4-data are found in the last (or an earlier) successful checkpoint and mapped logically. With this, the state files of every region in the current checkpoint are complete, and the checkpoint can be considered complete.

However, if most or all regions fail and the previous checkpoint is referenced, the current checkpoint is effectively identical to the previous one, which is meaningless.

We therefore configure a maximum region failure ratio, such as 50%: with 4 regions in the example, at most two regions may fail.

How to avoid storing too much checkpoint history data on the file system
If a region keeps failing (dirty data or code logic problems), the current mechanism will cause all historical checkpoint files to be preserved, which is obviously unreasonable.

We configure the maximum number of consecutive failures a region may have. For example, 2 means a region may reference the region results of at most the two previous successful checkpoints.
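Both limits — the maximum failed-region ratio and the maximum consecutive failures per region — can be sketched as a single admission check (hypothetical names and simplified bookkeeping, not the actual implementation):

```java
public class RegionCheckpointPolicySketch {
    // Decide whether a regional checkpoint may still be declared successful,
    // under the two configurable limits described in the text.
    static boolean checkpointSucceeds(boolean[] regionFailed,
                                      int[] consecutiveFailures,
                                      double maxFailureRatio,
                                      int maxConsecutiveFailures) {
        int failed = 0;
        for (int i = 0; i < regionFailed.length; i++) {
            if (regionFailed[i]) {
                failed++;
                // This region would have to reference a checkpoint older
                // than the configured history window: reject.
                if (consecutiveFailures[i] + 1 > maxConsecutiveFailures) {
                    return false;
                }
            }
        }
        return (double) failed / regionFailed.length <= maxFailureRatio;
    }

    public static void main(String[] args) {
        boolean[] failed = {false, false, false, true}; // only region-4 failed
        int[] streak = {0, 0, 0, 0};
        System.out.println(checkpointSucceeds(failed, streak, 0.5, 2)); // true
    }
}
```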

Difficulties in engineering implementation

How to deal with Task Fail and checkpoint timeout
How to deal with the subTask status that has been successfully snapshot in the same region
How to ensure compatibility with checkpoint Coordinator
Let's see how Flink is currently doing.

(Figure: existing checkpoint coordinator)

When a Task failure occurs, the JobMaster's FailoverStrategy is notified first, which in turn notifies the checkpoint coordinator to cancel the checkpoint.

How is a checkpoint timeout handled? When the coordinator triggers a checkpoint, a checkpoint canceller is started. The canceller holds a timer: if the preset time is exceeded and the coordinator has not finished the checkpoint, a timeout has occurred and the coordinator is notified to cancel the checkpoint.

Whether it is a Task failure or a timeout, the notification eventually reaches the pending checkpoint, and the currently pending checkpoint is discarded.

Before making the corresponding modifications, let's sort out the checkpoint-related messages and the responses the checkpoint coordinator makes to them.

Global checkpoint is Flink's existing mechanism.

To maintain compatibility with the checkpoint coordinator, we added a CheckpointHandle interface along with two implementations, GlobalCheckpointHandle and RegionalCheckpointHandle, which implement the global-checkpoint and region-checkpoint operations respectively by filtering messages.

A note on the region checkpoint: if the handler receives a failure message, it marks the region as failed and tries to perform the region's logical mapping from the previous successful checkpoint. Similarly, the notifyComplete message sent by the coordinator is first filtered by the handler to remove messages addressed to failed Tasks.

Business benefits

The test is run at a parallelism of 5,000, assuming the success rate of a single Task snapshot is 99.99%. The success rate of the global checkpoint would be 60.65%, while the region checkpoint can still maintain 99.99%.
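The 60.65% figure follows from independence: a global checkpoint needs all 5,000 task snapshots to succeed, so its success rate is 0.9999^5000 ≈ 60.65%. A quick check:

```java
public class CheckpointSuccessRateSketch {
    // A global checkpoint succeeds only if every one of the n task
    // snapshots succeeds, so its success rate is p^n.
    static double globalSuccessRate(double p, int n) {
        return Math.pow(p, n);
    }

    public static void main(String[] args) {
        double p = 0.9999; // per-task snapshot success rate
        int n = 5000;      // parallelism
        System.out.printf("global checkpoint success rate: %.2f%%%n",
                100 * globalSuccessRate(p, n)); // ~60.65%
    }
}
```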

3. Other optimizations on Checkpoint

■ Parallelized restoration of operator state

The union state is a special kind of state: when restoring, all Task states of the job must be collected and then union-restored to every single Task. If the job parallelism is very large, say 10,000, then restoring the union state of each task requires reading at least 10,000 files. If the state in these 10,000 files is restored serially, the recovery time is predictably very long.

Although the data structure behind OperatorState cannot be operated on in parallel, reading the files can be. During recovery, the OperatorStateBackend reads the HDFS files in parallel; once all state files are parsed into memory, a single thread processes them. This reduces state recovery time from tens of minutes to a few minutes.
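The read-in-parallel, merge-single-threaded pattern can be sketched with a fixed thread pool (the file read is simulated here; in production it would be an HDFS read):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRestoreSketch {
    // Simulate reading and parsing one state file (stand-in for an HDFS read).
    static List<Integer> readStateFile(int fileId) {
        return List.of(fileId); // illustrative payload
    }

    // Read all files in parallel, then merge the parsed state on a single
    // thread, as described for the union-state restore above.
    static List<Integer> restore(int numFiles, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<List<Integer>>> futures = new ArrayList<>();
        for (int i = 0; i < numFiles; i++) {
            final int id = i;
            futures.add(pool.submit(() -> readStateFile(id)));
        }
        List<Integer> merged = new ArrayList<>(); // single-threaded union
        try {
            for (Future<List<Integer>> f : futures) {
                merged.addAll(f.get());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println("restored entries: " + restore(100, 8).size()); // 100
    }
}
```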

■ Enhance CheckpointScheduler and support Checkpoint hourly trigger

The interval and timeout of a Flink checkpoint cannot be modified after the job is submitted, so at first launch they can only be set from experience. It then often turns out during job peaks that parameters such as interval and timeout were set unreasonably. The usual remedy is to modify the parameters and restart the job, which has a fairly large impact on the business; obviously this approach is unsatisfactory.

Here, we refactored the checkpoint trigger mechanism inside the CheckpointCoordinator and abstracted the existing trigger process, so that customized trigger mechanisms can be built quickly on the abstract class. For example, in the data-import scenario, to form Hive partitions faster we implemented an hourly trigger mechanism so that downstream can see the data as soon as possible.
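The hour alignment such a trigger needs is simple arithmetic — compute the delay until the next full hour (illustrative code, not the actual CheckpointCoordinator implementation):

```java
public class HourlyTriggerSketch {
    // Delay in milliseconds until the next full hour, the kind of alignment
    // an hourly checkpoint trigger needs so Hive partitions close promptly.
    static long millisToNextHour(long nowMillis) {
        long hour = 60L * 60L * 1000L;
        return hour - (nowMillis % hour);
    }

    public static void main(String[] args) {
        long now = 90L * 60L * 1000L; // 01:30:00 after an hour boundary
        System.out.println("next trigger in "
                + millisToNextHour(now) / 60000 + " minutes"); // 30
    }
}
```

A scheduler would sleep for this delay, trigger the checkpoint, then recompute; triggering exactly on the hour boundary is what lets the partition archiving happen as soon as the hour's data is complete.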

There are many more optimization points that I will not list one by one.

4. Challenges & Future Planning

At present, the largest job state inside ByteDance reaches roughly the 200 TB level, and a job with such large traffic and state cannot be supported by directly using the RocksDB StateBackend. So in the future, we will continue to work on the performance and stability of state and checkpointing, such as strengthening the existing StateBackend, solving checkpoint slowdown under data skew and backpressure, and enhancing debugging capabilities.
