All Products
Search
Document Center

Realtime Compute for Apache Flink:FAQ about checkpoints or savepoints of a deployment

Last Updated:Mar 06, 2024

This topic provides answers to some frequently asked questions about checkpoints or savepoints of a fully managed Flink deployment.

How is the minimum time interval between two checkpoints calculated?

The minimum time interval between two checkpoints is calculated from the last successful checkpoint. If the minimum duration between checkpoint attempts is set to 3 and the minimum checkpoint interval is set to 5, the checkpoint interval is adjusted to 5.

In the following scenarios, the minimum duration between checkpoint attempts is 3 minutes, the timeout period is 10 minutes, and the minimum checkpoint interval is 5 minutes.

  • Scenario 1: The deployment runs as expected. Each checkpoint is successful.

    The first checkpoint is triggered at 12:00:00 and becomes successful at 12:00:02. The second checkpoint is triggered at 12:05:02.

  • Scenario 2: The deployment is abnormal. (A checkpoint times out or fails due to specific reasons. In this example, the checkpoint times out.)

    The first checkpoint is triggered at 12:00:00 and becomes successful at 12:00:02. The second checkpoint is triggered at 12:05:02 but fails at 12:15:02 because the checkpoint times out. The third checkpoint is triggered at 12:15:02.

For more information about the setting of the minimum checkpoint interval, see Tuning Checkpointing.

What are the differences between GeminiStateBackend in VVR 8.X and GeminiStateBackend in VVR 6.X?

By default, GeminiStateBackend V3 is used in Ververica Runtime (VVR) 6.X of Realtime Compute for Apache Flink, and GeminiStateBackend V4 is used in VVR 8.X of Realtime Compute for Apache Flink.

Item

Description

Basic capabilities

  • V3: GeminiStateBackend supports various features, such as key-value separation, storage-computing separation, standard or native deployment savepoints, and state lazy loading.

  • V4: The core architecture and features of GeminiStateBackend are upgraded based on stream processing characteristics. GeminiStateBackend V4 supports all features provided by GeminiStateBackend V3 and provides better state access performance and faster scaling.

State lazy loading configuration

  • V4: state.backend.gemini.file.cache.download.type: LazyDownloadOnRestore

  • V3: state.backend.gemini.file.cache.lazy-restore: ON

Managed memory

The two versions differ only in the resident set size (RSS) metric in terms of the managed memory.

  • V4: GeminiStateBackend applies for memory from the operating system and uses the RSS metric to collect memory usage only when GeminiStateBackend actually requires the memory.

  • V3: GeminiStateBackend directly applies for memory from the operating system and manages the memory in a deployment. The memory for which GeminiStateBackend applies is calculated by using the following formula: Managed memory of the state × 80%. The memory used by GeminiStateBackend is reflected by using the RSS metric when the deployment starts.

Note

For more information about managed memory, see Set up TaskManager Memory.

What do I do if the error message "org.apache.flink.util.SerializedThrowable" appears?

  • Error details

    image

  • Cause

    When you use GeminiStateBackend V3 to create savepoints, the NullPointerException (NPE) error may occur at an extremely low probability. In most cases, this error occurs because the internal memory structure reference is 0 but is not reclaimed in a timely manner.

  • Solution

    • In most cases, this error may be fixed after the system runs for a specific period of time or after a restart. This error only causes checkpoint failures but does not affect the correctness of data. You can increase the maximum number of restarts that are allowed when a checkpoint failure occurs.

    • Upgrade the VVR version to 8.0.1 or later. For more information, see Upgrade the engine version of deployments.

What do I do if the error message "You are using the new V4 state engine to restore old state data from a checkpoint" appears?

  • Problem description

    When I upgrade the VVR version from 6.X to 8.X, the error message "You are using the new V4 state engine to restore old state data from a checkpoint" appears.

  • Cause

    The checkpoints of GeminiStateBackend V3 and GeminiStateBackend V4 are incompatible.

  • Solution

    Use one of the following methods to resolve the issue:

    • Create a savepoint in the standard format for your deployment and start the deployment from the state of the savepoint. For more information, see Manually create a savepoint and Start a deployment.

    • Restart your deployment without states.

    • (Not recommended) Use GeminiStateBackend V3. In this case, you must configure state.backend.gemini.engine.type: STREAMING and restart your deployment. For more information about how to configure parameters, see How do I configure parameters for deployment running?

What do I do if the error message "No space left on device" appears?

  • Problem description

    When a deployment is running, an error message that is similar to the following information appears:

    java.io.IOException: No space left on device
      at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_102]
      at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_102]
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_102]
      at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_102]
      at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_102]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSOutputStream.close(AliyunOSSOutputStream.java:82) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?]
      at org.apache.flink.fs.osshadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?]
      at com.alibaba.flink.statebackend.FlinkDataOutputStreamWapper.close(FlinkDataOutputStreamWapper.java:31) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.close(GeminiFileOutputViewImpl.java:188) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.filecache.InfiniteFileCache.lambda$flushBatchPages$1(InfiniteFileCache.java:635) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.lambda$execute$1(GeminiEventExecutor.java:137) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.doEventInQueue(GeminiEventExecutor.java:86) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.run(GeminiEventExecutor.java:71) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT]
      at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
  • Cause

    The local disk space is insufficient. The maximum disk size for a single pod is 20 GB. If one of the following types of data is accumulated, the local disk space may become insufficient:

    • State data

    • Non-state data of compute nodes, such as logs

    • Old state data that is accumulated due to exceptions

  • Solution

    You can determine whether the state data is excessively large based on the state size in savepoints. If the error message appears because the state data is excessively large, you can use one of the following solutions to fix the issue based on the VVR version of the deployment:

    • Solutions for VVR 4.X and VVR 6.X

      Use one of the following solutions:

      • Enable the compute-storage separation feature. By default, this feature is enabled in VVR 4.0.12 and later.

        To enable this feature, you can configure the state.backend.gemini.file.cache.type and state.backend.gemini.file.cache.preserved-space parameters. For more information, see Parameters for compute-storage separation.

      • Increase the degree of parallelism.

        If the original degree of parallelism is 1, only one pod can run in a deployment, and the total disk space is 20 GB. If you increase the degree of parallelism to 4, four pods can run in a job, and the total disk space is 80 GB.

      • Remove expired state values from disks based on time-to-live (TTL).

        If you configure TTL and a state value expires based on TTL, the system automatically removes the expired state value. This way, disk space is released.

    • Solutions for VVR 3.X.X

      Use one of the following solutions:

      • Compress the state data.

        For VVR 3.0.X, configure the state.backend.gemini.page.flush.local.compression: Lz4 parameter to compress the state data in your local disks. This way, less space is occupied on your local disks. However, deployment performance deteriorates.

      • Enable the compute-storage separation feature.

        For VVR 3.0.3 or later, configure the state.backend.gemini.file.cache.type: LIMITED parameter. Local disks can store up to 18 GB of state data. If the data size exceeds 18 GB, the excess data is stored in a remote distributed file system (DFS). In this case, the system reads the excess data from the DFS the next time the system reads the data. Each local disk is used as an on-premises file cache.

What do I do if the error message "java.lang.IllegalArgumentException: Illegal Capacity: -1" appears?

  • Problem description

    If a deployment uses the map state to traverse data, the following exception may occur at a low probability when the deployment is running:

    java.lang.IllegalArgumentException: Illegal Capacity: -1
      at java.util.ArrayList.<init>(ArrayList.java:156)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl$3.<init>(PageStoreImpl.java:1113)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:1094)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKMapTable.internalEntries(BinaryKMapTable.java:83)
      at com.alibaba.gemini.engine.table.AbstractBinaryKMapTable.iterator(AbstractBinaryKMapTable.java:282)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.doIterator(AbstractGeminiKeyedMapStateImpl.java:496)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iteratorWithMetrics(AbstractGeminiKeyedMapStateImpl.java:501)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iterator(AbstractGeminiKeyedMapStateImpl.java:489)
      at com.alibaba.flink.statebackend.gemini.context.ContextMapState.entries(ContextMapState.java:97)
      at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:107)
      at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:102)
      at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
      at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.<init>(OuterJoinRecordStateViews.java:279)
      at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey.getRecordsAndNumOfAssociations(OuterJoinRecordStateViews.java:276)
      at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:229)
      at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:216)
      at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processRight(StreamingJoinOperator.java:134)
      at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator.processElement2(AbstractStreamingJoinOperator.java:136)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
      at java.lang.Thread.run(Thread.java:834)
  • Cause

    This is a known issue. This issue occurs only in VVR 4.0.10.

  • Solution

    Update the VVR version to 4.0.11 or later.

What do I do if the error message "java.lang.NegativeArraySizeException" appears?

  • Problem description

    If a deployment uses the list state, the following exception occurs when the deployment is running:

    Caused by: java.lang.NegativeArraySizeException
      at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
      at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
  • Cause

    The size of the state data of a single key in the list state exceeds 2 GB. A large amount of state data is generated in the following process:

    1. When a deployment runs as expected, the values that are appended to the values for a key in the list state are combined in the merge process. For example, the state data is continuously accumulated for a deployment that uses the list state of window operators.

    2. When the size of the state data accumulates to a specific threshold, an out-of-memory (OOM) error is triggered. After the deployment recovers from the failure, the merge process of the list state causes the size of the temporary byte array that is requested by the state backend to exceed the limit. As a result, this exception occurs.

    Note

    When you use RocksDBStateBackend, this issue may also occur and the error message "ArrayIndexOutOfBoundsException" or "Segmentation fault" appears. For more information, see The EmbeddedRocksDBStateBackend.

  • Solution

    • If window operators generate an excessively large amount of state data, we recommend that you reduce the window size.

    • If the deployment logic generates an excessively large amount of state data, we recommend that you modify the deployment logic. For example, you can split keys.

What do I do if the size of an incremental checkpoint is the same as the size of a full checkpoint?

If the size of an incremental checkpoint is the same as the size of a full checkpoint when you use fully managed Flink, perform the following operations to check whether an issue occurs:

  • Check whether the incremental savepoint is properly configured and takes effect.

  • Check whether the deployment runs in special scenarios. In some special scenarios, the size of an incremental checkpoint is expected to be the same as the size of the full checkpoint. Example:

    1. Before data is injected into a deployment at 18:29, the deployment does not process any data. In this case, the checkpoint contains only the initialized state data of the data source. The checkpoint is a full checkpoint because no state data exists.

    2. A total of 1,000,000 data records are injected into the deployment at 18:29. If the data records are completely processed within the current checkpoint interval and no other data is injected into the deployment during the interval, the first incremental checkpoint that is generated during the interval contains all state data of the 1,000,000 data records. The checkpoint interval is 3 minutes.

    In this case, the size of the incremental checkpoint is the same as the size of the full checkpoint. The first incremental checkpoint must contain the state of full data to ensure that the entire state can be restored from the incremental checkpoint. Therefore, the first incremental checkpoint is actually a full checkpoint.

    In most cases, if the data input is stable and no large-scale state changes occur, the sizes of the second and subsequent incremental checkpoints are different from the size of the full checkpoint. This indicates that the system creates savepoints only for the incremental data of the state data as expected. If the size of the second checkpoint or a subsequent checkpoint is the same as the size of the full checkpoint, check the status and behavior of the system to determine whether a system issue occurs.