This topic lists the major features and bug fixes in Blink 3.4.3.

Major features

GeminiStateBackend is a new-generation state backend built on GeminiDB, a storage engine developed by Alibaba Cloud. In tests performed on running jobs, GeminiStateBackend delivers 1.5 times the performance of NiagaraStateBackend. The major benefits of GeminiStateBackend are as follows:
  • Uses both LSM-based indexing and hash indexing. A log-structured merge-tree (LSM tree) improves write performance, and hash indexes stored in memory mitigate LSM read amplification. Specifically, GeminiDB divides each file into pages, and flushes and compresses data by page. The hash index maps a key to the page that stores its data, which reduces the number of I/O operations and improves read performance.
  • Optimizes the cache policy. GeminiDB caches important information in memory, such as newly inserted data and compressed data that includes hotspots. In traditional LSM-based storage, data is first flushed to disks and is cached only after at least one read I/O operation is performed on it, which lowers the cache hit ratio.
  • Optimizes the policy of flushing data to disks. GeminiDB flushes data to disks only after the cached data occupies all the memory space. Therefore, no data files are generated if the memory space is sufficient and data is compressed in a timely manner. Traditional LSM-based storage flushes data to disks for persistence. Blink does not require this step because its checkpointing mechanism ensures data consistency, and data is persisted when checkpoints are created.
  • Supports in-memory compaction. Data records that reside in memory are relocated in a timely manner to maximize the available space, which reduces write amplification and the number of read I/O operations.
  • Is implemented in Java, which eliminates the Java Native Interface (JNI) overhead of RocksDB and Niagara.
  • Supports incremental checkpointing.
  • Supports the local recovery feature, which enables quick recovery after a job fails.
  • Supports separation of computing from storage, which enables quick recovery after a job is restarted or rescaled. This feature is continuously optimized to improve user experience.
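The page-level hash index described in the first benefit can be pictured with a small model. The following Java sketch is illustrative only and is not GeminiDB code: the class PagedFileSketch, its methods, and the page size are hypothetical, and in-memory maps stand in for on-disk pages. It shows how an index from key to page number lets a lookup read at most one page, and no page at all for a key that does not exist.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: a file split into pages plus an in-memory hash index. */
final class PagedFileSketch {
    private final int pageSize;  // maximum number of entries per page (assumption)
    private final List<Map<String, String>> pages = new ArrayList<>();  // stands in for on-disk pages
    private final Map<String, Integer> keyToPage = new HashMap<>();     // in-memory hash index
    private int pageReads = 0;   // counts simulated read I/O operations

    PagedFileSketch(int pageSize) {
        this.pageSize = pageSize;
    }

    /** Appends to the newest page, as an LSM-style write would, and updates the index. */
    void put(String key, String value) {
        if (pages.isEmpty() || pages.get(pages.size() - 1).size() >= pageSize) {
            pages.add(new HashMap<>());
        }
        int pageNo = pages.size() - 1;
        pages.get(pageNo).put(key, value);
        keyToPage.put(key, pageNo);  // the index always points at the newest version
    }

    /** Reads exactly one page when the key is indexed, and none when it is not. */
    String get(String key) {
        Integer pageNo = keyToPage.get(key);
        if (pageNo == null) {
            return null;  // answered from memory, no page read
        }
        pageReads++;
        return pages.get(pageNo).get(key);
    }

    int pageReads() {
        return pageReads;
    }
}
```

Without the index, a lookup in this model would have to scan pages from newest to oldest until it found the key, which is the read amplification that the hash index is meant to reduce.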
You can configure GeminiStateBackend for jobs as follows:
  • Jobs created based on the DataStream API
    • API configuration
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      GeminiStateBackend stateBackend = new GeminiStateBackend(checkpointDir);
      // Configuration for gemini
      Configuration config = new Configuration();
      config.setString("state.backend.gemini.heap.size", "1024mb");
      // Set configuration to backend
      // Use gemini as state backend
      env.setStateBackend(stateBackend);
    • Parameters
      | Parameter | Data type | Unit | Default value | Description |
      |  | Long | Milliseconds | -1 (disabled by default) | Optional. The data retention period. |
      | state.backend.gemini.heap.size | String | Bytes, KB, MB, or GB (for example, 1024 bytes, 1024 KB, 1024 MB, or 1024 GB) | N/A | Optional. The memory size that can be used for a single GeminiDB database. |
      Note We recommend that you specify this parameter. If you do not specify the parameter, the backend calculates the default value based on the Java Virtual Machine (JVM) and TaskManager configurations.
  • Jobs created based on SQL code
    # Use GeminiStateBackend as the backend.
    # Set the time to live (TTL) of the state data.
    # Set the memory size that can be used for a single GeminiDB database. The unit is MB. Note that the memory resources of operators must include the memory size that can be used for a single GeminiDB database. The default memory size is 512 MB.
    # Set the JVM parameters. The recommended settings are described as follows:
    blink.job.option=-yD'-XX:NewRatio=3 -XX:SurvivorRatio=3 -XX:ParallelGCThreads=8 -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=4096 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -Djdk.nio.maxCachedBufferSize=10240'
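The heap size settings above combine a number with a unit, and the operator memory must cover the memory size of a single GeminiDB database. The following Java sketch shows one way to sanity-check such values before you submit a job. It is not the parser that Blink uses: the class HeapSizeSketch, both methods, the accepted unit spellings, and the use of 1024-based multiples are assumptions made for illustration.

```java
import java.util.Locale;

/** Hypothetical helper for checking heap size strings such as "1024mb". */
final class HeapSizeSketch {

    /** Converts a size string to bytes. Accepted units (assumed): b, bytes, kb, mb, gb. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        switch (unit) {
            case "b":
            case "bytes":
                return value;
            case "kb":
                return value * 1024L;
            case "mb":
                return value * 1024L * 1024L;
            case "gb":
                return value * 1024L * 1024L * 1024L;
            default:
                throw new IllegalArgumentException("Unknown unit in: " + text);
        }
    }

    /** Returns true if the GeminiDB heap size fits in the operator memory, given in MB. */
    static boolean fitsInOperatorMemory(String geminiHeapSize, long operatorMemoryMb) {
        return parseBytes(geminiHeapSize) <= operatorMemoryMb * 1024L * 1024L;
    }
}
```

For example, HeapSizeSketch.fitsInOperatorMemory("1024mb", 512) returns false, which indicates that the operator memory would need to be raised or the heap size lowered.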

Major bug fixes

  • Fixes a bug that may cause a null pointer exception (NPE) in the Calc operator during code generation.
  • Fixes a bug that requires complete rows to be used for state storage.
  • Fixes a bug in the code splitting component. When JavaCodeSplitter converts local variables to member fields, it does not process the local variables inside a for-each loop. This bug causes incorrect calculations when you use DISTINCT filtering.
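The conversion of local variables to member fields in the last fix can be illustrated with a hand-written example. The following Java sketch is not code generated by Blink and does not reproduce JavaCodeSplitter. The class SplitSketch and its methods are hypothetical, and the distinct count only stands in for DISTINCT filtering. It shows why the for-each loop variable must be converted together with the other local variables when a long method is split.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical before-and-after view of splitting one long method. */
final class SplitSketch {

    /** Before splitting: all state, including the loop variable, is local. */
    static int countDistinctUnsplit(List<String> rows) {
        Set<String> seen = new HashSet<>();
        int count = 0;
        for (String row : rows) {
            if (seen.add(row)) {
                count++;
            }
        }
        return count;
    }

    // After splitting: local variables become member fields so that the
    // split-out method can read and update them.
    private Set<String> seen;
    private int count;
    private String row;  // the for-each loop variable is converted as well

    int countDistinctSplit(List<String> rows) {
        seen = new HashSet<>();
        count = 0;
        for (String current : rows) {
            row = current;  // refresh the field on every iteration
            processRow();
        }
        return count;
    }

    /** The split-out loop body, which can see only member fields. */
    private void processRow() {
        if (seen.add(row)) {
            count++;
        }
    }
}
```

In this sketch, processRow() can observe the current row only through the row field. If the loop variable were left as a local variable, the split-out method would have no access to the current row, so both versions return the same result only when every local variable used by the loop body is converted.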