This topic describes the major features and bug fixes in Blink 3.4.3.

Major features

GeminiStateBackend is the new generation of a backend platform that uses GeminiDB. GeminiDB is a storage engine developed by Alibaba Cloud. The performance of GeminiStateBackend is 1.5 times that of NiagaraStateBackend. This performance rating is based on tests that are performed on running jobs. GeminiStateBackend has the following major benefits:
  • Uses LSM-based indexing and hash indexing. LSM refers to log-structured merge-tree. LSM is adopted to improve write performance, and hash indexes are stored in memory to optimize LSM read amplification. Specifically, GeminiDB divides each file into different pages, and flushes and compresses data by page. Hash indexes are used to quickly locate the page where the data is stored based on keys. This way, the number of I/O operations is reduced and read performance is improved.
  • Optimizes the cache policy. GeminiDB caches important information in memory, such as newly inserted data and compressed data that includes hotspots. For traditional LSM-based storage, the data is first flushed to disks. New data is cached after at least one read I/O operation is performed. This process reduces the cache hit ratio.
  • Optimizes the policy of flushing data to disks. GeminiDB flushes data to disks only after the cached data occupies all the memory space. Therefore, no data files are generated if the memory space is sufficient and data is compressed in a timely manner. For traditional LSM-based storage, data is flushed to disks for persistence. If Blink is used, this process is no longer required. Blink provides the checkpointing mechanism to ensure data consistency, and data can be persisted when checkpoints are created.
  • Supports in-memory compaction. The data records that reside in memory are relocated in a timely manner to maximize the available space. This allows you to optimize write amplification and reduce read I/O operations.
  • Eliminates the Java Native Interface (JNI) overhead of RocksDB or Niagara by using Java.
  • Supports incremental checkpointing.
  • Supports the local recovery feature, which enables quick recovery after a job fails.
  • Supports separation of computing from storage, which enables quick recovery after a job is restarted or rescaled. This feature is continuously optimized to improve user experience.
GeminiStateBackend requires the following configurations for DataStream and SQL jobs:
  • DataStream jobs
    • API configuration
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      GeminiStateBackend stateBackend = new GeminiStateBackend(checkpointDir);
      // Configuration for gemini
      Configuration config = new Configuration();
      config.setString("state.backend.gemini.heap.size", "1024mb");
      // set configuration to backend
      stateBackend.setConfiguration(conf);
      // use gemini as state backend
      env.setStateBackend(new GeminiStateBackend(checkpointDir));
    • Parameters
      Parameter Data type Unit Default value Description
      state.backend.gemini.ttl.ms LONG ms -1 (This value indicates that this feature is disabled by default.) Optional. The data retention period.
      state.backend.gemini.heap.size STRING The following units are supported:
      • 1024
      • 1024kb
      • 1024mb
      • 1024gb
      No default value Optional. The memory size that can be used for a single GeminiDB database.
      Note We recommend that you specify this parameter. If you do not specify the parameter, the backend calculates the default value based on the Java Virtual Machine (JVM) and TaskManager configurations.
  • SQL jobs
    # Use GeminiStateBackend as the backend.
    state.backend.type=gemini
    # Set the time to live (TTL) of the state data.
    state.backend.gemini.ttl.ms=129600000
    # Set the memory size that can be used for a single GeminiDB database. The unit is MB. Note that the memory resources of operators must include the memory size that can be used for a single GeminiDB database. The default memory size is 512 MB.
    state.backend.gemini.heap.size.mb=512
    # Configure the JVM parameters. Recommended configurations:
    blink.job.option=-yD env.java.opts.taskmanager='-XX:NewRatio=3 -XX:SurvivorRatio=3 -XX:ParallelGCThreads=8 -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=4096 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -Djdk.nio.maxCachedBufferSize=10240'

Major bug fixes

  • Fixes the bug that causes the Calc operator to encounter a null pointer exception (NPE) during code generation.
  • Fixes the bug that requires complete rows to be used for state storage.
  • Fixes a bug in the code splitting component. When JavaCodeSplitter converts local variables to member fields, JavaCodeSplitter does not process the local variables inside a "for each" control. This bug causes invalid calculations when you use DISTINCT filtering.