This topic answers frequently asked questions (FAQs) about checkpoints and savepoints in Realtime Compute for Apache Flink.
-
Why is data not updated after the state TTL expires when mini-batch is enabled?
-
How to calculate the start time of the next periodic checkpoint
-
What are the differences between the GeminiStateBackend in VVR 8.x and VVR 6.x?
-
Is it normal for a full checkpoint and an incremental checkpoint to have the same size?
-
Error: You are using the new V4 state engine to restore old state data from a checkpoint
-
Error: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots.
Mini-batch data update failures
The state stores the accumulated results of all previous computations. When the state TTL expires, the state is cleared and those accumulated results are lost. With mini-batch enabled, incoming records then have no accumulated result to be applied to, so the output is not updated.
Conversely, if mini-batch is disabled, when the state's TTL expires, data for the expired key is recalculated and emitted. This ensures continuous data updates. However, the increased frequency of data updates can lead to other issues, such as processing delays.
Therefore, you should configure mini-batch and TTL settings based on your specific business requirements.
Calculating the next checkpoint start time
The start time of the next checkpoint is determined by two parameters: the checkpoint interval and the minimum pause between checkpoints. A new checkpoint is triggered when both of the following conditions are met:
-
Checkpoint interval: The minimum time between the start of one checkpoint and the start of the next, that is, the span <start time of previous checkpoint, start time of next checkpoint>.
-
Minimum pause: The minimum time between the end of one checkpoint and the start of the next, that is, the span <end time of previous checkpoint, start time of next checkpoint>.
Consider the following two scenarios where the checkpoint interval is 3 minutes, the minimum pause is 3 minutes, and the timeout is 10 minutes.
-
Scenario 1: The deployment runs normally, and every checkpoint succeeds.
The first checkpoint starts at 12:00:00 and completes successfully at 12:00:02. The second checkpoint will start at 12:03:00.
-
Scenario 2: The deployment runs abnormally, and a checkpoint fails due to a timeout.
The first checkpoint starts at 12:00:00 and completes successfully at 12:00:02. The second checkpoint starts at 12:03:00 but fails at 12:13:00 due to a timeout. The third checkpoint will start at 12:16:00.
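The two conditions can be read as a single rule: the next checkpoint starts at the later of (previous start + checkpoint interval) and (previous end + minimum pause). The following Python sketch applies that rule to the third checkpoint in Scenario 2. The function name `next_checkpoint_start` and the calendar date are hypothetical; the code models only the timing rule described above, not the actual scheduler.

```python
from datetime import datetime, timedelta


def next_checkpoint_start(prev_start, prev_end, interval, min_pause):
    """Earliest time that satisfies both the interval and the minimum pause."""
    earliest_by_interval = prev_start + interval  # measured from the previous start
    earliest_by_pause = prev_end + min_pause      # measured from the previous end
    return max(earliest_by_interval, earliest_by_pause)


# Scenario 2: the second checkpoint starts at 12:03:00 and times out at 12:13:00.
# The date itself is arbitrary and only needed to build datetime objects.
second_start = datetime(2024, 1, 1, 12, 3, 0)
second_end = datetime(2024, 1, 1, 12, 13, 0)

third_start = next_checkpoint_start(
    second_start,
    second_end,
    interval=timedelta(minutes=3),
    min_pause=timedelta(minutes=3),
)
print(third_start.time())  # 12:16:00
```

Because the timeout (10 minutes) is longer than the interval, the minimum pause is the binding condition here: the interval alone would already allow a start at 12:06:00.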
For more information about configuring the minimum pause between checkpoints, see Tuning Checkpointing.
GeminiStateBackend in VVR 8.x vs. VVR 6.x
By default, Realtime Compute for Apache Flink engines for VVR 6.x use GeminiStateBackend V3, while VVR 8.x engines use V4.
| Category | Description |
| Basic capabilities | |
| State lazy loading parameter | |
| Managed Memory usage | The only difference is in the Resident Set Size (RSS) metric: Note: For more information about managed memory, see TaskManager Memory. |
Full and incremental checkpoints of the same size
If you observe that a full checkpoint and an incremental checkpoint have the same size, check the following:
-
Verify that incremental checkpointing is correctly configured and enabled.
-
Consider whether the behavior is expected in your scenario. For example:
-
Before data is ingested (for example, before 18:29), the deployment has not processed any data. The checkpoint contains only the initial state of the source, making it effectively a full checkpoint.
-
At 18:29, one million records are ingested. If this data is completely processed within the next checkpoint interval (for example, 3 minutes) and no other data arrives, the first incremental checkpoint will contain all the state generated from those records.
In this case, the full checkpoint and the first incremental checkpoint are expected to have the same size. The first incremental checkpoint must include the state of all data to ensure a full recovery is possible from that point, which makes it functionally equivalent to a full checkpoint.
The benefits of incremental checkpointing typically become apparent from the second checkpoint onward. With a stable data input and no major state changes, subsequent incremental checkpoints should be smaller, indicating that the system is correctly snapshotting only state changes. If they remain the same size, you should investigate your system's state and behavior to identify any potential issues.
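The size pattern described above can be illustrated with a toy model. The following Python sketch is not how the state backend is implemented: it assumes that checkpoint size can be approximated by the number of keys written, and the helper names are hypothetical. It shows why the first incremental checkpoint matches the full checkpoint, while a later one shrinks to the keys that actually changed.

```python
def full_checkpoint_size(state):
    """A full checkpoint contains every key currently in the state."""
    return len(state)


def incremental_checkpoint_size(changed_keys):
    """An incremental checkpoint contains only keys changed since the last one."""
    return len(changed_keys)


state = {}
changed = set()

# One million records arrive before the first checkpoint; each creates a new key.
for i in range(1_000_000):
    state[i] = 1
    changed.add(i)

first_full = full_checkpoint_size(state)
first_incremental = incremental_checkpoint_size(changed)
changed.clear()  # the checkpoint completed, so the change set starts over

# Only 1,000 existing keys are updated before the second checkpoint.
for i in range(1_000):
    state[i] += 1
    changed.add(i)

second_incremental = incremental_checkpoint_size(changed)
print(first_full, first_incremental, second_incremental)  # 1000000 1000000 1000
```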
Slow checkpoints in Python deployments
-
Cause
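Python operators buffer input records in bundles, and the buffered records must be processed before a checkpoint can complete. A poorly performing Python user-defined function (UDF) therefore increases the checkpoint duration and degrades deployment performance.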
-
Solution
Reduce the buffer size. In the Other Configuration section, set the following parameters to values smaller than their defaults. For instructions, see Configure custom deployment parameters.
python.fn-execution.bundle.size: Default value: 100000. Unit: records.
python.fn-execution.bundle.time: Default value: 1000. Unit: milliseconds.
For more information about these parameters, see Flink Python Configuration.
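To get a feel for how much these two parameters matter, the following Python sketch gives a rough worst-case estimate of the time needed to process the buffered records at checkpoint time. It assumes a bundle is flushed when either limit is reached, whichever comes first; the input rate, the per-record UDF latency, the reduced parameter values, and the function name are all hypothetical and chosen only for illustration.

```python
def worst_case_bundle_flush_seconds(bundle_size, bundle_time_ms,
                                    records_per_second, udf_seconds_per_record):
    """Estimate the time needed to process the buffered records at a checkpoint."""
    # The buffer holds at most whichever limit fills first: the record count
    # limit, or the number of records that arrive within the time limit.
    filled_by_time = records_per_second * bundle_time_ms / 1000
    buffered = min(bundle_size, filled_by_time)
    return buffered * udf_seconds_per_record


# Hypothetical workload: 50,000 records/s and a UDF that needs 1 ms per record.
with_defaults = worst_case_bundle_flush_seconds(100_000, 1000, 50_000, 0.001)
with_reduced = worst_case_bundle_flush_seconds(10_000, 200, 50_000, 0.001)
print(with_defaults, with_reduced)
```

Under these assumptions the default settings leave about 50 seconds of buffered work for each checkpoint, while the reduced settings leave about 10 seconds. Smaller bundles usually cost some throughput, so treat the values as a starting point for tuning.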
Troubleshooting checkpoint exceptions
-
Diagnose the exception type
View the checkpoint history on the Alarm or State tab to identify the exception type, such as a timeout or a write failure.
Select the Overview tab and expand the Checkpoint section. The table displays details for each checkpoint, including its ID, status, trigger time, duration, and data size. The Status column shows whether each checkpoint succeeded or failed.
-
Isolate and resolve the issue
-
Scenario 1: Frequent checkpoint timeouts. Check the deployment for backpressure. Analyze the root cause of the backpressure, identify the slow operator, and resolve the issue by adjusting resources or configurations. For more information, see How to troubleshoot backpressure issues.
-
Scenario 2: Checkpoint write failures. Follow these steps to find the relevant TaskManager logs and analyze the root cause.
-
On the Checkpoints page of the Logs tab, click Checkpoints History.
On the Checkpoints History page, you can view details for each checkpoint, such as ID, Status, Acknowledged, Trigger Time, End to End Duration, and Checkpointed Data Size.
-
Click the plus sign (+) next to the failed checkpoint to view its operator details.
-
Expand the failed operator and click the SubTask ID to navigate to the corresponding TaskManager logs.
Error: Restoring old state with V4 engine
-
Error message
When upgrading from VVR 6.x to VVR 8.x, you may encounter the following error:
You are using the new V4 state engine to restore old state data from a checkpoint
-
Cause
VVR 6.x and VVR 8.x use different versions of GeminiStateBackend, and their checkpoints are not compatible.
-
Solution
You can use any of the following methods to resolve this issue:
-
Create a savepoint in the standard format and start the deployment from this state. For more information, see Manually create a savepoint and Start a deployment.
-
Restart the deployment without state.
-
(Not recommended) Continue to use the legacy version of Gemini. Set the parameter state.backend.gemini.engine.type: STREAMING and restart the deployment for the change to take effect. For information about how to configure parameters, see How to configure deployment parameters.
-
(Not recommended) Continue to use the VVR 6.x engine to start the deployment.
Error: java.lang.NegativeArraySizeException
-
Error message
A deployment that uses a list state may encounter the following exception at runtime:
Caused by: java.lang.NegativeArraySizeException
    at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
    at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
    at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
    at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
    at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
    at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
    at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
    at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
    at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
    at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
    at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
    at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
    at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
-
Cause
The state data for a single key in the list state has exceeded 2 GB. This can happen as follows:
-
During normal operation, values appended to a single key in a list state are combined through a merge process (for example, in a window operator), causing the state data to grow continuously.
-
When the state data reaches a certain size, it may first trigger an out-of-memory (OOM) error. After the deployment recovers from the failure, the merge process can cause the state backend to request a temporary byte array that exceeds 2 GB, resulting in this exception.
Note: The RocksDBStateBackend can encounter a similar issue, which may trigger an ArrayIndexOutOfBoundsException or a segmentation fault. For more information, see The EmbeddedRocksDBStateBackend.
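The 2 GB boundary corresponds to the largest value a 32-bit signed integer can hold, which is also the upper bound for a Java array length. One plausible way a request above that limit turns into a negative array size is 32-bit overflow of the computed length. The following Python sketch simulates that wraparound; it is an assumption about the mechanism, not taken from the Gemini source code, and the segment sizes and the `to_int32` helper are hypothetical.

```python
INT32_MAX = 2**31 - 1  # largest length a Java array can be asked for


def to_int32(value):
    """Wrap an integer the way a 32-bit signed Java int does on overflow."""
    return (value + 2**31) % 2**32 - 2**31


# Hypothetical sizes (in bytes) of the list-state segments merged for one key.
segment_sizes = [1_500_000_000, 700_000_000]

total = sum(segment_sizes)          # 2,200,000,000 bytes, more than 2 GB
requested_length = to_int32(total)  # what a 32-bit length computation would yield
print(total > INT32_MAX, requested_length)  # True -2094967296
```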
-
Solution
-
If the large state is caused by a window operator, consider reducing the window size.
-
If the large state is caused by the deployment's logic, consider redesigning it, for example, by splitting the keys.
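Splitting keys can be sketched as follows: a small, deterministic suffix is appended to the original key so that the values of one hot key are spread over several state entries. This Python example is only an illustration; `NUM_SPLITS`, `split_key`, the key name, and the record IDs are hypothetical, and a plain dictionary stands in for keyed list state.

```python
import zlib
from collections import defaultdict

NUM_SPLITS = 4  # hypothetical number of sub-keys per original key


def split_key(key, record_id):
    """Derive a sub-key so records of one hot key spread over several state entries."""
    # crc32 is stable across runs, unlike Python's built-in hash() for strings.
    salt = zlib.crc32(record_id.encode("utf-8")) % NUM_SPLITS
    return f"{key}#{salt}"


list_state = defaultdict(list)  # stands in for keyed list state
for n in range(10_000):
    list_state[split_key("hot_user", f"event-{n}")].append(n)

largest = max(len(values) for values in list_state.values())
print(len(list_state), largest)
```

Each sub-key now holds only a fraction of the original list. The trade-off is that results for the sub-keys must be merged again in a second aggregation step keyed by the original key.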
Error: FlinkKafkaException: Too many ongoing snapshots
-
Error message
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints
-
Cause
This error occurs when using a Kafka sink and is caused by multiple consecutive checkpoint failures.
-
Solution
To prevent failures due to timeouts, increase the checkpoint timeout by adjusting the execution.checkpointing.timeout parameter. For more information about how to configure parameters, see Configure custom deployment parameters.
Error: Exceeded checkpoint tolerable failure threshold
-
Error message
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
-
Cause
The configured number of tolerable checkpoint failures is too low, causing the deployment to trigger a failover when this threshold is exceeded. If this parameter is not set, the default value is 0, which means no checkpoint failures are tolerated.
-
Solution
Adjust the number of allowed checkpoint failures by setting the execution.checkpointing.tolerable-failed-checkpoints: num parameter, where num must be 0 or a positive integer. For more information about how to configure parameters, see Configure custom deployment parameters.
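The effect of the threshold can be illustrated with a small model. The following Python sketch assumes that the threshold applies to consecutive failures and that a successful checkpoint resets the counter, which is how the Apache Flink configuration reference describes this option. The function name and the sample outcome sequence are hypothetical.

```python
def first_failover_index(outcomes, tolerable_failed_checkpoints):
    """Return the index of the checkpoint that triggers a failover, or None."""
    consecutive_failures = 0
    for index, succeeded in enumerate(outcomes):
        if succeeded:
            consecutive_failures = 0  # a success resets the counter
            continue
        consecutive_failures += 1
        if consecutive_failures > tolerable_failed_checkpoints:
            return index
    return None


# True = checkpoint succeeded, False = checkpoint failed.
outcomes = [True, False, False, True, False, False, False]
print(first_failover_index(outcomes, 0))  # 1
print(first_failover_index(outcomes, 2))  # 6
print(first_failover_index(outcomes, 3))  # None
```

With the default value of 0, the very first failed checkpoint triggers a failover. A value of 3 lets this sample sequence run without one.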