
Implementation of stream computing

Last Updated: Dec 02, 2021

This topic describes how Spark Structured Streaming works in the micro-batch mode. This topic also describes how Tablestore interacts with Spark Structured Streaming.

Background information

Spark Data Source API V1 is used in the example to describe how Spark Structured Streaming works in the micro-batch mode.

  1. Call the GetOffset method to obtain the maximum offset (EndOffset) that can be read for the current batch.

  2. Call the GetBatch method to obtain the data between the start offset (the EndOffset value of the last batch) and the maximum offset of the current batch. Then, convert the data.

  3. Execute the custom computational logic of Spark.

For this to work, the streaming interface of the upstream database must provide a flexible and precise Seek feature, so that the start and end cursors of each partition can be obtained in real time to determine the offsets for each micro-batch.

If the upstream database is a distributed NoSQL database, it is difficult to connect it to the micro-batch mode of Spark Structured Streaming. This is because such a database typically provides a Change Data Capture (CDC)-based streaming interface, which runs in continuous streaming mode and does not provide a Seek feature. The desired EndOffset value can still be obtained by fetching data in advance during the GetOffset stage, but this requires additional Resilient Distributed Dataset (RDD)-based computation up front and persisting the obtained EndOffset value to a cache, which degrades the performance of the Source.
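The micro-batch flow described above can be sketched in simplified form. The following Python snippet is a toy simulation, not the actual connector code; `SimulatedSource`, `run_micro_batches`, and the integer offset representation are all illustrative assumptions:

```python
class SimulatedSource:
    """Stands in for a streaming source that supports Seek by offset."""

    def __init__(self, records):
        self.records = records  # the full upstream log, indexed by offset

    def get_offset(self):
        # Step 1: the maximum offset readable in the current batch (EndOffset).
        return len(self.records)

    def get_batch(self, start, end):
        # Step 2: the data between the start offset (last batch's EndOffset)
        # and the maximum offset of the current batch.
        return self.records[start:end]


def run_micro_batches(source, num_batches, process):
    start = 0
    for _ in range(num_batches):
        end = source.get_offset()             # step 1: GetOffset
        batch = source.get_batch(start, end)  # step 2: GetBatch
        process(batch)                        # step 3: custom computation
        start = end                           # EndOffset becomes the next start


results = []
src = SimulatedSource([1, 2, 3, 4, 5])
run_micro_batches(src, 1, lambda b: results.append(sum(b)))
```

Note how the EndOffset of one batch becomes the start offset of the next, which is exactly the hand-off that requires a Seek-capable source.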

Interaction between Tablestore and Spark Structured Streaming

The following Unified Modeling Language (UML) sequence diagram shows how Tablestore interacts with Spark Structured Streaming.

(Figure: UML sequence diagram of the interaction between Tablestore and Spark Structured Streaming)

In the figure, MicroBatchExecutor is the micro-batch framework of Spark, Source is the abstract source class of Spark Structured Streaming, SourceRDD is the RDD abstraction of Spark, and TablestoreClient is the Tablestore client. Solid lines indicate operations, and dashed lines indicate success responses.

The overall process runs the GetOffset, Commit, and GetBatch steps in a loop. Each iteration of the loop corresponds to one batch of Spark Streaming.

Detailed procedure:

  1. Call the GetOffset method to obtain the maximum offset (EndOffset) that can be read for the current batch.

    1. Call the GetOffset method of Source by using MicroBatchExecutor to obtain the maximum offset (EndOffset) that can be read for the current batch.

    2. Generate a random UUID string in Source. Each UUID corresponds to an offset.

    3. (Optional) Obtain the tunnel information from Tablestore.

      Note

      This step is required only when the GetOffset method of Source is called for the first time. The system skips this step in the subsequent operations.

      1. Obtain the consumption checkpoints of all current channels from the Tablestore Tunnel Service server.

      2. Persist the checkpoints to the Meta table and establish the mappings between the UUID and checkpoints.

    4. Encapsulate the UUID into an offset (you can convert between the offset and UUID). Then, the offset is used as the EndOffset value of the current batch and returned to MicroBatchExecutor.
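The UUID-to-offset encapsulation in this step can be sketched as follows. This is a hypothetical Python illustration; the real connector defines its own offset type, and the JSON representation used here is an assumption:

```python
import json
import uuid

def uuid_to_offset(batch_uuid):
    # Encapsulate the UUID into an offset payload (JSON, for illustration).
    # The offset is what gets returned to MicroBatchExecutor as EndOffset.
    return json.dumps({"uuid": batch_uuid})

def offset_to_uuid(offset):
    # The reverse conversion: recover the UUID from an offset, so the
    # checkpoints mapped to it in the Meta table can be looked up later.
    return json.loads(offset)["uuid"]

batch_uuid = str(uuid.uuid4())   # random UUID generated in Source
end_offset = uuid_to_offset(batch_uuid)
```

The key property is that the conversion is lossless in both directions, so GetBatch can later map the EndOffset back to the checkpoint row in the Meta table.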

  2. (Optional) Persist the checkpoints to the Tunnel Service server.

    Note

This step is performed only when the ID of the current batch is greater than 0. In other cases, the system skips this step.

    1. Call the Commit logic of Source by using MicroBatchExecutor.

    2. Persist the checkpoints that correspond to the EndOffset value of the last batch (start offset of the current batch) to the Tunnel Service server.

    Note

    In normal cases, the Commit logic is not required to process additional content. The checkpoints are persisted to the Tunnel Service server for the following purposes:

    • Display the consumption progress in real time.

    • Ensure that data is distributed in order based on the parent-child relationship between channels. A child channel can be loaded only after its parent channel has finished consuming its data and the client has returned the corresponding checkpoints to the server.
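The parent-child loading constraint can be illustrated with a small sketch. The function below is hypothetical; it only models the rule that a child channel becomes loadable after its parent's final checkpoint has been committed to the server:

```python
def loadable_channels(channels, parent_of, finished_committed):
    """Return the channels whose parent (if any) has committed a final checkpoint.

    parent_of maps a child channel to its parent; finished_committed is the
    set of channels whose finish checkpoints the client has returned to the
    server. Both structures are illustrative stand-ins.
    """
    ready = []
    for ch in channels:
        parent = parent_of.get(ch)
        if parent is None or parent in finished_committed:
            ready.append(ch)
    return ready

# Channel "c" is a child of "p" (e.g. created by a table-partition split):
parent_of = {"c": "p"}
before = loadable_channels(["p", "c"], parent_of, finished_committed=set())
after = loadable_channels(["c"], parent_of, finished_committed={"p"})
```

Until the parent's checkpoints are committed, only the parent is loadable; afterwards the child can be scheduled, which preserves the ordering of the data.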

  3. Call the GetBatch method to obtain the data between the start offset (the EndOffset value of the last batch) and the maximum offset of the current batch. Then, convert the data.

    1. Call the GetBatch method of Source by using MicroBatchExecutor, based on the start offset of the current batch and the EndOffset value returned by the GetOffset method. The GetBatch method obtains the data of the current batch, which is then fed to the computational logic.

    2. Obtain the real-time checkpoints of the channels in tunnels from the Meta table based on the UUID to which the start offset corresponds.

    3. Obtain the checkpoints of channels from the Tunnel Service server on a regular basis.

      New partitions such as subpartitions may be generated due to changes to partitions of a table. Therefore, the checkpoints of channels must be obtained regularly.

    4. Obtain the checkpoints of all channels by merging the real-time checkpoints obtained from the Meta table and the checkpoints that are regularly obtained from the Tunnel Service server.

    5. Create SourceRDD based on information such as the latest checkpoints and the UUID to which the EndOffset value corresponds.

      RDD is the fundamental data abstraction of Spark. An RDD is an immutable and partitionable collection of objects that can be used for parallel computing.

    6. Bind the channels of tunnels to RDD partitions in SourceRDD. This way, each channel converts and processes data in parallel on the Spark executor node.
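The checkpoint merging described in sub-step 4 can be sketched as follows. This is an illustrative Python sketch; the merge policy shown, where the fresher per-batch checkpoints from the Meta table override the periodically fetched server view, is an assumption:

```python
def merge_checkpoints(meta_checkpoints, server_checkpoints):
    # Start from the periodically fetched server view, which also contains
    # any channels newly created by partition changes (e.g. subpartitions),
    # then overlay the real-time per-batch checkpoints from the Meta table.
    merged = dict(server_checkpoints)
    merged.update(meta_checkpoints)
    return merged

# Real-time checkpoints persisted in the Meta table for this batch's UUID:
meta = {"channel-1": "cp-105"}
# Periodically fetched server checkpoints; channel-2 is a new subpartition:
server = {"channel-1": "cp-100", "channel-2": "cp-0"}
latest = merge_checkpoints(meta, server)
```

The merge gives every channel a checkpoint: existing channels keep their freshest position from the Meta table, while newly created channels fall back to the server-side value.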

  4. Execute the custom computational logic of Spark.

    1. Execute the computational logic on each RDD partition. Data is read from the channels and the checkpoints in the memory are updated for each channel.

    2. After each RDD partition completes its task for the current batch (for example, it has read the specified number of entries or detected that no new data is available), persist the latest checkpoints to the row in the Meta table whose UUID corresponds to the EndOffset value. After the batch is completed, the checkpoints that correspond to the start offset of the next batch can then be found in the Meta table.

    3. After the computational logic is executed for all RDD partitions, return the data within the offset range of the current batch to MicroBatchExecutor.

  5. Increase the batch ID after the computational logic is executed for all RDD partitions in the current batch. Then repeat the preceding steps, starting from Step 1, in a continuous loop.
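The per-batch cycle described above can be summarized with a small simulation. The `RecordingDriver` below is purely illustrative; it only records the order in which the framework would invoke the steps, including the rule that Commit is skipped for the first batch:

```python
class RecordingDriver:
    """Records the order of framework calls across batches."""

    def __init__(self):
        self.batch_id = 0
        self.calls = []

    def get_offset(self):
        self.calls.append("GetOffset")
        return "end-offset"          # stands in for the UUID-based offset

    def commit(self):
        if self.batch_id > 0:        # skipped when the batch ID is 0
            self.calls.append("Commit")

    def get_batch(self, end_offset):
        self.calls.append("GetBatch")
        return []

    def process(self, data):
        self.calls.append("Process")


def run_batch(driver):
    end_offset = driver.get_offset()     # step 1: obtain EndOffset
    driver.commit()                      # step 2: persist last batch's checkpoints
    data = driver.get_batch(end_offset)  # step 3: read this batch's data
    driver.process(data)                 # step 4: custom computation
    driver.batch_id += 1                 # step 5: advance the batch ID


d = RecordingDriver()
run_batch(d)   # first batch: no Commit
run_batch(d)   # second batch: Commit runs before GetBatch
```

The recorded call sequence shows the loop structure: the first batch skips Commit, and every later batch commits the previous batch's checkpoints before reading new data.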