TableStoreReader is a simple, high-performance data reading utility class that Tablestore provides on top of the Java SDK. It wraps the SDK's read interfaces for high-concurrency, high-throughput reading, and supports row-level callbacks and custom configuration. This topic describes how to use TableStoreReader to read data concurrently.
Prerequisites
An AccessKey pair is created for your Alibaba Cloud account or for a RAM user that has been granted permissions to access Tablestore.
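The complete sample in this topic reads the AccessKey pair from the TABLESTORE_ACCESS_KEY_ID and TABLESTORE_ACCESS_KEY_SECRET environment variables. The following is a minimal sketch of failing fast when either variable is missing. The EnvCredentials class and the requireEnv method are hypothetical helpers for illustration and are not part of the Tablestore SDK.

```java
import java.util.Map;

// Hypothetical helper (not part of the Tablestore SDK): looks up a required
// setting in an environment map and fails fast if it is absent or blank.
final class EnvCredentials {

    static String requireEnv(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        try {
            requireEnv(env, "TABLESTORE_ACCESS_KEY_ID");
            requireEnv(env, "TABLESTORE_ACCESS_KEY_SECRET");
            System.out.println("AccessKey pair found in the environment.");
        } catch (IllegalStateException e) {
            // Report the missing variable instead of creating a client with null credentials.
            System.out.println(e.getMessage());
        }
    }
}
```

Checking the variables before the client is created keeps a configuration mistake from surfacing later as a less obvious authentication error.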
Procedure
Step 1: Install Tablestore SDK for Java
If you use Maven to manage Java projects, you can add the following dependency to the pom.xml file:
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>
For more information, see Install Tablestore SDK for Java.
Step 2: Initialization
Before you initialize TableStoreReader, you must create a Tablestore client connection instance. You can customize the parameters and callback functions for TableStoreReader. The initialization sample code is shown below.
When you use multiple threads, we recommend that the threads share one TableStoreReader object.
public static TableStoreReader createReader() {
    // Initialize a Tablestore client.
    client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
    // TableStoreReader configuration
    TableStoreReaderConfig config = new TableStoreReaderConfig();
    // Thread pool
    executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
    // Callback function
    TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
        @Override
        public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
            succeedRows.incrementAndGet();
            System.out.println(rowReadResult.getRowResult());
        }
        @Override
        public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
            failedRows.incrementAndGet();
            System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
        }
    };
    return new DefaultTableStoreReader(client, config, executorService, callback);
}
If you do not need a callback function, you can set the callback function parameter to null in the initialization method.
return new DefaultTableStoreReader(client, config, executorService, null);
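The thread pool in the initialization code pairs a bounded LinkedBlockingQueue with ThreadPoolExecutor.CallerRunsPolicy. The following sketch uses only java.util.concurrent, with no Tablestore calls, and a deliberately tiny pool (one thread, queue capacity of one) to show what this policy does when the pool is saturated: the rejected task runs on the thread that submitted it. The CallerRunsDemo class and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustration only: shows CallerRunsPolicy behavior with a saturated pool.
final class CallerRunsDemo {

    // Returns the name of the thread that ran the task submitted while the
    // single worker was busy and the queue was full.
    static String threadOfOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until it is released.
            pool.execute(() -> {
                workerStarted.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(workerStarted);
            // Task 2 fills the single queue slot.
            pool.execute(() -> { });
            // Task 3 cannot be queued, so CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
            return overflowThread.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + threadOfOverflowTask());
    }
}
```

Because the submitting thread is busy running the rejected task, it cannot submit more work in the meantime, which acts as simple backpressure instead of dropping tasks or throwing an exception.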
Step 3: Query data
Before you use TableStoreReader to query data, add the primary key of each row that you want to read to the buffer.
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1"))
        .build();
tableStoreReader.addPrimaryKey("test_table", primaryKey);
If you need to retrieve the row data after the query, you can use the addPrimaryKeyWithFuture method.
Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);
You can also specify query parameters, such as the maximum number of versions, the data version range, and filters.
RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version");
// Specify the maximum number of versions to read.
rowQueryCriteria.setMaxVersions(1);
// Specify the data version range to read.
rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis()));
// Specify the attribute columns to return.
rowQueryCriteria.addColumnsToGet("col1");
// Specify filter conditions.
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1",
        SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1"));
rowQueryCriteria.setFilter(singleColumnValueFilter);
// Add query criteria.
tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
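The time range above is computed with the int expression 86400*1000, which is safe for one day but overflows an int for longer windows (for example, 30*86400*1000 exceeds the int range). The following sketch shows one way to compute the bounds with long arithmetic. The TimeRangeSketch class and the lastDays method are hypothetical helpers and are not part of the SDK. The two values it returns could be passed to the TimeRange constructor shown above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: computes millisecond bounds for a look-back window.
final class TimeRangeSketch {

    // Returns {startMillis, endMillis} covering the `days` days before nowMillis.
    static long[] lastDays(long nowMillis, int days) {
        // TimeUnit performs the conversion in long arithmetic, so it does not overflow an int.
        long spanMillis = TimeUnit.DAYS.toMillis(days);
        return new long[] { nowMillis - spanMillis, nowMillis };
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long[] oneDay = lastDays(now, 1);
        long[] thirtyDays = lastDays(now, 30);
        System.out.println("1-day window:  " + oneDay[0] + " .. " + oneDay[1]);
        System.out.println("30-day window: " + thirtyDays[0] + " .. " + thirtyDays[1]);
    }
}
```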
After the primary key information is added to the buffer, TableStoreReader automatically sends the data in the buffer to Tablestore for querying according to the specified interval (default interval: 10 seconds). You can also manually send the buffer data.
Synchronous transmission
tableStoreReader.flush();
Asynchronous transmission
tableStoreReader.send();
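A Future obtained from addPrimaryKeyWithFuture can be read like any java.util.concurrent.Future. The following is a minimal sketch of waiting with a timeout rather than blocking indefinitely, on the assumption that the result may not be available until the buffered keys have been sent. To stay self-contained, it uses a CompletableFuture<String> as a stand-in for Future<ReaderResult>. The FutureWaitSketch class and the getOrFallback method are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only: String stands in for the SDK's ReaderResult type.
final class FutureWaitSketch {

    // Waits up to timeoutMillis for the future and returns the fallback on timeout or interruption.
    static <T> T getOrFallback(Future<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe it.
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Read failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        Future<String> done = CompletableFuture.completedFuture("row1");
        Future<String> pending = new CompletableFuture<>();
        System.out.println(getOrFallback(done, 100, "timed out"));    // prints "row1"
        System.out.println(getOrFallback(pending, 100, "timed out")); // prints "timed out"
    }
}
```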
Step 4: Release resources
After the data query is complete and no other operations are needed, close the reader, the client, and the thread pool to release their resources. Make sure that doing so does not affect other parts of your business system.
tableStoreReader.close();
client.shutdown();
executorService.shutdown();
Complete sample code
The following sample code uses TableStoreReader to concurrently query 200 rows of data in the test_table table and prints the query results in the callback function.
public class TableStoreReaderExample {
    private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
    private static final String instanceName = "n01k********";
    private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static AsyncClientInterface client;
    private static ExecutorService executorService;
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Create TableStoreReader.
        TableStoreReader tableStoreReader = createReader();
        // Add primary keys for querying data.
        for (int i = 0; i < 200; i++) {
            PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
                    .build();
            tableStoreReader.addPrimaryKey("test_table", primaryKey);
        }
        // Send data in the buffer.
        tableStoreReader.flush();
        // Wait for the callback functions to complete.
        Thread.sleep(1000L);
        System.out.println("Succeed Rows Count: " + succeedRows.get());
        System.out.println("Failed Rows Count: " + failedRows.get());
        // Release resources.
        tableStoreReader.close();
        client.shutdown();
        executorService.shutdown();
    }
    public static TableStoreReader createReader() {
        // Initialize a Tablestore client.
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
        // TableStoreReader parameter configuration
        TableStoreReaderConfig config = new TableStoreReaderConfig();
        // Thread pool
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
        // Callback function
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }
            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };
        return new DefaultTableStoreReader(client, config, executorService, callback);
    }
}
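The sample above waits a fixed Thread.sleep(1000L) before it reads the counters. If callbacks might take longer than that, one alternative is to count down a CountDownLatch from each callback and wait on it with a timeout. The following sketch shows the pattern with a plain executor standing in for the reader, so it makes no Tablestore calls. The CallbackLatchSketch class and the runAndAwait method are hypothetical, and the sketch also waits for the pool to terminate after shutdown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustration only: a plain executor simulates the row-level callbacks.
final class CallbackLatchSketch {

    // Runs `rows` simulated callbacks and waits until all of them have reported.
    // Returns the number of callbacks observed, or -1 if the wait timed out or was interrupted.
    static long runAndAwait(int rows, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch pending = new CountDownLatch(rows);
        AtomicLong succeedRows = new AtomicLong();
        try {
            for (int i = 0; i < rows; i++) {
                // Stand-in for onCompleted: count the row, then signal the latch.
                pool.execute(() -> {
                    succeedRows.incrementAndGet();
                    pending.countDown();
                });
            }
            boolean finished = pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return finished ? succeedRows.get() : -1L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1L;
        } finally {
            // Stop accepting tasks, then give running tasks a bounded time to finish.
            pool.shutdown();
            try {
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Callbacks observed: " + runAndAwait(200, 5000));
    }
}
```

In a real reader callback, both onCompleted and onFailed would need to count down the latch; otherwise a single failed row would make the wait run until the timeout.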