Optimization and Practice of Flink Unaligned Checkpoint at Shopee

1. Problems in Checkpoint

1.1 Technical problems in checkpoint

A common problem in Flink production is that backpressure in a Flink job causes checkpoints to time out and fail. Under sustained backpressure, checkpoints can keep failing for a long time.

Common causes include bottlenecks in external lookups or writes, CPU bottlenecks, and data skew during rush or peak periods. All of these create backpressure and therefore indirectly cause checkpoint failures.

1.2 Impact of continuous checkpoint failures on the business

• Suppose a job has spent half an hour consuming lag, and a developer finds that the consumption rate is too slow to meet expectations. They want to increase the parallelism and restart the job to raise throughput. If checkpoints have been failing the whole time, the job must restore from the checkpoint taken half an hour earlier. All data consumed in that half hour is then consumed again, which wastes resources and may duplicate business data.

• When a job is consuming lag and the number of tolerable failed checkpoints is too low (the default is 0), the job can enter an endless cycle. Consuming the lag causes severe backpressure, and severe backpressure makes checkpoints time out. Checkpoint failures make the job fail, and job failures create even more lag to consume, so the lag is never cleared.

• Tolerating checkpoint failures without limit is not an elegant solution either. If the tolerance is too high:

⚀ Problems in production are not detected in time;

⚀ Some connectors commit data or files on checkpoint. If checkpoints fail continuously, this data or these files cannot be committed for a long time, which causes data delay and transaction timeouts. For example, a Kafka producer transaction that times out will fail;

⚀ Once the job restarts, a large amount of data is consumed again.

• Business peaks and promotions resemble lag consumption and run into the same problems.
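To make the tolerance trade-off above concrete, here is a minimal Java model of a tolerable-failure counter. It is a sketch under stated assumptions: only consecutive failures are counted, and a successful checkpoint resets the count. The class and method names are hypothetical, and this is not Flink's own failure-handling code.

```java
// Hypothetical model of a "tolerable consecutive checkpoint failures" counter.
// Assumption: failures are counted consecutively and a success resets the count.
final class TolerableFailureCounter {
    private final int tolerable;        // failures allowed before failing the job
    private int consecutiveFailures = 0;

    TolerableFailureCounter(int tolerable) {
        this.tolerable = tolerable;
    }

    /** Records one checkpoint result; returns true if the job should fail over. */
    boolean record(boolean checkpointSucceeded) {
        if (checkpointSucceeded) {
            consecutiveFailures = 0;
            return false;
        }
        consecutiveFailures++;
        return consecutiveFailures > tolerable;
    }
}
```

With a tolerance of 0, the first checkpoint that times out under backpressure already triggers a failover, which is how the lag-consumption loop described above starts. A very large tolerance avoids the loop, but it also hides failures for as long as the counter allows.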

1.3 Introducing Unaligned Checkpoint

Against this background, many users want checkpoints to succeed even when a Flink job has a bottleneck (severe backpressure). The Flink community therefore introduced the Unaligned Checkpoint mechanism (hereinafter UC) in FLIP-76.

2. Principle of Unaligned Checkpoint

2.1 Core idea of UC

Under severe backpressure, aligned checkpoints (hereinafter AC) time out mainly because barriers queue up in the data stream. Data flows very slowly, so the barriers also move slowly, and the AC eventually times out.

The core idea of UC is to let the barrier overtake the data when data flows slowly. The barrier can then travel quickly all the way from source to sink.
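As a toy illustration (not Flink code), the sketch below models one backpressured output queue. An aligned barrier is appended at the tail and must wait behind every queued buffer. An unaligned barrier is placed at the head, and the buffers it overtakes are handed back so they can be persisted with the checkpoint. The class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model: buffers are strings ("B0", "B1", ...) queued in one output subpartition.
// Assumption: under backpressure, buffers drain at a fixed slow rate, so the number of
// buffers ahead of the barrier approximates how long the barrier must wait.
final class BarrierPlacement {
    static final String BARRIER = "BARRIER";

    /** Aligned: append the barrier at the tail; returns how many buffers are ahead of it. */
    static int alignedWait(Deque<String> queue) {
        queue.addLast(BARRIER);
        return queue.size() - 1;
    }

    /**
     * Unaligned: put the barrier at the head so it leaves next, and return the buffers
     * it overtook (head to tail). These must be stored with the checkpoint.
     */
    static List<String> unalignedOvertake(Deque<String> queue) {
        List<String> overtaken = new ArrayList<>(queue);
        queue.addFirst(BARRIER);
        return overtaken;
    }
}
```

The trade-off shows up directly in this model. The aligned barrier's wait grows with queue length, so it grows with backpressure. The unaligned barrier leaves immediately, but the checkpoint now has to carry the overtaken buffers.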

2.2 Task UC Process Details

Assume the current task's upstream task has a parallelism of 3 and its downstream task has a parallelism of 2. After UC starts, the task's three InputChannels receive the barriers sent from upstream one after another.

As shown in the figure, the gray boxes represent data in the buffers. InputChannel-0 receives the barrier first, and the other InputChannels have not received it yet.

When an InputChannel receives a barrier, the task immediately starts the first phase of UC, the synchronous phase. Note:

• As soon as any barrier enters the input buffer of the task's network layer, the task starts UC;

• The task does not wait for the other InputChannels to receive the barrier. It also does not need to process the data queued ahead of the barrier in the InputChannel.

As shown in the figure below, the task cannot process data during the UC synchronous phase, which keeps the data consistent. The synchronous phase does the following:

• Barrier overtaking: the barrier is placed at the head of every ResultSubpartition, overtaking all input and output buffers, so it can be sent to downstream tasks quickly;

• Buffer snapshot: all the input and output buffers that the barrier overtook are snapshotted;

• The operator's snapshotState method is called;

• The Flink engine snapshots the operator's internal state.

Several points are worth noting:

• During UC, the buffered data overtaken by the barrier is skipped. To avoid losing it, these buffers are written to HDFS together with the state, and they are consumed again when the job recovers from the checkpoint;

• The task cannot process data in the synchronous phase. To keep the blocking time short, the synchronous phase only takes references (shallow copies) to the buffers and state. The actual write to HDFS happens asynchronously;

• The last two steps of the UC synchronous phase are exactly the same as in AC: the operator's internal state is snapshotted.
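The synchronous phase described above can be sketched as a single method over simplified queues. This is a hypothetical, simplified model rather than Flink's internal API: buffers are strings, the barrier is a marker string, and the operator's snapshotState call is represented by a `Supplier`. The model does not remove the barrier from the input channel or track which channels are still pending.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Simplified, hypothetical model of the UC synchronous phase (not Flink's internal API).
final class UcSyncPhase {
    static final String BARRIER = "BARRIER";

    /** References captured while the task is blocked; written to HDFS asynchronously later. */
    static final class SyncSnapshot {
        final List<String> inputBuffers = new ArrayList<>();
        final List<String> outputBuffers = new ArrayList<>();
        String operatorState;
    }

    static SyncSnapshot run(List<Deque<String>> inputChannels,
                            List<Deque<String>> outputSubpartitions,
                            Supplier<String> snapshotState) {
        SyncSnapshot snapshot = new SyncSnapshot();
        // Input side: record every buffer queued ahead of a barrier. For channels whose
        // barrier has not arrived yet, that is everything currently queued.
        for (Deque<String> channel : inputChannels) {
            for (String buffer : channel) {
                if (BARRIER.equals(buffer)) {
                    break; // data behind the barrier belongs to the next checkpoint
                }
                snapshot.inputBuffers.add(buffer);
            }
        }
        // Output side: record all queued buffers, then let the barrier overtake them
        // by putting it at the head of every subpartition.
        for (Deque<String> subpartition : outputSubpartitions) {
            snapshot.outputBuffers.addAll(subpartition);
            subpartition.addFirst(BARRIER);
        }
        // Operator state, exactly as in an aligned checkpoint.
        snapshot.operatorState = snapshotState.get();
        return snapshot;
    }
}
```

Because Java strings are immutable, storing them in the snapshot lists behaves like keeping references. This mirrors the "reference now, write later" design that keeps the blocking window short.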

After the synchronous phase completes, the task resumes processing data and at the same time starts the second phase of UC: barrier alignment and the UC asynchronous phase. The asynchronous phase writes the state and buffers that were shallow-copied in the synchronous phase to HDFS.

Why does UC still need barrier alignment?

When the task starts UC, many InputChannels have not yet received the barrier, and the network buffers queued ahead of the barrier in those channels must also be snapshotted. In the second phase of UC, the task therefore waits for the barriers of all InputChannels to arrive and snapshots the buffers that arrive before them. In total, UC writes three kinds of data to HDFS:

• All input and output buffers referenced in the synchronous phase;

• The operator's internal state referenced in the synchronous phase;

• The buffers that arrive ahead of the barriers in the other InputChannels after the synchronous phase.

In the asynchronous phase, once all three kinds of data have been written, the file addresses are reported to the JobManager, and the current task's UC ends.
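The barrier alignment in the asynchronous phase can be modeled as a set of pending channels. The sketch below is hypothetical (the names are invented, not Flink classes). Channels that had not delivered their barrier when UC started stay pending, and every buffer they deliver before that barrier is recorded as the third kind of data. In the real system, the task also keeps processing these buffers normally.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of barrier alignment in the UC asynchronous phase.
final class UcAsyncAlignment {
    private final Set<Integer> pendingChannels = new HashSet<>();
    private final List<String> recordedBuffers = new ArrayList<>();

    UcAsyncAlignment(Set<Integer> channelsWithoutBarrier) {
        pendingChannels.addAll(channelsWithoutBarrier);
    }

    /** A buffer arrives on a channel; record it only if that channel's barrier is still missing. */
    void onBuffer(int channel, String buffer) {
        if (pendingChannels.contains(channel)) {
            recordedBuffers.add(buffer); // written to HDFS with the checkpoint
        }
        // Processing of the buffer by the task continues either way (not modeled).
    }

    /** The barrier for this checkpoint arrives on a channel. */
    void onBarrier(int channel) {
        pendingChannels.remove(channel);
    }

    /** True once every channel's barrier has arrived; the task can then report to the JobManager. */
    boolean isAligned() {
        return pendingChannels.isEmpty();
    }

    List<String> recordedBuffers() {
        return recordedBuffers;
    }
}
```

Unlike AC alignment, nothing is blocked while channels are pending. The model only decides which buffers must be copied into the checkpoint.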

Note: in theory, barrier alignment in the UC asynchronous phase is fast. The current task lets the barrier overtake all its input and output buffers and sends it downstream first, and upstream tasks behave the same way. The barrier overtakes all upstream buffers and reaches the current task quickly.

2.3 Problems in UC practice

As soon as any barrier enters the input buffer of the task's network layer, the task starts UC. The barrier then quickly overtakes all buffers and is sent downstream, so UC is not affected by backpressure. In theory, no matter how severe the backpressure is, the UC barrier overtakes all the way from source to sink, and each task completes its snapshot quickly.

The theory is sound, but during our research and in production jobs we found that UC did not work as well as expected:

• In many scenarios, UC still fails when backpressure is severe, which greatly reduces its expected benefit;

• UC significantly increases the number of files written to HDFS. This affects the stability of online services and makes large-scale adoption harder;

• UC has some bugs.

The following sections describe these problems, Shopee's solutions, and our contributions to the community.

3. Greatly increasing the benefit of UC

A task cannot handle a checkpoint in the middle of processing a record. It must first finish the current record and write the result to the OutputBufferPool. Only then does it check whether an InputChannel has received a UC barrier, and if so, it starts UC.

If processing a record and writing the result to the OutputBufferPool takes more than 10 minutes (the default checkpoint timeout), UC still times out. Processing a record is usually not slow, but writing the result to the OutputBufferPool can take a long time.

From the OutputBufferPool's perspective, the upstream task is the producer and the downstream task is the consumer. When the downstream task has a bottleneck, the upstream task gets stuck waiting for a buffer while writing its output to the OutputBufferPool, and it cannot start UC.

To solve this problem, the Flink community introduced a buffer reservation mechanism in FLINK-14396. Before processing a record, the task checks whether the OutputBufferPool has an idle buffer, and if it does not, the task keeps waiting. The detailed process is shown in the figure below.

The task does not process data until the OutputBufferPool has a free buffer. This ensures that, after processing a record, it can write the result without getting stuck in the output phase (step 5). With this optimization, if there is no idle buffer, the task waits in step 3 for either an idle buffer or a UC barrier. In this phase, it can start UC quickly when a UC barrier arrives.
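The step-3 decision described above (wait until there is either an idle output buffer or a UC barrier) can be written as a small pure function. This is a hypothetical sketch of the control flow, not the FLINK-14396 code. The enum and parameter names are invented for illustration, and a UC barrier is assumed to take priority over processing data.

```java
// Hypothetical sketch of the task loop's decision at step 3 with buffer reservation.
final class ReserveBufferLoop {
    enum Action { START_UNALIGNED_CHECKPOINT, PROCESS_RECORD, WAIT }

    static Action nextAction(boolean ucBarrierArrived, boolean inputAvailable, int freeOutputBuffers) {
        if (ucBarrierArrived) {
            // The task is not in the middle of a record, so UC can start right away.
            return Action.START_UNALIGNED_CHECKPOINT;
        }
        if (inputAvailable && freeOutputBuffers > 0) {
            // One idle buffer is reserved, so writing this record's output should not block.
            return Action.PROCESS_RECORD;
        }
        // Waiting here (not inside step 5) keeps the task responsive to UC barriers.
        return Action.WAIT;
    }
}
```

The key property is that `WAIT` is only ever returned between records, never halfway through writing one.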

3.1 Improvement for scenarios where processing one record needs multiple buffers

As shown in the figure below, only one buffer is reserved. When processing a single record needs multiple buffers, the task may still get stuck in step 5 while writing results to the OutputBufferPool, and it cannot handle UC.

Examples include a single large record, flatMap, window triggering, and broadcasting watermarks, all of which may need multiple buffers to process one record. In these scenarios the task gets stuck in the output phase of step 5, so UC performs poorly. The key to solving this is to make the task wait in step 3 rather than get stuck in step 5.

To address this, Shopee proposed the overdraft buffer in FLIP-227. The buffers may run out while a record is being processed. If the TaskManager (TM) has free network memory at that point, the task's OutputBufferPool overdraws some buffers from the TM so it can finish step 5.

Note: the OutputBufferPool only uses overdraft buffers when it has no idle buffer. Once overdraft buffers are in use, the task reaches step 3 of the next round (waiting for a barrier or an idle buffer) and treats the OutputBufferPool as having no idle buffer. It does not process more data until the downstream tasks have consumed all overdraft buffers and at least one buffer in the OutputBufferPool is idle.

By default, taskmanager.network.memory.max-overdraft-buffers-per-gate=5, so each OutputBufferPool of a task can overdraw 5 buffers from the TM. With the overdraft mechanism, and as long as the TM has enough network memory, a record that needs up to five buffers no longer blocks UC. If the TM has plenty of network memory, you can increase this parameter to cover more scenarios.
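The overdraft rules above can be captured in a small counting model. This is a hypothetical sketch, not Flink's buffer pool implementation. It assumes the TaskManager always has spare network memory for the overdraft, and it tracks only how many buffers are handed out.

```java
// Hypothetical model of an OutputBufferPool with overdraft buffers (the FLIP-227 idea).
// Assumption: the TaskManager always has spare network memory for the overdraft.
final class OverdraftBufferPool {
    private final int poolSize;     // regular buffers owned by this pool
    private final int maxOverdraft; // e.g. max-overdraft-buffers-per-gate = 5
    private int inUse;              // buffers handed out and not yet recycled downstream

    OverdraftBufferPool(int poolSize, int maxOverdraft) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
    }

    /**
     * Called while writing a record's output (step 5). Uses a regular buffer if one is
     * idle, otherwise overdraws. Returns false when even the overdraft is exhausted,
     * i.e. the task would block in the middle of a record.
     */
    boolean requestBufferForOutput() {
        if (inUse < poolSize + maxOverdraft) {
            inUse++;
            return true;
        }
        return false;
    }

    /** Downstream consumed a buffer and it returns to the pool. */
    void recycle() {
        if (inUse > 0) {
            inUse--;
        }
    }

    int overdraftInUse() {
        return Math.max(0, inUse - poolSize);
    }

    /**
     * Checked before taking the next record (step 3): true only when every overdraft
     * buffer has been repaid and at least one regular buffer is idle.
     */
    boolean isAvailable() {
        return inUse < poolSize;
    }
}
```

For example, with a pool of 2 regular buffers and an overdraft of 5, one record can take 7 buffers without blocking. The pool then reports itself unavailable until downstream has recycled 6 of them.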

Flink 1.16 added support for overdraft buffers. The related JIRA issues are FLINK-27522, FLINK-26762, and FLINK-27789.

3.2 Improvement of Legacy Source

By where their data comes from, tasks fall into two types, SourceTasks and non-SourceTasks:

SourceTasks read data from external components into the Flink job. Non-SourceTasks read data from InputChannels, and that data comes from upstream tasks.

Non-SourceTasks check the OutputBufferPool before reading data from an InputChannel and only read when there is a free buffer. If a SourceTask does not check whether the OutputBufferPool has an idle buffer before reading from external components, UC performs poorly.

Flink has two kinds of sources, the Legacy Source and the new Source:

• The new Source works with the task in pull mode: the task pulls data from the source, much as it reads from an InputChannel. The task checks that the OutputBufferPool has a free buffer before pulling data from the source.

• The Legacy Source works in push mode: it reads data from an external component and sends it downstream directly. When the OutputBufferPool has no idle buffer, the Legacy Source gets stuck and cannot handle UC normally.

However, almost all Flink jobs in our production environment still use the Legacy Source. The Flink community has deprecated the Legacy Source and no longer maintains it, so Shopee improved the commonly used Legacy Sources internally.

The idea is the same as above: the Legacy Source checks that the OutputBufferPool has a free buffer before sending data downstream.
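The gating idea can be illustrated with a push-style loop that checks output availability before pulling the next record from the external system. This is a hypothetical sketch and not Shopee's FLINK-26759 patch. None of the names are Flink APIs, the external system is modeled as an `Iterator`, and the output-pool check is a `BooleanSupplier`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a legacy (push-mode) source loop gated on output availability.
final class GatedLegacySource {
    /**
     * Emits records while the external system has data and the output pool has an idle
     * buffer. Returns the number emitted. When it stops because the output is full, the
     * caller can wait for a buffer or handle a UC barrier instead of blocking mid-emit.
     */
    static int emitWhileAvailable(Iterator<String> external,
                                  BooleanSupplier outputHasIdleBuffer,
                                  List<String> downstream) {
        int emitted = 0;
        while (external.hasNext() && outputHasIdleBuffer.getAsBoolean()) {
            downstream.add(external.next());
            emitted++;
        }
        return emitted;
    }
}
```

The availability check happens before `next()`, so a record is never read from the external system unless there is somewhere to put it.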

FlinkKafkaConsumer, the most commonly used source in Flink, is in fact a Legacy Source, so many Flink users in the industry still rely on the Legacy Source. We shared our internally improved version of the Legacy Source in FLINK-26759.

4. Greatly reducing UC risk

After the optimizations above, UC succeeds quickly under severe backpressure, even with the Legacy Source and in scenarios where a record needs multiple buffers. This meets the expectations of some Flink users, but UC still falls short of the bar for large-scale production. The main reason is that, unlike AC, UC writes network buffers into the checkpoint, which introduces additional risks:

• More files are written to HDFS, which puts extra pressure on the NameNode;

• If the data schema is upgraded and the new serialization is incompatible, the buffered data cannot be recovered;

• When the connection between operators changes (for example, from rebalance to forward), the buffered data between those operators cannot be recovered.

4.1 Unable to switch from AC to UC smoothly

Users want to avoid these risks while still enjoying UC's benefits, so the Flink community introduced the aligned checkpoint timeout mechanism. Checkpoints are AC by default, and if an AC cannot complete within the specified time, it switches to UC.

The AC timeout mechanism does not eliminate UC's risks entirely. However, a job without backpressure still uses AC and takes on no extra risk. Only when AC fails because of severe backpressure does it switch to UC, so that the checkpoint can succeed.

Assume AC timeout = 1 min and checkpoint timeout = 5 min. Each checkpoint starts as AC and switches to UC if AC has not finished after one minute. If the whole checkpoint takes more than 5 minutes, it fails with a timeout.

The AC timeout mechanism went through three stages, and the first two did not meet the goal above. After one minute, or even after five, the job still could not switch from AC to UC, so the checkpoint timed out. The three stages are described below against this goal.

4.2 InputChannel supports switching from AC to UC

FLINK-19680 first added the AC timeout mechanism. In this first phase, each task starts timing when it receives the first barrier. If the task's barrier alignment time exceeds the AC timeout, that task switches from AC to UC.

The problem shows up when a job has many tasks. Suppose there are 10 tasks from source to sink, and each task takes 59 seconds to align barriers. No task switches to UC, yet all 10 tasks must align in turn. The whole checkpoint therefore takes at least 590 seconds (more than 5 minutes) and still times out.

FLINK-23041 improved on phase 1. In phase 2, the barrier carries the checkpoint's start timestamp. When an InputChannel receives the barrier, it subtracts the checkpoint start time from the current system time to get how long the checkpoint has been running:

• If more than 1 minute has passed, it switches to UC immediately;

• If less than 1 minute has passed, it subtracts the elapsed time from 1 minute to get how long remains before switching to UC, and sets a timer. When the timer fires, it switches to UC.

Phase 2 solves phase 1's problem of time accumulating across multiple tasks. As long as an InputChannel has received the barrier and AC does not finish within the specified time, AC switches to UC on schedule.
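The difference between the two phases comes down to a little arithmetic, sketched below with hypothetical helper names. Phase 1 lets per-task waits add up along the chain. Phase 2 computes the remaining time from the start timestamp carried by the barrier. Clock skew between machines is ignored here.

```java
// Hypothetical arithmetic for the two AC-timeout phases described above.
final class AlignedTimeoutMath {
    /** Phase 1: each task times only its own alignment, so waits add up along a chain. */
    static long phase1TotalMs(int tasksInChain, long perTaskAlignmentMs) {
        return tasksInChain * perTaskAlignmentMs;
    }

    /**
     * Phase 2: the barrier carries the checkpoint start time. Returns how long this channel
     * should wait before switching to UC; 0 means switch immediately.
     */
    static long phase2DelayMs(long checkpointStartMs, long nowMs, long alignedTimeoutMs) {
        long elapsed = nowMs - checkpointStartMs;
        return Math.max(0L, alignedTimeoutMs - elapsed);
    }
}
```

With the example numbers, `phase1TotalMs(10, 59_000)` is 590,000 ms, well past the 300,000 ms checkpoint timeout. A barrier that arrives 20 s into the checkpoint gets `phase2DelayMs(0, 20_000, 60_000)` = 40,000 ms before switching, and one arriving after 75 s switches immediately.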

4.3 Output buffer supports switching from AC to UC

After phase 2, InputChannels handle the switch from AC to UC well. The remaining gap is obvious, however: the output buffer does not support switching from AC to UC.

If the task's backpressure is severe, the barrier queues in the output buffer. If it cannot reach the InputChannel of the downstream task within 5 minutes, the checkpoint still times out.

To solve this, Shopee proposed improvements in FLINK-27251 and FLINK-28077 that let the output buffer switch from AC to UC. The design is:

• If UC is enabled and the current checkpoint is AC, the barrier is appended to the end of the output buffer. The AC may need to be converted to UC later, so a timer is set.

• If the barrier is still queued in the output buffer when the timer fires, the AC is converted to UC. The barrier overtakes to the head of the output buffer, and the buffers it overtook (light blue in the figure) are snapshotted into the checkpoint.
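The two design rules above can be modeled with one output queue and a timer callback. This is a hypothetical sketch (not the FLINK-27251/FLINK-28077 code) that uses strings for buffers and a marker string for the barrier. Timer scheduling itself is left out, and `onAlignedTimeout` is simply called when the timer would fire.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of an output subpartition whose aligned barrier can later overtake.
final class SwitchableSubpartition {
    static final String BARRIER = "BARRIER";
    private final Deque<String> queue = new ArrayDeque<>();

    void add(String buffer) {
        queue.addLast(buffer);
    }

    /** AC: the barrier goes to the tail like any other buffer (a timer would be set here). */
    void addAlignedBarrier() {
        queue.addLast(BARRIER);
    }

    /** Downstream consumes one element from the head. */
    String poll() {
        return queue.pollFirst();
    }

    /**
     * Timer callback. If the barrier is still queued, move it to the head and return the
     * buffers it overtook (to be snapshotted). Returns an empty list if it already left.
     */
    List<String> onAlignedTimeout() {
        List<String> overtaken = new ArrayList<>();
        if (!queue.contains(BARRIER)) {
            return overtaken; // barrier already sent: the checkpoint stays aligned here
        }
        for (String buffer : queue) {
            if (BARRIER.equals(buffer)) {
                break;
            }
            overtaken.add(buffer);
        }
        queue.remove(BARRIER);
        queue.addFirst(BARRIER);
        return overtaken;
    }
}
```

Only the buffers that were ahead of the barrier are captured. Buffers queued behind it belong to the next checkpoint and stay untouched.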

Early on, the community designed a checkpoint benchmark to evaluate checkpoint performance, as shown in the figure below. After this optimization was merged into Flink's master branch, UC performance on that benchmark improved 11-fold.

4.4 UC Small File Consolidation

With the AC timeout mechanism enabled, Flink uses AC when backpressure is mild and switches smoothly to UC when it is severe. This greatly reduces UC's extra risks while keeping its benefits under severe backpressure. In large-scale production, however, some risk remains.

By default, each Flink subtask writes its own file for its buffers. If a job has 10 tasks, each with a parallelism of 1000, UC may write 10,000 additional small files. If a Kafka cluster fails or hits a bottleneck, many Flink jobs write to Kafka slowly, and many of them switch from AC to UC. Those jobs then write hundreds of thousands of small files to HDFS at once, which may cause a NameNode avalanche.

In order to solve the problem of small files, Shopee proposed an improvement to merge UC small files in FLINK-26803 and FLINK-28474.

Optimization idea: multiple tasks share the same file. Instead of creating its own file, each task obtains a file stream from the CheckpointStreamManager.

The CheckpointStreamManager allocates one file for every n tasks. By default, channel-state.number-of-tasks-share-file=5: 5 tasks share one UC file, which reduces the number of UC files fivefold. Multiple tasks writing the same file concurrently would cause thread-safety problems, so the file stream is locked during writes and the tasks write serially.

Production experience shows that most UC small files are under 1 MB, so even 20 tasks sharing one file is acceptable. If NameNode pressure is very low and the Flink job cares more about write efficiency, you can set this parameter to 1, meaning tasks do not share UC files.
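The file-sharing scheme can be sketched as an index mapping plus a locked stream. This is a hypothetical model, not Flink's CheckpointStreamManager. The class names are invented, and returning a start offset for each write is an assumption about how a subtask could locate its data in the shared file on restore.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of n subtasks sharing one channel-state file.
final class SharedChannelStateFiles {
    /** Subtasks 0..n-1 share file 0, n..2n-1 share file 1, and so on. */
    static int fileIndexFor(int subtaskIndex, int tasksPerFile) {
        return subtaskIndex / tasksPerFile;
    }

    /** Number of files needed for the given number of subtasks (ceiling division). */
    static int fileCount(int subtasks, int tasksPerFile) {
        return (subtasks + tasksPerFile - 1) / tasksPerFile;
    }

    /** One shared stream; the lock makes concurrent subtasks write one at a time. */
    static final class SharedStream {
        private final OutputStream out;
        private long position;

        SharedStream(OutputStream out) {
            this.out = out;
        }

        /** Appends one subtask's channel state and returns the offset where it starts. */
        synchronized long write(byte[] data) {
            try {
                long start = position;
                out.write(data);
                position += data.length;
                return start;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        // 10 tasks x parallelism 1000, as in the example above.
        System.out.println(fileCount(10 * 1000, 1) + " files unshared, "
                + fileCount(10 * 1000, 5) + " files with 5 subtasks per file");
        SharedStream shared = new SharedStream(new ByteArrayOutputStream());
        long a = shared.write("subtask-0".getBytes(StandardCharsets.UTF_8));
        long b = shared.write("subtask-1".getBytes(StandardCharsets.UTF_8));
        System.out.println("offsets: " + a + ", " + b);
    }
}
```

Serializing writes with `synchronized` trades some write throughput for far fewer files. This is the same trade-off the text describes when it suggests setting the parameter to 1 for write-sensitive jobs.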

We are still contributing the UC small-file merge feature to the community.

4.5 Fixing a network buffer deadlock

Shopee's contributions to UC also include fixing a deadlock that occurred when recycling network buffers, in FLINK-22946.

5. UC production practice and future plans at Shopee

5.1 UC production practice

To avoid UC's extra risks, Shopee sets the aligned checkpoint timeout to 1 minute internally. When a job's backpressure is mild and AC can complete within 1 minute, AC is used. When backpressure is severe and AC cannot complete within 1 minute, the checkpoint switches to UC.
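For reference, the settings discussed in this article map roughly onto the Flink configuration keys below. This sketch assumes the key names used around Flink 1.16/1.17, and key names have changed across versions, so check the configuration reference for your release. It is written as a plain Java map so it runs without Flink on the classpath, and the same pairs could go into flink-conf.yaml. The 5 min checkpoint timeout is the example value from section 4.1, not a documented Shopee setting. We are also assuming that `execution.checkpointing.unaligned.max-subtasks-per-channel-state-file` is the community counterpart of the internal file-sharing parameter from section 4.4.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Configuration sketch; key names assume Flink ~1.17 and should be verified per version.
final class UcConfigExample {
    static Map<String, String> settings() {
        Map<String, String> conf = new LinkedHashMap<>();
        // Allow checkpoints to become unaligned.
        conf.put("execution.checkpointing.unaligned", "true");
        // Start aligned; switch to UC after 1 minute (the internal setting described above).
        conf.put("execution.checkpointing.aligned-checkpoint-timeout", "1 min");
        // Overall checkpoint timeout; 5 min is only the example value from section 4.1.
        conf.put("execution.checkpointing.timeout", "5 min");
        // Overdraft buffers per output gate (FLIP-227); 5 is the default.
        conf.put("taskmanager.network.memory.max-overdraft-buffers-per-gate", "5");
        // Assumed community option for sharing channel-state files among subtasks.
        conf.put("execution.checkpointing.unaligned.max-subtasks-per-channel-state-file", "5");
        return conf;
    }
}
```

Keeping the aligned-checkpoint timeout well below the overall checkpoint timeout leaves UC enough time to finish after the switch.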

The job development page of the Shopee Flink platform also has a UC switch, so users can choose whether to enable Unaligned Checkpoint for each job. So far, hundreds of Flink jobs have enabled UC and perform well, and their checkpoints succeed even under backpressure.

5.2 Future plans for UC

We will keep tracking the problems users run into with UC. After a few more months of stable operation, we may enable UC for all jobs, provided AC timeout is also enabled.

Shopee's internal version has substantially changed Flink's scheduling and network memory modules, and it can accurately calculate the network memory a TM needs. In the future, it will reserve separate memory for UC overdraft buffers.
