Tablestore: Use TableStoreReader to concurrently read data

Last Updated: Jul 03, 2025

TableStoreReader is a utility class that Tablestore provides on top of the Java SDK for simple, high-performance data reading. It wraps the SDK's high-concurrency, high-throughput read interfaces so that you can read data concurrently, and it supports row-level callbacks and custom configuration. This topic describes how to use TableStoreReader to read data concurrently.

Prerequisites

An AccessKey pair is created for your Alibaba Cloud account or for a RAM user that has been granted permissions to access Tablestore.
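The complete sample code at the end of this topic reads the AccessKey pair from the TABLESTORE_ACCESS_KEY_ID and TABLESTORE_ACCESS_KEY_SECRET environment variables. The following is a minimal sketch of a fail-fast check for those variables, using only the Java standard library. The class name AccessKeyEnv and the method require are hypothetical helpers for illustration and are not part of the Tablestore SDK.

```java
import java.util.Map;

final class AccessKeyEnv {
    /** Returns the value of a required variable, or throws if it is missing or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        // System.getenv() returns an unmodifiable view of the process environment.
        Map<String, String> env = System.getenv();
        try {
            require(env, "TABLESTORE_ACCESS_KEY_ID");
            require(env, "TABLESTORE_ACCESS_KEY_SECRET");
            // Never print the secret itself; only report that it is present.
            System.out.println("AccessKey pair found in environment");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Checking the variables before creating the client turns a null credential into a clear error message at startup instead of a failure on the first request.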

Procedure

Step 1: Install Tablestore SDK for Java

If you use Maven to manage Java projects, you can add the following dependency to the pom.xml file:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>

For more information, see Install Tablestore SDK for Java.

Step 2: Initialization

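Before you initialize TableStoreReader, create a Tablestore client instance. You can customize the TableStoreReader parameters and the callback function. The following sample code shows the initialization. It assigns the fields client, executorService, succeedRows, and failedRows, which are declared in the complete sample code at the end of this topic.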

Note

When you use multiple threads, we recommend that the threads share one TableStoreReader object.

    public static TableStoreReader createReader() {
        // Initialize a Tablestore client.
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // TableStoreReader configuration
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // Thread pool
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // Callback function
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }
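The thread pool in the sample above is bounded (4 threads, a queue of 1024 tasks) and uses ThreadPoolExecutor.CallerRunsPolicy. The following sketch, which uses only the Java standard library and no Tablestore calls, illustrates what that policy does when the pool is saturated: the overflow task runs on the thread that submitted it. The pool is deliberately shrunk to 1 thread and 1 queue slot so that saturation is easy to reproduce; the class name CallerRunsDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {
    /**
     * Saturates a 1-thread pool with a 1-slot queue, then submits one more task.
     * Returns the name of the thread that ran the overflow task.
     */
    static String overflowTaskThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the worker until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            // Pool and queue are full, so CallerRunsPolicy runs this task on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("overflow ran on: " + overflowTaskThread());
    }
}
```

With this policy a full pool slows the submitter down instead of rejecting the task, which acts as simple backpressure. The trade-off is that slow work may then run on the submitting thread.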

TableStoreReaderConfig parameters

| Parameter | Type | Description |
| --- | --- | --- |
| checkTableMeta | boolean | Specifies whether to enable schema checking. The default value is true. When enabled, TableStoreReader performs the following checks before data is written to the buffer: (1) whether the data table exists; (2) whether the primary key schema of the data that you want to query is the same as the table's primary key. |
| bufferSize | int | The size of the buffer queue. The size must be a power of 2. The default value is 1024. |
| concurrency | int | The maximum number of concurrent requests when sending buffer data to Tablestore. The default value is 10. |
| maxBatchRowsCount | int | The maximum number of rows to read in a batch request. The default value is 100. The maximum value is 100. |
| defaultMaxVersions | int | The number of data versions to read. The default value is 1, which means only the latest version of data is read. |
| flushInterval | int | The time interval for automatically sending buffer data to Tablestore. The default value is 10000. The unit is milliseconds. |
| logInterval | int | The time interval for printing task status when sending buffer data to Tablestore. The default value is 10000. The unit is milliseconds. |
| bucketCount | int | The number of buckets. Each bucket is equivalent to a buffer. The default value is 4. |
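Two of the constraints above are easy to get wrong: bufferSize must be a power of 2, and maxBatchRowsCount cannot exceed 100. The following sketch shows how these values could be sanity-checked before they are passed to the config setters. It uses only the Java standard library; the class ReaderConfigChecks and its methods are hypothetical helpers, not SDK APIs. The batch estimate is only a lower bound, on the assumption that keys spread across several buckets can produce more, smaller requests.

```java
final class ReaderConfigChecks {
    /** True if n is a positive power of two (1, 2, 4, ..., 1024, ...). */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Smallest power of two that is greater than or equal to n, for 1 <= n <= 2^30. */
    static int nextPowerOfTwo(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n must be between 1 and 2^30: " + n);
        }
        return n == 1 ? 1 : Integer.highestOneBit(n - 1) << 1;
    }

    /** Lower bound on the number of batch requests needed to read rowCount rows. */
    static int minBatchRequests(int rowCount, int maxBatchRowsCount) {
        if (rowCount < 0 || maxBatchRowsCount < 1 || maxBatchRowsCount > 100) {
            throw new IllegalArgumentException("rowCount must be >= 0 and batch size must be 1..100");
        }
        // Ceiling division without risking int overflow.
        return rowCount / maxBatchRowsCount + (rowCount % maxBatchRowsCount == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(1024));         // prints true
        System.out.println(isPowerOfTwo(1000));         // prints false
        System.out.println(nextPowerOfTwo(1000));       // prints 1024
        System.out.println(minBatchRequests(200, 100)); // prints 2
    }
}
```

For example, a desired buffer of about 1000 entries would be rounded up to 1024 before calling setBufferSize.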

The following sample code shows how to configure the parameters.

// Specify whether to enable schema checking.
config.setCheckTableMeta(false);
// Specify the buffer queue size.
config.setBufferSize(1024);
// Specify the concurrency.
config.setConcurrency(10);
// Specify the maximum number of rows for batch requests.
config.setMaxBatchRowsCount(100);
// Specify the number of data versions to read.
config.setDefaultMaxVersions(1);
// Specify the flush interval.
config.setFlushInterval(10000);
// Specify the log interval.
config.setLogInterval(10000);
// Specify the number of buckets.
config.setBucketCount(4);
  • If you do not need a callback function, you can set the callback function parameter to null in the initialization method.

    return new DefaultTableStoreReader(client, config, executorService, null);

Step 3: Query data

  1. Before you use TableStoreReader to query data, add the primary key of each row that you want to read to the buffer.

    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1"))
            .build();
    tableStoreReader.addPrimaryKey("test_table", primaryKey);
    • If you need to retrieve the row data after the query, you can use the addPrimaryKeyWithFuture method.

      Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);
    • You can also specify query parameters, such as the maximum number of versions, the data version range, the columns to return, and filters.

      RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version");
      // Specify the maximum number of versions to read.
      rowQueryCriteria.setMaxVersions(1);
      // Specify the data version range to read.
      rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis()));
      // Specify the attribute columns to return.
      rowQueryCriteria.addColumnsToGet("col1");
      // Specify filter conditions.
      SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1"));
      rowQueryCriteria.setFilter(singleColumnValueFilter);
      // Add query criteria.
      tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
  2. After the primary keys are added to the buffer, TableStoreReader automatically sends the buffered data to Tablestore for querying at the configured flush interval (default: 10 seconds). You can also send the buffered data manually.

    • Synchronous transmission

      tableStoreReader.flush();
    • Asynchronous transmission

      tableStoreReader.send();
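Item 1 above obtains a Future<ReaderResult> from addPrimaryKeyWithFuture. Because a buffered key is queried only after the buffer is sent (item 2), waiting on that Future with a timeout is a reasonable default. The following is a minimal, SDK-independent sketch of such a bounded wait. FutureWait and await are hypothetical names, and the type parameter T stands in for ReaderResult.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureWait {
    /** Waits up to timeoutMillis for the result; returns empty on timeout, failure, or interrupt. */
    static <T> Optional<T> await(Future<T> future, long timeoutMillis) {
        try {
            return Optional.ofNullable(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // result not ready in time
        } catch (ExecutionException e) {
            return Optional.empty(); // the asynchronous operation failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Stand-ins for futures returned by a reader: one completed, one still pending.
        CompletableFuture<String> done = CompletableFuture.completedFuture("row1");
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(await(done, 100));    // prints Optional[row1]
        System.out.println(await(pending, 100)); // prints Optional.empty
    }
}
```

In real code you would likely log the timeout and the failure cause separately instead of collapsing both into an empty result.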

Step 4: Release resources

After the data query is complete and no further operations are needed, release the resources, provided that doing so does not affect your business system.

tableStoreReader.close();
client.shutdown();
executorService.shutdown();
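ExecutorService.shutdown() stops the pool from accepting new tasks but returns immediately, without waiting for tasks that are already queued. If callbacks may still be running on the pool, a bounded wait before exit can help. The following is a standard-library sketch of that pattern. GracefulShutdown and shutdownAndWait are hypothetical names, and the timeout is an arbitrary example value.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class GracefulShutdown {
    /** Returns true if all tasks finished within the timeout; otherwise forces shutdown. */
    static boolean shutdownAndWait(ExecutorService executor, long timeoutMillis) {
        executor.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            executor.shutdownNow(); // interrupt tasks that are still running
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(() -> System.out.println("callback work done"));
        System.out.println("terminated cleanly: " + shutdownAndWait(executor, 5000));
    }
}
```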

Complete sample code

The following sample code uses TableStoreReader to concurrently query 200 rows of data in the test_table table and prints the query results in the callback function.

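// Package names below assume Tablestore SDK for Java 5.x; verify them against your SDK version.
import com.alicloud.openservices.tablestore.AsyncClient;
import com.alicloud.openservices.tablestore.AsyncClientInterface;
import com.alicloud.openservices.tablestore.DefaultTableStoreReader;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreReader;
import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.reader.PrimaryKeyWithTable;
import com.alicloud.openservices.tablestore.reader.RowReadResult;
import com.alicloud.openservices.tablestore.reader.TableStoreReaderConfig;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TableStoreReaderExample {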
    private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
    private static final String instanceName = "n01k********";
    private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static AsyncClientInterface client;
    private static ExecutorService executorService;
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Create TableStoreReader.
        TableStoreReader tableStoreReader = createReader();

        // Add primary keys for querying data.
        for (int i = 0; i < 200; i++) {
            PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
                    .build();
            tableStoreReader.addPrimaryKey("test_table", primaryKey);
        }

        // Send data in the buffer.
        tableStoreReader.flush();

        // Wait for callback function to complete.
        Thread.sleep(1000L);

        System.out.println("Succeed Rows Count: " + succeedRows.get());
        System.out.println("Failed Rows Count: " + failedRows.get());

        // Release resources.
        tableStoreReader.close();
        client.shutdown();
        executorService.shutdown();
    }

    public static TableStoreReader createReader() {
        // Initialize a Tablestore client.
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // TableStoreReader parameter configuration
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // Thread pool
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // Callback function
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }
}
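The sample above waits a fixed second with Thread.sleep for the callbacks to finish. A fixed sleep can be too short on a slow network and wastefully long on a fast one. One alternative is a CountDownLatch that each callback counts down. The sketch below illustrates the pattern with simulated callbacks on a plain thread pool and makes no Tablestore calls. LatchWaitDemo and runAndWait are hypothetical names. In the real callback you would call countDown() in both onCompleted and onFailed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class LatchWaitDemo {
    /**
     * Simulates rowCount callbacks on a pool and waits until all have fired
     * or the timeout elapses. Returns the number of callbacks that completed.
     */
    static long runAndWait(int rowCount, long timeoutMillis) {
        AtomicLong succeedRows = new AtomicLong();
        CountDownLatch pending = new CountDownLatch(rowCount);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < rowCount; i++) {
                // Stand-in for a row callback: count the row, then release the latch.
                pool.execute(() -> {
                    succeedRows.incrementAndGet();
                    pending.countDown();
                });
            }
            // Returns as soon as every callback has fired, or after the timeout.
            pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return succeedRows.get();
    }

    public static void main(String[] args) {
        System.out.println("Succeed Rows Count: " + runAndWait(200, 5000));
    }
}
```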