This topic answers frequently asked questions (FAQ) about system checkpoints and job snapshots in Realtime Compute for Apache Flink.
Why is no new data updated after table.exec.state.ttl expires when minibatch is enabled?
How do I calculate the start time of the next periodic checkpoint?
What are the differences between the GeminiStateBackend used in VVR 8.x and VVR 6.x?
Is it normal for the size of a full checkpoint to be the same as an incremental checkpoint?
Error: You are using the new V4 state engine to restore old state data from a checkpoint
Error: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots.
Why is no new data updated after table.exec.state.ttl expires when minibatch is enabled?
When minibatch is enabled, data is computed in batches and stored in the state. The data in the state is based on previous full computation results. If the state is purged because the time to live (TTL) expires, the previous accumulated computation results are also lost. As a result, data cannot be updated based on the minibatch results.
Conversely, if minibatch is disabled, when the state expires due to TTL, the data for the expired key is re-accumulated and output. This ensures that data updates continue. However, the increased frequency of data updates can cause other issues, such as data processing delays.
Therefore, you should configure minibatch and TTL based on your business scenario.
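As a reference, a typical combination of these settings looks like the following. The keys are standard Flink SQL configuration keys; the values are illustrative and should be tuned for your workload:

```yaml
# Enable minibatch processing.
table.exec.mini-batch.enabled: true
# Flush a batch at least every 2 seconds...
table.exec.mini-batch.allow-latency: 2s
# ...or as soon as 5,000 records have been buffered.
table.exec.mini-batch.size: 5000
# State entries expire 1 day after they were last updated.
table.exec.state.ttl: 1d
```

A longer TTL reduces the chance that accumulated minibatch results are purged mid-computation, at the cost of a larger state size.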
How do I calculate the start time of the next periodic checkpoint?
Two parameters affect the start time of the next checkpoint: the checkpoint interval and the minimum pause between checkpoints. The next checkpoint is triggered when both of the following conditions are met:
Checkpoint interval: The minimum time difference between the start time of the previous checkpoint and the start time of the next checkpoint.
Minimum pause: The minimum time difference between the end time of the previous checkpoint and the start time of the next checkpoint.
The following two scenarios illustrate this. In both scenarios, the checkpoint interval is 3 minutes, the minimum pause is 3 minutes, and the timeout is 10 minutes.
Scenario 1: The job runs normally and each checkpoint succeeds.
The first checkpoint starts at 12:00:00 and succeeds at 12:00:02. The second checkpoint starts at 12:03:00.
Scenario 2: The job runs abnormally. For example, a checkpoint times out or fails.
The first checkpoint starts at 12:00:00 and succeeds at 12:00:02. The second checkpoint starts at 12:03:00, but it times out and fails at 12:13:00. The third checkpoint starts at 12:16:00.
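The two conditions can be captured in a small model. The following Python sketch is illustrative, not Flink's actual scheduler code. Note that for Scenario 1 the model yields 12:03:02 rather than 12:03:00, because the minimum pause is measured from the checkpoint's end time; the 2-second difference is rounded away in the example above:

```python
from datetime import datetime, timedelta

def next_checkpoint_start(prev_start, prev_end, interval, min_pause):
    # The next checkpoint may start only when BOTH conditions hold:
    #  - at least `interval` has passed since the previous checkpoint STARTED
    #  - at least `min_pause` has passed since the previous checkpoint ENDED
    return max(prev_start + interval, prev_end + min_pause)

interval = timedelta(minutes=3)
min_pause = timedelta(minutes=3)

# Scenario 1: first checkpoint runs 12:00:00 -> 12:00:02 (success).
s1 = next_checkpoint_start(datetime(2024, 1, 1, 12, 0, 0),
                           datetime(2024, 1, 1, 12, 0, 2),
                           interval, min_pause)
print(s1.time())  # 12:03:02, i.e. roughly 12:03

# Scenario 2: second checkpoint runs 12:03:00 -> 12:13:00 (timeout).
s2 = next_checkpoint_start(datetime(2024, 1, 1, 12, 3, 0),
                           datetime(2024, 1, 1, 12, 13, 0),
                           interval, min_pause)
print(s2.time())  # 12:16:00
```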
For more information about setting the minimum pause between checkpoints, see Tuning Checkpointing.
What are the differences between the GeminiStateBackend used in VVR 8.x and VVR 6.x?
The Realtime Compute for Apache Flink compute engine Ververica Runtime (VVR) 6.x uses the V3 version of GeminiStateBackend by default. VVR 8.x uses the V4 version of GeminiStateBackend by default.
The two versions differ in the following areas: basic features, lazy state loading parameters, and Managed Memory usage. For Managed Memory usage, the only difference is in the Resident Set Size (RSS) metric.
Note: For more information about Managed Memory, see TaskManager Memory.
Is it normal for the size of a full checkpoint to be the same as an incremental checkpoint?
If you observe that the size of a full checkpoint is the same as an incremental checkpoint when you use Flink, perform the following steps:
Check whether incremental snapshots are configured correctly and are in effect.
This behavior can be normal in specific situations. For example:
Before data ingestion (before 18:29), the job has not processed any data. The checkpoint contains only the initial state information of the source. Because there is no other state data, this checkpoint is a full checkpoint.
At 18:29, one million data entries are ingested. Assume the data is fully processed within the next checkpoint interval (3 minutes), and no other data is ingested during this period. The first incremental checkpoint will contain all the state information generated by these one million data entries.
In this case, it is normal for the full checkpoint and the incremental checkpoint to have the same size. This is because the first incremental checkpoint must contain the full data state to ensure that the entire state can be recovered from that point. This effectively makes it a full checkpoint.
The benefits of incremental checkpoints usually become apparent from the second checkpoint onward. When data input is stable and there are no large-scale state changes, subsequent incremental checkpoints are expected to be smaller. This indicates that the system is correctly creating a snapshot of only the incremental part of the state. If the sizes are still the same, you should review the system state and behavior to identify any potential issues.
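For the configuration check in step 1: in open source Flink, incremental snapshots are controlled by the following keys (shown here for the RocksDB state backend; GeminiStateBackend in Realtime Compute for Apache Flink has its own defaults, so check the documentation for your engine version):

```yaml
# Use a state backend that supports incremental snapshots.
state.backend: rocksdb
# Upload only the state changes since the last completed checkpoint.
state.backend.incremental: true
```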
What should I do if checkpoints are slow for a Python job?
Cause
Python operators have an internal cache. During a checkpoint, all data in the cache must be processed. Therefore, if the performance of a Python user-defined function (UDF) is poor, the checkpoint time increases and affects job execution.
Solution
To reduce the cache size, set the following parameters in the Additional Configurations section. For more information, see How do I configure custom runtime parameters for a job?.
# The default value is 100000. The unit is the number of entries.
python.fn-execution.bundle.size
# The default value is 1000. The unit is milliseconds.
python.fn-execution.bundle.time
For more information about the parameters, see Flink Python Configuration.
How do I troubleshoot abnormal checkpoints in a job?
Diagnose the exception type
On the Monitoring and Alerts tab or in State Management, you can view the checkpoint history to identify the exception type, such as a checkpoint timeout or a write failure.

Classification, Identification, and Handling
Scenario 1: Frequent checkpoint timeouts. Check the job for backpressure. You can analyze the root cause of the backpressure, locate the slow operator, and take appropriate action, such as adjusting resources or configurations. For more information, see How do I troubleshoot job backpressure?
Scenario 2: Checkpoint write failures. You can find the TaskManager (TM) logs by following these steps. Then, you can analyze the logs to determine the cause.
On the Checkpoints tab of the job logs page, click Checkpoints History.

Click the plus sign (+) to the left of the abnormal checkpoint to view the checkpoint status of the operator.
Click the plus sign (+) to the left of the abnormal operator. Then, click the ID of the abnormal subtask to go to the corresponding TM.

Error: You are using the new V4 state engine to restore old state data from a checkpoint
Error details
When you upgrade from VVR 6.x to VVR 8.x, the following error is reported:
You are using the new V4 state engine to restore old state data from a checkpoint.
Cause
The Gemini state backend versions used by VVR 6.x and 8.x are different, and their checkpoints are not compatible.
Solution
You can use one of the following methods to resolve the issue:
Create a job snapshot in the standard format and start the job from that state. For more information, see Manually create a job snapshot and Start a job.
Restart the job without a state.
(Not recommended) Continue to use the legacy version of Gemini. You must configure state.backend.gemini.engine.type: STREAMING and restart the job for the change to take effect. For more information about how to configure parameters, see How do I configure custom runtime parameters for a job?
(Not recommended) Continue to use the VVR 6.x engine to start the job.
Error: java.lang.NegativeArraySizeException
Error details
When a job uses a List State, the following exception occurs during runtime.
Caused by: java.lang.NegativeArraySizeException
	at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
	at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
	at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
	at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
	at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
	at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
	at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
	at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
	at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
	at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
	at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
	at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
	at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
Cause
The state data for a single key in the List State is too large and exceeds 2 GB. The process that leads to oversized state data is as follows:
During normal job operation, values appended to a single key in a List State are combined through a merge operation, for example in a window operator that stores its elements in a List State. This causes the state data for that key to grow continuously.
When the state data accumulates to a certain point, it first triggers an out-of-memory (OOM) error. After the job recovers from the failure, the merge process of the List State can cause the size of the temporary byte array requested by the state backend to exceed the available limit, which causes this exception.
Note: RocksDBStateBackend can also encounter similar issues and trigger an ArrayIndexOutOfBoundsException or a segmentation fault. For more information, see The EmbeddedRocksDBStateBackend.
Solution
If a window operator is causing the state data to become too large, you can reduce the window size.
If the job logic is inefficient, you can adjust the logic. For example, you can split the key.
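The key-splitting idea can be sketched as follows. This is an illustrative Python helper, not part of any Flink API (the names NUM_BUCKETS and split_key are hypothetical): instead of keying the stream by the original key alone, you key it by the original key plus a deterministic bucket suffix, so that the list state for one hot key is spread across several smaller entries, and a downstream operator re-aggregates the per-bucket results.

```python
import hashlib

NUM_BUCKETS = 16  # hypothetical fan-out factor; tune for your key skew

def split_key(original_key: str, element) -> str:
    """Derive a sub-key so that no single ListState entry has to hold
    all elements of a hot key (avoiding the ~2 GB size limit)."""
    # Hash the element (not the key) so elements of the same logical
    # key are distributed across buckets deterministically.
    digest = hashlib.md5(str(element).encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % NUM_BUCKETS
    return f"{original_key}#{bucket}"

# Elements of the same logical key land in different sub-keys:
print(split_key("user42", "a"))  # user42#1
print(split_key("user42", "b"))
```

Because the bucket is derived deterministically, the same element always maps to the same sub-key, which keeps the split stable across restarts.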
Error: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots.
Error details
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints
Cause
This error is caused by multiple consecutive checkpoint failures when Kafka is used as a sink.
Solution
You can adjust the checkpoint timeout duration by using the execution.checkpointing.timeout parameter to ensure that checkpoints do not fail because of timeouts. For more information about how to configure parameters, see How do I configure custom runtime parameters for a job?
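For example, the following settings (the values are illustrative) raise the timeout and also cap the number of in-flight checkpoints, which directly limits how many snapshots can be ongoing at once:

```yaml
# Allow each checkpoint up to 30 minutes before it is declared failed.
execution.checkpointing.timeout: 30min
# Permit only one in-flight checkpoint at a time.
execution.checkpointing.max-concurrent-checkpoints: 1
```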
Error: Exceeded checkpoint tolerable failure threshold
Error details
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
Cause
The configured number of tolerable checkpoint failures is too low. When the number of failed checkpoints exceeds this threshold, the job triggers a failover. If this parameter is not set, no checkpoint failures are tolerated by default.
Solution
You can set the `num` value of the execution.checkpointing.tolerable-failed-checkpoints: `num` parameter to adjust the number of checkpoint failures that the job can tolerate. The `num` value must be 0 or a positive integer. If `num` is 0, no checkpoint exceptions or failures are tolerated. For more information about how to configure parameters, see How do I configure custom runtime parameters for a job?
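For example, to let the job tolerate up to five consecutive checkpoint failures before triggering a failover (the value 5 is illustrative):

```yaml
execution.checkpointing.tolerable-failed-checkpoints: 5
```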