Tablestore: Batch computing

Last Updated: May 20, 2025

This topic describes how to perform batch computing on Tablestore data by using DataFrames when you access Tablestore from the Spark compute engine. It also explains how to run and debug the code in local and cluster environments.

Prerequisites

  • A data table is created in Tablestore and data is written to the data table. For more information, see Get started with the Wide Column model.

    Note

    For the schema and sample data of the search_view table, see Appendix: Sample data table.

  • An AccessKey pair is created for your Alibaba Cloud account or a Resource Access Management (RAM) user that has permissions to access Tablestore. For more information, see Create an AccessKey pair.

  • A Java development environment is deployed.

    In this topic, the Windows environment, JDK 1.8, IntelliJ IDEA 2024.1.2 (Community Edition), and Apache Maven are used as examples.

Procedure

Step 1: Download the project source code

Download the sample project by using Git.

git clone https://github.com/aliyun/tablestore-examples.git

If you cannot download the project because of network issues, you can directly download tablestore-examples-master.zip.

Step 2: Update Maven dependencies

  1. Go to the tablestore-spark-demo root directory.

    Note

    We recommend that you read the README.md document in the tablestore-spark-demo root directory to fully understand the project information.

  2. Run the following command to install emr-tablestore-2.2.0-SNAPSHOT.jar to the local Maven repository.

    mvn install:install-file -Dfile="libs/emr-tablestore-2.2.0-SNAPSHOT.jar" -DartifactId=emr-tablestore -DgroupId="com.aliyun.emr" -Dversion="2.2.0-SNAPSHOT" -Dpackaging=jar -DgeneratePom=true

Step 3: (Optional) Modify the sample code

Core code modification description

This section uses TableStoreBatchSample as an example to explain the core parts of the sample code. Each code block below is followed by a description.

val df = sparkSession.read
  .format("tablestore")
  .option("instance.name", instanceName)
  .option("table.name", tableName)
  .option("endpoint", endpoint)
  .option("access.key.id", accessKeyId)
  .option("access.key.secret", accessKeySecret)
  .option("split.size.mbs", 100)
  // .option("catalog", dataCatalog)
  // The latest version allows you to use the .schema() method to replace the catalog configurations.
  .schema("salt LONG, UserId STRING, OrderId STRING, price DOUBLE, timestamp LONG")
  .load()

Reads data from Tablestore by using Spark's DataFrameReader interface and loads the data as a DataFrame object.

  • format("tablestore") specifies that the Spark Tablestore connector is loaded by using the ServiceLoader mechanism. For the specific configuration, see the META-INF/services directory.

  • instanceName, tableName, endpoint, accessKeyId, and accessKeySecret specify the Tablestore instance name, the data table name, the instance endpoint, and the AccessKey ID and AccessKey secret of your Alibaba Cloud account or RAM user, respectively.

  • split.size.mbs specifies the size of each split in MB. Default value: 100. A smaller value generates more splits and therefore more Spark tasks.

  • .schema("salt LONG, UserId STRING, OrderId STRING, price DOUBLE, timestamp LONG") specifies the schema of the data table. The field names and their data types are specified in the schema. The sample data table has five fields: salt (Long type), UserId (String type), OrderId (String type), price (Double type), and timestamp (Long type).

    Note

    The latest version allows you to use the .schema() method to replace the catalog configurations. Select a version based on your business requirements.

val dataCatalog: String =
  s"""
     |{"columns": {
     |    "salt": {"type":"long"},
     |    "UserId": {"type":"string"},
     |    "OrderId": {"type":"string"},
     |    "price": {"type":"double"},
     |    "timestamp": {"type":"long"}
     | }
     |}""".stripMargin

Defines a JSON string dataCatalog to describe the schema information in Tablestore. The name and data type of each field are specified by using a key-value pair.
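
If you use the catalog configuration instead of .schema(), you can pass this JSON string through the catalog option, which corresponds to the commented-out .option("catalog", dataCatalog) line in the sample code. The following is a minimal sketch of that variant; it reuses the connection variables from the sample code and is provided for illustration only.

val dfWithCatalog = sparkSession.read
  .format("tablestore")
  .option("instance.name", instanceName)
  .option("table.name", tableName)
  .option("endpoint", endpoint)
  .option("access.key.id", accessKeyId)
  .option("access.key.secret", accessKeySecret)
  // Pass the schema as the JSON catalog defined above instead of calling .schema().
  .option("catalog", dataCatalog)
  .load()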

df.filter("salt = 1 AND UserId = 'user_A'").show(20, truncate = false)

Filters the DataFrame for rows that meet the condition salt = 1 AND UserId = 'user_A' and displays up to 20 matching rows without truncating column values.

df.createTempView("search_view")
val searchDF = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId = 'user_A'")
searchDF.show()
val searchDF2 = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId = 'user_A'" +
  " AND OrderId = '00002664-9d8b-441b-bad7-845202f3b142'")
searchDF2.show()
val searchDF3 = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId >= 'user_A' AND UserId < 'user_B'")
searchDF3.show()

Registers DataFrame as a temporary view search_view and executes multiple aggregate queries through Spark SQL to count records that meet different conditions.

  • SQL query 1: Counts the total number of records that meet the conditions salt = 1 AND UserId = 'user_A'.

  • SQL query 2: Counts the total number of records that meet the conditions salt = 1 AND UserId = 'user_A' AND OrderId = '00002664-9d8b-441b-bad7-845202f3b142'.

  • SQL query 3: Counts the total number of records that meet the conditions salt = 1 AND UserId >= 'user_A' AND UserId < 'user_B'.
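
As a complement to the Spark SQL queries, the same count can also be computed directly with the DataFrame API. The following is a minimal sketch of the DataFrame equivalent of SQL query 1; it is not part of the sample project.

// Count the records that meet salt = 1 AND UserId = 'user_A' by using the DataFrame API
// instead of a temporary view. This is equivalent to SQL query 1.
val countUserA = df.filter("salt = 1 AND UserId = 'user_A'").count()
println(s"count = $countUserA")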

Step 4: Run and debug the code

You can run and debug the code locally or in a Spark cluster. This section uses TableStoreBatchSample as an example to describe the debugging process.

Local development environment

This section uses the Windows operating system with IntelliJ IDEA as an example to describe how to debug the code.

  1. Install the Scala plugin.

    By default, IntelliJ IDEA does not support Scala. You need to manually install the Scala plugin.

  2. Install winutils.exe (winutils 3.3.6 is used in this topic).

    When you run Spark in a Windows environment, you also need to install winutils.exe to resolve Hadoop compatibility issues. You can download winutils.exe from the GitHub project homepage, and then point Spark to it as shown in the sketch below.
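
    The following Scala sketch shows one common way to point Spark to winutils.exe by setting the hadoop.home.dir system property before the SparkSession is created. The path C:\hadoop is an assumed example directory that contains bin\winutils.exe and is not part of the sample project; you can also set the HADOOP_HOME environment variable instead.

    // Point Hadoop to the directory that contains bin\winutils.exe.
    // C:\hadoop is only an example path; adjust it to your environment.
    System.setProperty("hadoop.home.dir", "C:\\hadoop")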

  3. Right-click the Scala program TableStoreBatchSample and select Modify Run Configuration to open the Edit Run Configuration dialog box.

    Note

    The actual operations slightly vary based on the operating systems and IntelliJ IDEA versions.

    1. In the Program arguments field, specify the instance name, data table name, AccessKey ID, AccessKey secret, and instance endpoint in sequence.

      myinstance search_view LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com
    2. Click Modify options, select Add dependencies with "provided" scope to classpath, and click OK.

  4. Run the Scala program.

    After you run the Scala program, the results are printed to the console.

    With DataFrame
    +----+------+------------------------------------+-----+-------------+
    |salt|UserId|OrderId                             |price|timestamp    |
    +----+------+------------------------------------+-----+-------------+
    |1   |user_A|00002664-9d8b-441b-bad7-845202f3b142|29.6 |1744773183629|
    |1   |user_A|9d8b7a6c-5e4f-4321-8765-0a9b8c7d6e5f|785.3|1744773190240|
    +----+------+------------------------------------+-----+-------------+
    
    With Spark SQL
    +--------+
    |count(1)|
    +--------+
    |       2|
    +--------+
    
    +--------+
    |count(1)|
    +--------+
    |       1|
    +--------+
    
    +--------+
    |count(1)|
    +--------+
    |       2|
    +--------+

Spark cluster environment

Important

Before you debug the code, make sure that a Spark cluster is deployed and that the Spark version in the cluster is consistent with the Spark version of the sample project. Otherwise, version incompatibility may cause runtime errors.

This section uses the spark-submit method as an example. The master in the sample code is set to local[*] by default. When you run the code on a Spark cluster, remove this setting and specify the master by using the --master parameter of spark-submit, as sketched below.
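
The following is a minimal sketch of how the master setting typically appears in a SparkSession builder; the exact builder code in TableStoreBatchSample may differ slightly.

import org.apache.spark.sql.SparkSession

// Local debugging: the master is set explicitly in the code.
val sparkSession = SparkSession.builder()
  .appName("TableStoreBatchSample")
  .master("local[*]") // Remove this line when you submit the job and pass --master to spark-submit instead.
  .getOrCreate()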

  1. Run the mvn -U clean package command to package the project. The path of the JAR package is target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar.

  2. Upload the JAR package to the Driver node of the Spark cluster and submit the task by using spark-submit.

    spark-submit --class com.aliyun.tablestore.spark.demo.batch.TableStoreBatchSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar myinstance search_view LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com 

Appendix: Sample data table

The following tables show the schema and sample data of the search_view table.

Sample table schema

| Field name | Type   | Description             |
|------------|--------|-------------------------|
| pk         | long   | The primary key column. |
| salt       | long   | The random salt value.  |
| UserId     | string | The user ID.            |
| OrderId    | string | The order ID.           |
| price      | double | The order amount.       |
| timestamp  | long   | The timestamp.          |

Sample data

| pk (Primary key column) | salt | UserId | OrderId                              | price | timestamp     |
|-------------------------|------|--------|--------------------------------------|-------|---------------|
| 1                       | 1    | user_A | 00002664-9d8b-441b-bad7-845202f3b142 | 29.6  | 1744773183629 |
| 2                       | 1    | user_A | 9d8b7a6c-5e4f-4321-8765-0a9b8c7d6e5f | 785.3 | 1744773190240 |
| 3                       | 2    | user_A | c3d4e5f6-7a8b-4901-8c9d-0a1b2c3d4e5f | 187   | 1744773195579 |
| 4                       | 3    | user_B | f1e2d3c4-b5a6-4789-90ab-123cdef45678 | 11.9  | 1744773203345 |
| 5                       | 4    | user_B | e2f3a4b5-c6d7-4890-9abc-def012345678 | 2547  | 1744773207789 |