
Realtime Compute for Apache Flink:FAQ about checkpoints or savepoints of a deployment

Last Updated:Mar 12, 2025

This topic provides answers to some frequently asked questions about checkpoints or savepoints of a Realtime Compute for Apache Flink deployment.

When miniBatch is enabled, no new data is updated after the TTL of state data expires. Why?

If miniBatch is enabled, data is calculated in batches and stored in the state. The data in the state is based on the previous full data calculation results. If the state is cleared because the time to live (TTL) of the state data expires, the accumulated previous calculation results are also cleared. As a result, the state data cannot be updated based on the data that is calculated in batches.

If miniBatch is disabled, the data of an expired key is cumulatively calculated again after the TTL of the state data expires. Therefore, the state data can always be updated. However, the increased update frequency can cause other issues, such as higher data processing latency.

Configure miniBatch and the TTL based on your business requirements.
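For reference, miniBatch and the state TTL are typically controlled by the following Flink table parameters. This is a hedged sketch: the keys are standard Flink options, but the values shown are illustrative and must be tuned for your workload.

```yaml
# Enable miniBatch processing
table.exec.mini-batch.enabled: true
# Maximum time that data can be buffered before a batch is emitted
table.exec.mini-batch.allow-latency: 2s
# Maximum number of records per batch
table.exec.mini-batch.size: 5000
# Time to live of state data; expired state is cleared
table.exec.state.ttl: 1h
```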

How do I calculate the start time of the next checkpoint?

Both the checkpoint interval and the minimum interval between checkpoints affect the start time of the next checkpoint. The next checkpoint is triggered at the earliest time point at which both of the following conditions are met:

  • The time between the start of the last checkpoint and the start of the next checkpoint is greater than or equal to the checkpoint interval.

  • The time between the end of the last checkpoint and the start of the next checkpoint is greater than or equal to the minimum interval.

In the following scenarios, the checkpoint interval is 3 minutes, the minimum interval between checkpoints is 3 minutes, and the timeout period is 10 minutes.

  • Scenario 1: The deployment runs as expected. Each checkpoint is successful.

    The first checkpoint is triggered at 12:00:00 and becomes successful at 12:00:02. The second checkpoint is triggered at 12:03:00.

  • Scenario 2: The deployment is abnormal. (A checkpoint times out or fails due to specific reasons. In this example, the checkpoint times out.)

    The first checkpoint is triggered at 12:00:00 and becomes successful at 12:00:02. The second checkpoint is triggered at 12:03:00 but fails at 12:13:00 because the checkpoint times out. The third checkpoint is triggered at 12:16:00.
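The two conditions can be sketched as taking the later of two candidate times. This is a simplified model that ignores concurrent checkpoints, and `next_checkpoint_start` is a hypothetical helper, not a Flink API:

```python
from datetime import datetime, timedelta

def next_checkpoint_start(last_start, last_end, interval, min_pause):
    """Earliest time at which both trigger conditions hold."""
    # Condition 1: `interval` has elapsed since the last checkpoint started.
    # Condition 2: `min_pause` has elapsed since the last checkpoint ended.
    return max(last_start + interval, last_end + min_pause)

interval = timedelta(minutes=3)   # checkpoint interval
min_pause = timedelta(minutes=3)  # minimum interval between checkpoints

# Scenario 2: the second checkpoint starts at 12:03:00 and times out at 12:13:00.
third = next_checkpoint_start(
    last_start=datetime(2025, 3, 12, 12, 3, 0),
    last_end=datetime(2025, 3, 12, 12, 13, 0),
    interval=interval,
    min_pause=min_pause,
)
print(third.time())  # 12:16:00
```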

For more information about the setting of the minimum checkpoint interval, see Tuning Checkpointing.

What are the differences between GeminiStateBackend in VVR 8.X and GeminiStateBackend in VVR 6.X?

By default, GeminiStateBackend V3 is used in Ververica Runtime (VVR) 6.X of Realtime Compute for Apache Flink, and GeminiStateBackend V4 is used in VVR 8.X of Realtime Compute for Apache Flink.

Basic capabilities

  • V3: GeminiStateBackend supports features such as key-value separation, storage-computing separation, savepoints in the standard or native format, and state lazy loading.

  • V4: The core architecture and features of GeminiStateBackend are upgraded based on stream processing characteristics. GeminiStateBackend V4 supports all features provided by GeminiStateBackend V3 and provides better state access performance and faster scaling.

State lazy loading configuration

  • V4: state.backend.gemini.file.cache.download.type: LazyDownloadOnRestore

  • V3: state.backend.gemini.file.cache.lazy-restore: ON

Managed memory

In terms of managed memory, the two versions differ only in how memory usage is reflected in the resident set size (RSS) metric.

  • V4: GeminiStateBackend requests memory from the operating system only when the memory is actually required. The memory usage is reflected in the RSS metric as the memory is used.

  • V3: GeminiStateBackend directly requests memory from the operating system upfront and manages the memory within the deployment. The amount of requested memory is calculated by using the following formula: Managed memory of the state × 80%. The requested memory is reflected in the RSS metric when the deployment starts.

Note

For more information about managed memory, see Set up TaskManager Memory.

What do I do if the size of an incremental checkpoint is the same as the size of a full checkpoint?

If the size of an incremental checkpoint is the same as the size of a full checkpoint when you use Realtime Compute for Apache Flink, perform the following operations to check whether an issue occurs:

  • Check whether incremental checkpointing is properly configured and takes effect.

  • Check whether the deployment runs in special scenarios. In some special scenarios, the size of an incremental checkpoint is expected to be the same as the size of the full checkpoint. Example:

    1. Before data is injected into the deployment at 18:29, the deployment does not process any data. In this case, the checkpoint contains only the initialized state of the data source. Because almost no state data exists, this checkpoint is in effect a full checkpoint.

    2. A total of 1,000,000 data records are injected into the deployment at 18:29. The checkpoint interval is 3 minutes. If the data records are completely processed within the current checkpoint interval and no other data is injected into the deployment during the interval, the first incremental checkpoint that is generated during the interval contains the state of all 1,000,000 data records.

    In this case, the size of the incremental checkpoint is the same as the size of the full checkpoint. The first incremental checkpoint must contain the full state so that the entire state can be restored from it. Therefore, the first incremental checkpoint is effectively a full checkpoint.

    In most cases, if the data input is stable and no large-scale state changes occur, the sizes of the second and subsequent incremental checkpoints differ from the size of the full checkpoint. This indicates that the system checkpoints only the incremental state data, as expected. If the size of the second or a subsequent checkpoint is still the same as the size of the full checkpoint, check the status and behavior of the system to determine whether a system issue occurs.
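Incremental checkpointing is typically enabled through the state backend configuration. A minimal sketch, using standard Flink options; confirm the keys and the supported state backends against your engine version:

```yaml
# Enable incremental checkpoints for the state backend
state.backend.incremental: true
# Checkpoint interval used in the example above
execution.checkpointing.interval: 3min
```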

What do I do if checkpoints for Python deployments are created at a low speed?

  • Cause

    Python operators contain a cache. When the system creates a checkpoint, the system must process all data in the cache. If the performance of Python user-defined functions (UDFs) is poor, the time that is required to create a checkpoint increases. This affects the execution of Python deployments.

  • Solution

    Reduce the amount of data that can be cached. You can add the following configurations to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see How do I configure custom parameters for deployment running?

    python.fn-execution.bundle.size: The maximum number of elements that a bundle can contain. Default value: 100000.
    python.fn-execution.bundle.time: The maximum duration for which elements in a bundle can be buffered before they are processed. Default value: 1000. Unit: milliseconds.

    For more information about the parameters, see Flink Python Configuration.
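For example, the following settings reduce the amount of buffered data. The values are illustrative: smaller bundles shorten checkpoint duration at the cost of some throughput.

```yaml
# Maximum number of elements per bundle (default: 100000)
python.fn-execution.bundle.size: 10000
# Maximum buffering time per bundle, in milliseconds (default: 1000)
python.fn-execution.bundle.time: 500
```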

What do I do if an error occurs during checkpointing of a deployment?

  1. Identify the error type

    You can view the historical information of checkpoints on the Alarm or State tab to identify the error type, such as checkpointing timeout or write failures.


  2. Troubleshoot errors of different types

    • Scenario 1: If checkpointing frequently times out, check whether the deployment has backpressure. If the deployment has backpressure, analyze the root causes of the backpressure, locate the slow operators, and adjust the related resources or configurations. For more information about how to troubleshoot backpressure issues, see How do I troubleshoot backpressure issues?

    • Scenario 2: If a write failure occurs during checkpointing, perform the following steps to locate the relevant TaskManager and troubleshoot the error based on its logs.

      1. On the Checkpoints subtab of the Logs tab of the deployment, click Checkpoints History.


      2. Click the plus (+) sign on the left of the abnormal checkpoint to view details about the operators related to the abnormal checkpoint.

      3. Click the plus (+) sign on the left of the abnormal operator, and click the value in the ID column of the abnormal subtask to go to the related TaskManager.


What do I do if the error message "You are using the new V4 state engine to restore old state data from a checkpoint" appears?

  • Error details

    When I upgrade the VVR version from 6.X to 8.X, the error message "You are using the new V4 state engine to restore old state data from a checkpoint" appears.

  • Cause

    The GeminiStateBackend version used by VVR 6.X differs from the GeminiStateBackend version used by VVR 8.X. Therefore, the checkpoints of the two versions are incompatible.

  • Solution

    Use one of the following methods to resolve the issue:

    • Create a savepoint in the standard format for your deployment and start the deployment from the state of the savepoint. For more information, see Manually create a savepoint and Start a deployment.

    • Restart your deployment without states.

    • (Not recommended) Use GeminiStateBackend V3. In this case, you must configure state.backend.gemini.engine.type: STREAMING and restart your deployment. For more information about how to configure parameters, see How do I configure parameters for deployment running?

    • (Not recommended) Use the VVR 6.X engine to start the deployment.

What do I do if the error message "java.lang.NegativeArraySizeException" appears?

  • Error details

    If a deployment uses the list state, the following exception occurs when the deployment is running:

    Caused by: java.lang.NegativeArraySizeException
      at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
      at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
  • Cause

    The size of the state data of a single key in the list state exceeds 2 GB. A large amount of state data is generated in the following process:

    1. When the deployment runs as expected, the values that are appended for a key in the list state are combined in the merge process. For example, state data continuously accumulates for a deployment that uses the list state of window operators.

    2. When the size of the state data reaches a specific threshold, an out-of-memory (OOM) error is triggered. After the deployment recovers from the failure, the merge process of the list state requests a temporary byte array whose size exceeds the limit of the state backend. As a result, this exception occurs.

    Note

    When you use RocksDBStateBackend, this issue may also occur and the error message "ArrayIndexOutOfBoundsException" or "Segmentation fault" appears. For more information, see The EmbeddedRocksDBStateBackend.

  • Solution

    • If window operators generate a large amount of state data, we recommend that you reduce the window size.

    • If the deployment logic generates an excessively large amount of state data, we recommend that you modify the deployment logic. For example, you can split keys.

What do I do if the error message "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots." appears?

  • Error details

    org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints
  • Cause

    This error is caused by multiple consecutive checkpoint failures if Kafka is used as the sink.

  • Solution

    Modify the timeout period of a checkpoint by setting the execution.checkpointing.timeout parameter to ensure that the checkpoint does not fail due to timeout. For more information about how to configure parameters, see How do I configure custom parameters for deployment running?
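A hedged configuration sketch. The 30-minute value is illustrative: set a timeout longer than the duration of your slowest expected checkpoint.

```yaml
# Increase the checkpoint timeout so slow checkpoints do not fail
execution.checkpointing.timeout: 30min
```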

What do I do if the error message "Exceeded checkpoint tolerable failure threshold" appears?

  • Error details

    org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold.
      at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
  • Cause

    By default, the number of checkpoint failures that can be tolerated is not specified, and no checkpoint failure is tolerated. As a result, the deployment triggers a failover as soon as the number of checkpoint failures exceeds the limit.

  • Solution

    Specify the execution.checkpointing.tolerable-failed-checkpoints: num parameter to adjust the number of checkpoint failures that can be tolerated for a deployment. This parameter must be set to 0 or a positive integer. If the parameter is set to 0, no checkpoint exceptions or failures can be tolerated. For more information about how to configure parameters, see How do I configure custom parameters for deployment running?
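A hedged configuration sketch. The value 5 is illustrative: choose a limit that matches how many consecutive checkpoint failures your deployment can tolerate before a failover.

```yaml
# Tolerate up to 5 consecutive checkpoint failures before a failover is triggered
execution.checkpointing.tolerable-failed-checkpoints: 5
```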