Tablestore: Stream processing

Last Updated: May 20, 2025

This topic describes how to perform stream processing on Tablestore data by using the DataFrame API when you access Tablestore with the Spark compute engine. It also explains how to run and debug the code in both local and cluster environments.

Prerequisites

  • A data table is created in Tablestore, data is written to the data table, and a tunnel is created for the data table. For more information, see Get started with the Wide Column model and Get started with Tunnel Service.

    Note

    For the schema and sample data of the order_source_stream_view data table, see Appendix: Sample data table.

  • An AccessKey pair is created for your Alibaba Cloud account or a Resource Access Management (RAM) user that has permissions to access Tablestore. For more information, see Create an AccessKey pair.

  • A Java development environment is deployed.

    In this topic, the Windows environment, JDK 1.8, IntelliJ IDEA 2024.1.2 (Community Edition), and Apache Maven are used as examples.

Procedure

Step 1: Download the project source code

Download the sample project by using Git.

git clone https://github.com/aliyun/tablestore-examples.git

If you cannot download the project because of network issues, you can directly download tablestore-examples-master.zip.

Step 2: Update Maven dependencies

  1. Go to the tablestore-spark-demo root directory.

    Note

    We recommend that you read the README.md document in the tablestore-spark-demo root directory to fully understand the project information.

  2. Run the following command to install emr-tablestore-2.2.0-SNAPSHOT.jar to the local Maven repository.

    mvn install:install-file -Dfile="libs/emr-tablestore-2.2.0-SNAPSHOT.jar" -DartifactId=emr-tablestore -DgroupId="com.aliyun.emr" -Dversion="2.2.0-SNAPSHOT" -Dpackaging=jar -DgeneratePom=true

Step 3: (Optional) Modify the sample code

Core code modification description

This section uses StructuredTableStoreAggSQLSample as an example to explain the core parts of the sample code.

val ordersDF = sparkSession.readStream
  .format("tablestore")
  .option("instance.name", instanceName)
  .option("table.name", tableName)
  .option("tunnel.id", tunnelId)
  .option("endpoint", endpoint)
  .option("access.key.id", accessKeyId)
  .option("access.key.secret", accessKeySecret)
  .option("maxoffsetsperchannel", maxOffsetsPerChannel) // default 10000
  .option("catalog", dataCatalog)
  .load()
  .createTempView("order_source_stream_view")

Reads streaming data from Tablestore by using Spark Structured Streaming, loads the streaming data as a streaming DataFrame, and registers the streaming DataFrame as a temporary view order_source_stream_view.

  • format("tablestore") specifies that the Spark Tablestore connector is loaded through Java's ServiceLoader mechanism. For the specific configuration, see the META-INF/services directory.

  • instanceName, tableName, tunnelId, endpoint, accessKeyId, and accessKeySecret specify the Tablestore instance name, data table name, tunnel ID, instance endpoint, AccessKey ID of the Alibaba Cloud account or RAM user, and AccessKey secret of the Alibaba Cloud account or RAM user, respectively.

  • catalog is a JSON string used to describe the schema information in Tablestore.

  • maxoffsetsperchannel specifies the maximum amount of data read from each channel in each mini-batch. The default value is 10000.
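
The AccessKey pair does not have to be hard-coded or passed around as plain text. The following is a minimal sketch, not part of the sample project; the environment variable names are assumptions:

val accessKeyId: String = sys.env.getOrElse("TABLESTORE_ACCESS_KEY_ID",
  sys.error("TABLESTORE_ACCESS_KEY_ID is not set"))            // assumed environment variable name
val accessKeySecret: String = sys.env.getOrElse("TABLESTORE_ACCESS_KEY_SECRET",
  sys.error("TABLESTORE_ACCESS_KEY_SECRET is not set"))         // assumed environment variable name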

val dataCatalog: String =
  s"""
     |{"columns": {
     |    "UserId": {"type":"string"},
     |    "OrderId": {"type":"string"},
     |    "price": {"type":"double"},
     |    "timestamp": {"type":"long"}
     | }
     |}""".stripMargin

Defines a JSON string dataCatalog to describe the schema information in Tablestore. The name and data type of each field are specified by using a key-value pair.
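
To check how these catalog entries surface in Spark, you can print the schema of the registered view. This is an optional verification step, not part of the sample code:

// Optional check: each catalog column should appear in the view with the
// declared type (string, string, double, long).
sparkSession.table("order_source_stream_view").printSchema()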

val aggDF = sparkSession.sql(
  "SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, " +
    "CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view " +
    "GROUP BY window(to_timestamp(timestamp / 1000), '30 seconds')")
    
val query = aggDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", value = false)
  .option("checkpointLocation", checkpointLocation)
  .option("triggerInterval", 10000) // custom
  .start()

Aggregates and analyzes the streaming data over fixed time windows by using Spark SQL and prints the results to the console.

  • SQL query

    • Time window: Groups the data into 30-second windows based on the timestamp field.

    • Aggregate operation: Counts the total number of records in each window and calculates the sum of the values in the price field in each window.

  • Configure the output method for streaming query

    • outputMode("complete") specifies that the output mode is complete, which means that the complete aggregation result is displayed each time the query is triggered.

    • format("console") specifies that the results are written to the console (standard output).

    • option("truncate", value = false) specifies that field truncation is disabled to ensure that full field values are displayed.

    • option("checkpointLocation", checkpointLocation) specifies that the checkpoint directory is configured for fault tolerance and recovery.

    • option("triggerInterval", 10000) specifies that the trigger interval is set to 10 seconds, which means that the aggregation operation is performed every 10 seconds.

    • start() starts the streaming query. An equivalent DataFrame-API version of this query is sketched after this list.
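
For reference, the same query can be written with the DataFrame API and the standard Structured Streaming trigger instead of the custom triggerInterval option. This is a sketch only, not part of the sample project; it reuses the sparkSession and checkpointLocation values defined earlier:

import org.apache.spark.sql.functions.{col, count, sum, to_timestamp, window}
import org.apache.spark.sql.streaming.Trigger

// The same 30-second window aggregation as the SQL statement above, expressed
// with the DataFrame API against the registered temporary view.
val aggDF = sparkSession.table("order_source_stream_view")
  .groupBy(window(to_timestamp(col("timestamp") / 1000), "30 seconds"))
  .agg(count("*").as("count"), sum(col("price")).cast("double").as("totalPrice"))
  .select(
    col("window.start").cast("string").as("begin"),
    col("window.end").cast("string").as("end"),
    col("count"),
    col("totalPrice"))

// Trigger.ProcessingTime is the built-in way to run the query every 10 seconds.
val query = aggDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", value = false)
  .option("checkpointLocation", checkpointLocation)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()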

Step 4: Run and debug the code

You can run and debug the code locally or in a Spark cluster. This section uses StructuredTableStoreAggSQLSample as an example to describe the debugging process.

Local development environment

This section uses the Windows operating system with IntelliJ IDEA as an example to describe how to debug the code.

  1. Install the Scala plugin.

    By default, IntelliJ IDEA does not support Scala. You need to manually install the Scala plugin.

  2. Install winutils.exe (winutils 3.3.6 is used in this topic).

    When you run Spark in a Windows environment, you also need to install winutils.exe to resolve compatibility issues. You can download winutils.exe from the GitHub project homepage.
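
    In addition to placing winutils.exe in a bin subdirectory (for example, C:\hadoop\bin\winutils.exe), Hadoop must be told where that directory is. The following is a minimal sketch that assumes the C:\hadoop path; set the system property before the SparkSession is created, or set the HADOOP_HOME environment variable to the same path instead.

      // Assumed layout: C:\hadoop\bin\winutils.exe. Point Hadoop at the parent
      // directory so that local Spark runs on Windows can find winutils.exe.
      System.setProperty("hadoop.home.dir", "C:\\hadoop")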

  3. Right-click the Scala program StructuredTableStoreAggSQLSample and select Modify Run Configuration to open the Edit Run Configuration dialog box.

    Note

    The actual operations vary slightly based on the operating system and IntelliJ IDEA version.

    1. In the Program arguments field, specify the instance name, data table name, tunnel ID, AccessKey ID, AccessKey secret, instance endpoint, and MaxOffsetsPerChannel in sequence.

      myinstance order_source_stream_view 8f6a****-****-****-****-************ LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com 10000
    2. Click Modify options, select Add dependencies with "provided" scope to classpath, and click OK.

  4. Run the Scala program.

    After you run the Scala program, the results are printed to the console.

    +-------------------+-------------------+-----+-----------------+
    |begin              |end                |count|totalPrice       |
    +-------------------+-------------------+-----+-----------------+
    |2025-04-16 11:13:30|2025-04-16 11:14:00|1    |2547.0           |
    |2025-04-16 11:13:00|2025-04-16 11:13:30|3    |984.1999999999999|
    |2025-04-16 11:12:30|2025-04-16 11:13:00|1    |29.6             |
    +-------------------+-------------------+-----+-----------------+

Spark cluster environment

Important

Before you perform debugging, make sure that a Spark cluster is deployed and that its Spark version matches the Spark version of the sample project. Otherwise, version incompatibility might cause runtime errors.

This section uses the spark-submit method as an example. The master in the sample code is set to local[*] by default. When you run the code on a Spark cluster, you can remove this setting and use the --master parameter of spark-submit to specify the master instead.
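
The setting to remove is the master("local[*]") call on the SparkSession builder. The following is a minimal sketch of the relevant lines (paraphrased, not copied from the sample):

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .appName("StructuredTableStoreAggSQLSample")
  .master("local[*]") // keep for local debugging; remove it and pass --master to spark-submit on a cluster
  .getOrCreate()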

  1. Run the mvn -U clean package command to package the project. The path of the JAR package is target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar.

  2. Upload the JAR package to the Driver node of the Spark cluster and submit the task by using spark-submit.

    spark-submit --class com.aliyun.tablestore.spark.demo.streaming.StructuredTableStoreAggSQLSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar myinstance order_source_stream_view 8f6a****-****-****-****-************ LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com 10000

Appendix: Sample data table

The following tables show the schema and sample data of the order_source_stream_view table.

Sample table structure

Field name    Type      Description
pk            long      The primary key column.
UserId        string    The user ID.
OrderId       string    The order ID.
price         double    The order amount.
timestamp     long      The timestamp, in milliseconds.

Sample data

pk (primary key)    UserId    OrderId                                 price    timestamp
1                   user_A    00002664-9d8b-441b-bad7-845202f3b142    29.6     1744773175362
2                   user_A    9d8b7a6c-5e4f-4321-8765-0a9b8c7d6e5f    785.3    1744773190240
3                   user_A    c3d4e5f6-7a8b-4901-8c9d-0a1b2c3d4e5f    187      1744773195579
4                   user_B    f1e2d3c4-b5a6-4789-90ab-123cdef45678    11.9     1744773203345
5                   user_B    e2f3a4b5-c6d7-4890-9abc-def012345678    2547     1744773227789