This page answers common questions about checkpoints and job snapshots in Realtime Compute for Apache Flink.
Configuration quick reference
The following table summarizes all checkpoint-related parameters mentioned in this FAQ. Use this as a starting point when tuning checkpoint behavior.
| Parameter | Default | Unit | Description |
|---|---|---|---|
| execution.checkpointing.timeout | — | ms | Maximum time a checkpoint can run before it is marked as failed |
| execution.checkpointing.tolerable-failed-checkpoints | 0 | count | Number of checkpoint failures tolerated before a failover is triggered. 0 means no failures are tolerated. |
| python.fn-execution.bundle.size | 100000 | entries | Maximum number of entries buffered in the Python operator cache before flushing |
| python.fn-execution.bundle.time | 1000 | ms | Maximum time entries are held in the Python operator cache before flushing |
For instructions on applying these parameters, see How do I configure custom runtime parameters for a job?
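For example, setting the two checkpoint parameters from the table in a job's Additional Configurations section could look like the following (the values are illustrative, not recommendations):
execution.checkpointing.timeout: 600000
execution.checkpointing.tolerable-failed-checkpoints: 3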
Why is no new data updated after table.exec.state.ttl expires when mini-batch is enabled?
When mini-batch is enabled, data is computed in batches and the results are accumulated in state. If the state for a key is purged because its time to live (TTL) expires, the accumulated computation results for that key are also lost — so no further updates are produced from the mini-batch output.
When mini-batch is disabled, an expired key's data is re-accumulated from scratch and output normally, so updates continue. The trade-off is a higher update frequency, which can introduce data processing delays.
Configure mini-batch and TTL together based on your business requirements.
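As a reference point, a minimal sketch of the two features configured together (the keys are standard Flink SQL options; the values are placeholders to adapt to your workload):
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 2s
table.exec.mini-batch.size: 10000
table.exec.state.ttl: 1h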
How do I calculate the start time of the next periodic checkpoint?
Two parameters control when the next checkpoint starts. Both conditions must be satisfied before the checkpoint triggers:
- Checkpointing interval: The minimum time between the *start* of the previous checkpoint and the *start* of the next checkpoint.
- Minimum pause: The minimum time between the *end* of the previous checkpoint and the *start* of the next checkpoint.
The following examples use an interval of 3 minutes, a minimum pause of 3 minutes, and a timeout of 10 minutes.
Normal run (all checkpoints succeed)
The first checkpoint starts at 12:00:00 and completes at 12:00:02. The second checkpoint starts at 12:03:00, driven by the interval condition.
Abnormal run (a checkpoint times out)
The first checkpoint starts at 12:00:00 and completes at 12:00:02. The second checkpoint starts at 12:03:00, times out, and fails at 12:13:00. The third checkpoint can only start after both conditions are met: the interval condition (12:03:00 + 3 min = 12:06:00) and the minimum pause condition (12:13:00 + 3 min = 12:16:00). The binding constraint is the minimum pause, so the third checkpoint starts at 12:16:00.
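Equivalently, the next start time is the later of the two thresholds. A minimal sketch in Python (the function and timestamps are illustrative, not part of any Flink API):
from datetime import datetime, timedelta

# The next checkpoint starts once BOTH conditions hold:
# start-to-start >= interval, and end-to-start >= minimum pause.
def next_checkpoint_start(prev_start, prev_end, interval, min_pause):
    return max(prev_start + interval, prev_end + min_pause)

# Abnormal run from the example above: the second checkpoint starts at
# 12:03:00 and fails by timeout at 12:13:00 (the date is a placeholder).
prev_start = datetime(2024, 1, 1, 12, 3, 0)
prev_end = datetime(2024, 1, 1, 12, 13, 0)
interval = pause = timedelta(minutes=3)
print(next_checkpoint_start(prev_start, prev_end, interval, pause))
# -> 2024-01-01 12:16:00, matching the minimum-pause-bound case above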
For more information about tuning these parameters, see Tuning checkpointing.
What are the differences between GeminiStateBackend in VVR 8.x and VVR 6.x?
Ververica Runtime (VVR) 6.x uses the V3 version of GeminiStateBackend by default. VVR 8.x uses the V4 version.
| Category | V3 (VVR 6.x) | V4 (VVR 8.x) |
|---|---|---|
| Core features | Key-value separation, separation of storage and compute, standard or native format job snapshots, lazy state loading | All V3 features, plus improved state access performance and faster scaling |
| Lazy state loading parameter | state.backend.gemini.file.cache.lazy-restore: ON | state.backend.gemini.file.cache.download.type: LazyDownloadOnRestore |
| Managed memory (RSS behavior) | Requests 80% of the state managed memory from the OS immediately at job start; this amount is reflected in the Resident Set Size (RSS) metric from the moment the job starts | Requests memory from the OS only when it is actually used; RSS reflects actual consumption rather than reserved capacity |
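In configuration terms, lazy state loading is enabled with the version-specific key from the table above:
# VVR 6.x (Gemini V3)
state.backend.gemini.file.cache.lazy-restore: ON
# VVR 8.x (Gemini V4)
state.backend.gemini.file.cache.download.type: LazyDownloadOnRestore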
For more information about managed memory, see TaskManager memory.
Is it normal for a full checkpoint and an incremental checkpoint to be the same size?
Yes, in certain situations this is expected behavior. The first incremental checkpoint must contain the complete state at that point so that the job can recover from it — making it effectively a full checkpoint.
For example, suppose a job has processed no data before 18:29 (the checkpoint contains only source initialization state). At 18:29, one million entries are ingested and fully processed within one checkpointing interval, with no additional data arriving during that period. The resulting incremental checkpoint contains all the state generated by those one million entries, so its size equals a full checkpoint.
From the second checkpoint onward, if data input is stable and state changes are incremental, subsequent checkpoints should be significantly smaller. If they remain the same size as full checkpoints, verify that incremental snapshots are correctly configured and in effect.
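As a first check, confirm that incremental snapshots are enabled. In open-source Flink this is governed by the following key (GeminiStateBackend may expose its own switch; consult the engine documentation):
state.backend.incremental: true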
What should I do if checkpoints are slow for a Python job?
Python operators maintain an internal cache. During a checkpoint, all data in the cache must be processed before the snapshot can complete. If a Python user-defined function (UDF) is slow, the cache takes longer to drain, increasing checkpoint duration and affecting overall job performance.
To reduce checkpoint latency, lower the cache size by adding the following parameters in the Additional Configurations section of the job. The values below halve the defaults and are illustrative starting points, not recommendations:
# Maximum number of entries held in the cache (default: 100000)
python.fn-execution.bundle.size: 50000
# Maximum time entries are held in the cache, in milliseconds (default: 1000)
python.fn-execution.bundle.time: 500
For parameter details, see Flink Python configuration.
How do I troubleshoot abnormal checkpoints in a job?
Step 1: Identify the exception type
On the Monitoring and Alerts tab or in State Management, open the checkpoint history to determine whether the failure is a timeout, a write error, or another type.
Step 2: Act based on the exception type
- Frequent checkpoint timeouts: The most common cause is backpressure. Identify the slow operator and adjust its resources or configuration. See How do I troubleshoot job backpressure?
- Checkpoint write failures: Retrieve and analyze the TaskManager (TM) logs to determine the root cause.
  1. On the job logs page, go to the Checkpoints tab and click Checkpoints History.
  2. Click the (+) icon next to the failed checkpoint to expand operator-level checkpoint status.
  3. Click the (+) icon next to the failing operator, then click the subtask ID to open the corresponding TM.
Error: You are using the new V4 state engine to restore old state data from a checkpoint
This error occurs when upgrading from VVR 6.x to VVR 8.x. The V3 and V4 versions of GeminiStateBackend use incompatible checkpoint formats.
Resolve the issue using one of the following methods, listed from most to least recommended:
- Create a job snapshot in standard format and restart from that state. See Manually create a job snapshot and Start a job.
- Restart the job without state.
- (Not recommended) Continue using the V3 Gemini engine. Configure state.backend.gemini.engine.type: STREAMING and restart the job. See How do I configure runtime parameters for a job?
- (Not recommended) Continue using the VVR 6.x engine to start the job.
Error: java.lang.NegativeArraySizeException
This exception occurs when a job uses List State and the state data for a single key exceeds 2 GB.
Caused by: java.lang.NegativeArraySizeException
at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
Values appended under a single key in a List State are combined through a merge operation. In jobs with a window operator, these merged values accumulate continuously. When the accumulated size exceeds 2 GB, this exception is thrown — typically after an initial out-of-memory (OOM) error triggers a job recovery.
EmbeddedRocksDBStateBackend can exhibit similar behavior, resulting in an ArrayIndexOutOfBoundsException or a segmentation fault. See The EmbeddedRocksDBStateBackend.
To resolve the issue:
- If a window operator is accumulating oversized state, reduce the window size.
- If the job logic is causing excessive key merging, refactor the logic. For example, split the key to distribute state across multiple smaller entries, as in the sketch below.
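A minimal sketch of key splitting by salting, assuming a generic keyed job (the names and bucket count are hypothetical):
import zlib

NUM_BUCKETS = 16  # hypothetical fan-out; size it so each bucket stays well under 2 GB

def salted_key(key: str, element_id: str) -> str:
    # Deterministically route one logical key to NUM_BUCKETS physical keys so
    # the List State accumulated per physical key stays small. zlib.crc32 is
    # stable across processes, unlike Python's built-in hash().
    bucket = zlib.crc32(element_id.encode("utf-8")) % NUM_BUCKETS
    return f"{key}#{bucket}"
Partial results keyed by the salted key are then combined in a second aggregation keyed by the original key alone.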
Error: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints
This error occurs when multiple checkpoints fail consecutively while Apache Kafka is used as a sink.
Increase the checkpoint timeout using the execution.checkpointing.timeout parameter so that checkpoints have enough time to complete before being marked as failed. See How do I configure custom runtime parameters for a job?
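For example (the value is in milliseconds and illustrative; 900000 ms is 15 minutes):
execution.checkpointing.timeout: 900000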
Error: Exceeded checkpoint tolerable failure threshold
org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
By default, no checkpoint failures are tolerated (execution.checkpointing.tolerable-failed-checkpoints is not set, which is equivalent to 0). When the number of failed checkpoints exceeds the configured threshold, the job triggers a failover.
Adjust the tolerance by setting:
execution.checkpointing.tolerable-failed-checkpoints: <num>
<num> must be 0 or a positive integer. Setting it to 0 means no checkpoint failures are allowed; any failure immediately triggers a failover.
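For example, to tolerate up to five failed checkpoints before a failover is triggered:
execution.checkpointing.tolerable-failed-checkpoints: 5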