This topic provides answers to some frequently asked questions about the state management of fully managed Flink.

What are the differences between GeminiStateBackend in VVR 3.X.X and GeminiStateBackend in VVR 4.X.X?

GeminiStateBackend in Ververica Runtime (VVR) 4.X.X delivers better performance than the version in VVR 3.X.X. The VVR 3.X.X version no longer receives optimizations or new features; only bug fixes are provided. We recommend that you update the VVR version to 4.X.X to use the optimizations and new features and obtain better performance.

What do I do if the error message "No space left on device" is returned?

  • Problem description
    When a job is running, an error message similar to the following example is returned:
    java.io.IOException: No space left on device
      at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_102]
      at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_102]
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_102]
      at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_102]
      at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_102]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSOutputStream.close(AliyunOSSOutputStream.java:82) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?]
      at org.apache.flink.fs.osshadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?]
      at com.alibaba.flink.statebackend.FlinkDataOutputStreamWapper.close(FlinkDataOutputStreamWapper.java:31) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.close(GeminiFileOutputViewImpl.java:188) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.filecache.InfiniteFileCache.lambda$flushBatchPages$1(InfiniteFileCache.java:635) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.lambda$execute$1(GeminiEventExecutor.java:137) ~[flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.doEventInQueue(GeminiEventExecutor.java:86) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at com.alibaba.gemini.engine.handler.GeminiEventExecutor.run(GeminiEventExecutor.java:71) [flink-statebackend-gemini-2.1.23-vvr-3.0-SNAPSHOT.jar:2.1.23-vvr-3.0-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12-vvr-3.0.4-SNAPSHOT.jar:1.12-vvr-3.0.4-SNAPSHOT]
      at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
  • Cause
    The local disk space is insufficient. The maximum disk size for a single pod is 20 GB. If one of the following types of data is accumulated, the local disk space may become insufficient:
    • State data
    • Non-state data of compute nodes, such as logs
    • Old state data caused by an exception
  • Solutions
    You can determine whether the state data is excessively large based on the state size in snapshots. If the amount of state data is small, submit a ticket to contact Alibaba Cloud technical support. If the error occurs because the state data is excessively large, use one of the following solutions based on the VVR version of the job:
    • Solutions for VVR 4.X.X
      Use one of the following solutions:
      • Enable the compute-storage separation feature.

        To enable this feature, configure the state.backend.gemini.file.cache.type and state.backend.gemini.file.cache.preserved-space parameters. For more information, see Parameters for compute-storage separation.
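        The following snippet shows how the two parameters might look in the job's additional configuration. The values are illustrative placeholders; choose them based on your disk size and workload, and see Parameters for compute-storage separation for the supported values.

        ```yaml
        # Store state beyond the local cache limit in a remote DFS instead of the local disk.
        state.backend.gemini.file.cache.type: LIMITED
        # Disk space reserved for non-state data such as logs (illustrative value).
        state.backend.gemini.file.cache.preserved-space: 5gb
        ```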

      • Increase the parallelism.

        If the parallelism is 1, the job runs in a single pod and the total disk space is 20 GB. If you increase the parallelism to 4, the job runs in four pods and the total disk space increases to 80 GB.
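        For example, you can raise the default parallelism in the job configuration. The snippet below assumes that each parallel instance is scheduled to its own pod:

        ```yaml
        # 4 pods x 20 GB of local disk per pod = 80 GB of total disk space
        parallelism.default: 4
        ```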

      • Remove expired state values from disks based on a time-to-live (TTL).

        If you configure a TTL, the system automatically removes state values that expire based on the TTL. This releases disk space.
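        For SQL deployments, a state TTL can be configured with the table.exec.state.ttl option, as sketched below. The one-hour value is only an example; DataStream jobs can achieve the same effect by enabling StateTtlConfig on individual state descriptors.

        ```yaml
        # Remove state values that have not been updated for one hour (example value).
        table.exec.state.ttl: 1 h
        ```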

    • Solutions for VVR 3.X.X
      Use one of the following solutions:
      • Compress state data.

        For VVR 3.0.X, set the state.backend.gemini.page.flush.local.compression: Lz4 parameter to compress state data on your local disks. This reduces the disk space that the state data occupies, but degrades job performance.

      • Enable the compute-storage separation feature.

        For VVR 3.0.3 or later, set the state.backend.gemini.file.cache.type: LIMITED parameter. Local disks can store up to 18 GB of state data. Data that exceeds this limit is stored in a remote distributed file system (DFS) and is read back from the DFS the next time it is accessed. In this mode, each local disk works as a local file cache.

What do I do if the error message "java.lang.IllegalArgumentException: Illegal Capacity: -1" is returned?

  • Problem description
    If a job uses the map state to traverse data, the following exception may occasionally occur when the job is running:
    java.lang.IllegalArgumentException: Illegal Capacity: -1
      at java.util.ArrayList.<init>(ArrayList.java:156)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl$3.<init>(PageStoreImpl.java:1113)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:1094)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.prefixIterator(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKMapTable.internalEntries(BinaryKMapTable.java:83)
      at com.alibaba.gemini.engine.table.AbstractBinaryKMapTable.iterator(AbstractBinaryKMapTable.java:282)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.doIterator(AbstractGeminiKeyedMapStateImpl.java:496)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iteratorWithMetrics(AbstractGeminiKeyedMapStateImpl.java:501)
      at com.alibaba.flink.statebackend.gemini.keyed.AbstractGeminiKeyedMapStateImpl.iterator(AbstractGeminiKeyedMapStateImpl.java:489)
      at com.alibaba.flink.statebackend.gemini.context.ContextMapState.entries(ContextMapState.java:97)
      at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:107)
      at org.apache.flink.runtime.state.ttl.TtlMapState.entries(TtlMapState.java:102)
      at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
      at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.<init>(OuterJoinRecordStateViews.java:279)
      at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey.getRecordsAndNumOfAssociations(OuterJoinRecordStateViews.java:276)
      at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:229)
      at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:216)
      at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processRight(StreamingJoinOperator.java:134)
      at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator.processElement2(AbstractStreamingJoinOperator.java:136)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
      at java.lang.Thread.run(Thread.java:834)
  • Cause

    This is a known issue that occurs only in VVR 4.0.10.

  • Solution

    Update the VVR version to 4.0.11 or later.

What do I do if the error message "java.lang.NegativeArraySizeException" is returned?

  • Problem description
    If a job uses the list state, the following exception may occur when the job is running:
    Caused by: java.lang.NegativeArraySizeException
      at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
      at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
  • Cause
    The size of the state data for a single key in the list state exceeds 2 GB. The state data grows to this size in the following process:
    1. While the job runs as expected, the values that are appended to a key in the list state are combined in the merge process. For example, state data continuously accumulates for a job that uses the list state of window operators.
    2. When the state data accumulates to a specific size, an out-of-memory (OOM) error is triggered. After the job recovers from the failure, the merge process of the list state requests a temporary byte array whose size exceeds the maximum that the state backend can allocate. As a result, this exception occurs.
    Note: When you use RocksDBStateBackend, this issue may also occur, and the error message "ArrayIndexOutOfBoundsException" or "Segmentation fault" appears. For more information, see The EmbeddedRocksDBStateBackend.
  • Solutions
    • If window operators generate a large amount of state data, we recommend that you reduce the window size.
    • If the job logic generates a large amount of state data, we recommend that you modify the job logic. For example, you can split keys.
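The key-splitting approach mentioned in the last solution can be sketched as follows. This is a minimal, framework-independent illustration; the salting factor and the key format are assumptions for the sketch, not part of any Flink API:

```python
import random

NUM_SPLITS = 4  # how many sub-keys a hot key is spread across (tune per job)

def split_key(key: str) -> str:
    """Append a random salt so one hot key becomes NUM_SPLITS smaller keys."""
    return f"{key}#{random.randrange(NUM_SPLITS)}"

def original_key(salted_key: str) -> str:
    """Recover the original key so partial results can be merged downstream."""
    return salted_key.rsplit("#", 1)[0]
```

With this pattern, a first aggregation runs on the salted keys and a second, much smaller aggregation merges the partial results per original key, so no single key accumulates all of the state.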