Tablestore: Parallel scan

Last Updated:Oct 30, 2023

If you do not need the query results to be returned in a specific order, you can use parallel scan to obtain them faster.
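
When splits are scanned concurrently, as in the multithreaded example in this topic, rows from different splits can arrive in any order. If you later need a specific order, you must sort the combined rows on the client, which costs extra memory and time. The following Java sketch illustrates this trade-off with the standard library only. The class `UnorderedMerge`, its helper `mergeAndSort`, and the `pk-NN` row keys are hypothetical sample names, not part of Tablestore.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

// Hypothetical illustration: merging rows from concurrently scanned splits.
class UnorderedMerge {
    // Collects the rows of all splits concurrently, then sorts them on the client.
    static List<String> mergeAndSort(List<List<String>> splits) {
        ConcurrentLinkedQueue<String> collected = new ConcurrentLinkedQueue<>();
        // Splits are consumed in parallel, so the arrival order in `collected` is not deterministic.
        IntStream.range(0, splits.size()).parallel()
                 .forEach(i -> collected.addAll(splits.get(i)));
        List<String> rows = new ArrayList<>(collected);
        Collections.sort(rows); // extra client-side cost, needed only if the order matters
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample data: three splits with a few row keys each.
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("pk-04", "pk-01"),
                Arrays.asList("pk-03"),
                Arrays.asList("pk-02", "pk-05"));
        System.out.println(mergeAndSort(splits)); // prints [pk-01, pk-02, pk-03, pk-04, pk-05]
    }
}
```

If your application can process rows in any order, you can skip the sort and stream the rows directly.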

Important

Only Tablestore SDK for Java V5.6.0 or later supports the parallel scan feature. Before you use this feature, make sure that your SDK version is V5.6.0 or later. For more information about SDK versions, see Version history of Tablestore SDK for Java.

Prerequisites

  • The OTSClient is initialized. For more information, see Initialization.
  • A data table is created. Data is written to the table.
  • A search index is created for the data table. For more information, see Create search indexes.

Parameters

  • tableName: The name of the data table.
  • indexName: The name of the search index.
  • scanQuery: The scan query. It contains the following parameters:
      • query: The query statement for the search index. Parallel scan supports the same query types as the Search operation, such as term query, fuzzy query, range query, geo query, and nested query.
      • limit: The maximum number of rows that each ParallelScan call can return.
      • maxParallel: The maximum number of parallel scan tasks per request. This value depends on the data volume: a larger volume of data allows more parallel scan tasks per request. You can call the ComputeSplits operation to obtain this value.
      • currentParallelId: The ID of the parallel scan task in the request. Valid values: [0, maxParallel).
      • token: The token that is used to paginate query results. Each ParallelScan response contains the token for the next page. You can use the token to retrieve the next page.
      • aliveTime: The validity period of the current parallel scan task, which is also the validity period of the token. Unit: seconds. Default value: 60. We recommend that you use the default value. If you do not send the next request within the validity period, you cannot continue to query data. The validity period is refreshed each time you send a request.
        Note: The server processes expired tasks asynchronously. A task does not expire within its validity period, but Tablestore does not guarantee that the task expires immediately after the validity period ends.
  • columnsToGet: The columns to return. Parallel scan can return only data that is stored in the search index. To return a field, you must set store to true for the field when you create the search index.
  • sessionId: The session ID of the parallel scan task. You can call the ComputeSplits operation to create a session and query the maximum number of parallel scan tasks that the parallel scan request supports.
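
To make the interaction between limit, token, currentParallelId, and maxParallel easier to picture, the following Java sketch simulates it with the standard library only. It is not Tablestore code: the `SimulatedIndex` class, its `scan` method, and the integer position used as a token are hypothetical, and the rule that row i belongs to split i % maxParallel is an assumption made for illustration (the real service decides how data is split). Each simulated call returns at most `limit` rows plus a next token, and a null token means that the split is exhausted, which mirrors a loop that runs while the next token is not null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of ParallelScan paging semantics. This is not the Tablestore SDK.
class SimulatedIndex {
    // One simulated ParallelScan response: a page of row IDs and the token for the next page.
    static final class Page {
        final List<Integer> rows;
        final Integer nextToken; // null means that the split has no more data

        Page(List<Integer> rows, Integer nextToken) {
            this.rows = rows;
            this.nextToken = nextToken;
        }
    }

    private final int totalRows;

    SimulatedIndex(int totalRows) {
        this.totalRows = totalRows;
    }

    // Returns at most `limit` rows of split `currentParallelId`, resuming at `token`
    // (a position within the split; null starts from the beginning).
    // Assumption for illustration only: row i belongs to split (i % maxParallel).
    Page scan(int currentParallelId, int maxParallel, Integer token, int limit) {
        if (currentParallelId < 0 || currentParallelId >= maxParallel) {
            throw new IllegalArgumentException("currentParallelId must be in [0, maxParallel)");
        }
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be positive");
        }
        int offset = (token == null) ? 0 : token;
        List<Integer> rows = new ArrayList<>();
        int position = 0; // position of rowId within this split
        for (int rowId = currentParallelId; rowId < totalRows; rowId += maxParallel, position++) {
            if (position < offset) {
                continue; // already returned by an earlier page
            }
            if (rows.size() == limit) {
                return new Page(rows, position); // page is full; resume from this position
            }
            rows.add(rowId);
        }
        return new Page(rows, null); // split exhausted
    }

    // Drains one split by following next tokens until the token is null.
    static List<Integer> drainSplit(SimulatedIndex index, int currentParallelId, int maxParallel, int limit) {
        List<Integer> result = new ArrayList<>();
        Integer token = null;
        do {
            Page page = index.scan(currentParallelId, maxParallel, token, limit);
            result.addAll(page.rows);
            token = page.nextToken;
        } while (token != null);
        return result;
    }

    public static void main(String[] args) {
        SimulatedIndex index = new SimulatedIndex(10);
        int maxParallel = 3;
        for (int id = 0; id < maxParallel; id++) {
            // limit = 2, so each split here is returned in two pages
            System.out.println("split " + id + ": " + drainSplit(index, id, maxParallel, 2));
        }
        // Prints:
        // split 0: [0, 3, 6, 9]
        // split 1: [1, 4, 7]
        // split 2: [2, 5, 8]
    }
}
```

Together, the splits [0, maxParallel) cover every row exactly once, which is why each thread in a multithreaded scan handles a different currentParallelId.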

Examples

The following examples show how to scan data by using a single thread or multiple threads:

  • Scan data by using a single thread

    Single-threaded parallel scan code is simpler than multithreaded code because you do not need to specify the currentParallelId and maxParallel parameters. A single-threaded ParallelScan request provides higher throughput than a Search request, but lower throughput than ParallelScan requests that run in multiple threads. For more information about how to scan data by using multiple threads, see the "Scan data by using multiple threads" section.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
    import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
    import com.alicloud.openservices.tablestore.model.search.query.Query;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    
    public class Test {
    
        public static List<Row> scanQuery(final SyncClient client) {
            String tableName = "<TableName>";
            String indexName = "<IndexName>";
            // Query the session ID and the maximum number of parallel scan tasks supported by the request. 
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            byte[] sessionId = computeSplitsResponse.getSessionId();
            int splitsSize = computeSplitsResponse.getSplitsSize();
            /*
             * Create a parallel scan request. 
             */
            ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
            parallelScanRequest.setTableName(tableName);
            parallelScanRequest.setIndexName(indexName);
            ScanQuery scanQuery = new ScanQuery();
            // This query determines the range of the data to scan. You can create a nested and complex query. 
            Query query = new MatchAllQuery();
            scanQuery.setQuery(query);
    
            // Specify the maximum number of rows that can be returned by each ParallelScan call. 
            scanQuery.setLimit(2000);
            parallelScanRequest.setScanQuery(scanQuery);
            ColumnsToGet columnsToGet = new ColumnsToGet();
            columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
            parallelScanRequest.setColumnsToGet(columnsToGet);
            parallelScanRequest.setSessionId(sessionId);
    
            /*
             * Use the builder to create a parallel scan request that is equivalent to the preceding request.
             */
            ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
                .tableName(tableName)
                .indexName(indexName)
                .scanQuery(ScanQuery.newBuilder()
                    .query(QueryBuilders.matchAll())
                    .limit(2000)
                    .build())
                .addColumnsToGet("col_1", "col_2")
                .sessionId(sessionId)
                .build();
            List<Row> result = new ArrayList<>();
    
            /*
             * Use the native API operation to scan data. 
             */
            {
                ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
                // Query the token of ScanQuery for the next request. 
                byte[] nextToken = parallelScanResponse.getNextToken();
                // Obtain the data. 
                List<Row> rows = parallelScanResponse.getRows();
                result.addAll(rows);
                while (nextToken != null) {
                    // Specify the token. 
                    parallelScanRequest.getScanQuery().setToken(nextToken);
                    // Continue to scan the data. 
                    parallelScanResponse = client.parallelScan(parallelScanRequest);
                    // Obtain the data. 
                    rows = parallelScanResponse.getRows();
                    result.addAll(rows);
                    nextToken = parallelScanResponse.getNextToken();
                }
            }
    
            /*
             * Recommended method.
             * Use an iterator to scan all matched data. This method is as fast as the preceding method but easier to use.
             * The methods in this function are alternatives. The list is cleared first so that rows collected by a previous method are not added twice.
             */
            {
                result.clear();
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    result.add(row);
                    // Obtain the values of specific columns.
                    String col_1 = row.getLatestColumn("col_1").getValue().asString();
                    long col_2 = row.getLatestColumn("col_2").getValue().asLong();
                }
            }

            /*
             * If the operation fails, retry it. You can skip this part if the caller of this function has its own retry mechanism or if you do not want to retry failed operations.
             * To ensure availability, we recommend that you start a new parallel scan task when an exception occurs.
             * The following exceptions may occur when you send a ParallelScan request:
             * 1. A session exception occurs on the server side. The error code is OTSSessionExpired.
             * 2. An exception such as a network exception occurs on the client side.
             */
            try {
                // Execute the processing logic.
                {
                    result.clear();
                    RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                    while (iterator.hasNext()) {
                        Row row = iterator.next();
                        // Process each row. If you have sufficient memory, you can add the rows to a list.
                        result.add(row);
                    }
                }
            } catch (Exception ex) {
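                // Retry the processing logic from the beginning. If the error code is OTSSessionExpired, call ComputeSplits again to obtain a new session ID before you retry.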
                {
                    result.clear();
                    RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                    while (iterator.hasNext()) {
                        Row row = iterator.next();
                        // Process rows of data. If you have sufficient memory resources, you can add the rows to a list. 
                        result.add(row);
                    }
                }
            }
            return result;
        }
    }
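
  • Scan data by using multiple threads

    Each thread scans one split, which is identified by a currentParallelId value in the range [0, maxParallel). A semaphore limits the number of threads that run at the same time to the number of CPU cores on the client.
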
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class Test {
    
        public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
            // Query the number of CPU cores on the client. 
            final int cpuProcessors = Runtime.getRuntime().availableProcessors();
            // Limit the number of concurrent threads on the client. We recommend that you set this value to the number of CPU cores on the client to avoid degrading query performance.
            final Semaphore semaphore = new Semaphore(cpuProcessors);
    
            // Query the session ID and the maximum number of parallel scan tasks supported by the request. 
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            final byte[] sessionId = computeSplitsResponse.getSessionId();
            final int maxParallel = computeSplitsResponse.getSplitsSize();
    
            // Create an AtomicLong object if you need to obtain the row count for your business. 
            AtomicLong rowCount = new AtomicLong(0);
            /*
             * This example defines a local class that extends Thread so that the multithreading logic stays inside one method.
             * You can also define the class as a separate top-level class.
             */
            final class ThreadForScanQuery extends Thread {
                private final int currentParallelId;
    
                private ThreadForScanQuery(int currentParallelId) {
                    this.currentParallelId = currentParallelId;
                    this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // Specify the thread name. 
                }
    
                @Override
                public void run() {
                    System.out.println("start thread:" + this.getName());
                    try {
                        // Execute the processing logic. 
                        {
                            ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                    .tableName(tableName)
                                    .indexName(indexName)
                                    .scanQuery(ScanQuery.newBuilder()
                                            .query(QueryBuilders.range("col_long").lessThan(100_000)) // Scan rows whose col_long value is less than 100,000.
                                            .limit(2000)
                                            .currentParallelId(currentParallelId)
                                            .maxParallel(maxParallel)
                                            .build())
                                    .addColumnsToGet("col_long", "col_keyword", "col_bool")  // Specify the fields to return from the search index. To return all fields from the search index, set returnAllColumnsFromIndex to true. 
                                    //.returnAllColumnsFromIndex(true)
                                    .sessionId(sessionId)
                                    .build();
                            // Use an iterator to obtain all the data. 
                            RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                            long count = 0;
                            while (ltr.hasNext()) {
                                Row row = ltr.next();
                                // Add custom processing logic here. This example only counts the rows.
                                count++;
                            }
                            rowCount.addAndGet(count);
                            System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
                        }
                    } catch (Exception ex) {
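                        // Log the exception so that a failed split is not silently skipped. You can also retry the processing logic here.
                        System.err.println("thread[" + this.getName() + "] failed: " + ex);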
                    } finally {
                        semaphore.release();
                    }
                }
            }
    
            // Create one thread for each split. Valid values of currentParallelId: [0, maxParallel).
            List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
            for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
                ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
                threadList.add(thread);
            }
    
            // Start the threads.
            for (ThreadForScanQuery thread : threadList) {
                // Acquire a permit before starting each thread. The semaphore limits the number of threads that run at the same time, which prevents a bottleneck on the client.
                semaphore.acquire();
                thread.start();
            }
    
            // The main thread is blocked until all threads are complete. 
            for (ThreadForScanQuery thread : threadList) {
                thread.join();
            }
            System.out.println("all thread finished! total rows:" + rowCount.get());
        }
    }
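
The multithreaded example starts one Thread per split and uses a Semaphore to cap how many run at once. A fixed-size thread pool from java.util.concurrent is a common alternative design: it bounds concurrency without manual acquire and release calls, and Future.get() surfaces task failures instead of swallowing them. The following sketch shows that pattern with the standard library only. `PooledParallelScan` and `scanSplit` are hypothetical names, and `scanSplit` returns a fake row count where a real program would build a ParallelScanRequest with currentParallelId and maxParallel and iterate over the rows returned by createParallelScanIterator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: a fixed thread pool in place of Thread + Semaphore.
class PooledParallelScan {
    // Hypothetical placeholder for scanning one split. A real implementation would build a
    // ParallelScanRequest for this currentParallelId and maxParallel and count the returned rows.
    // Here each split pretends to hold (currentParallelId + 1) rows.
    static long scanSplit(int currentParallelId, int maxParallel) {
        return currentParallelId + 1;
    }

    // Scans splits [0, maxParallel) with at most `threads` tasks running at once; returns the total.
    static long scanAll(int maxParallel, int threads) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int id = 0; id < maxParallel; id++) {
                final int currentParallelId = id;
                futures.add(pool.submit(() -> scanSplit(currentParallelId, maxParallel)));
            }
            long total = 0;
            for (Future<Long> future : futures) {
                // get() blocks until the task finishes and rethrows its failure as an
                // ExecutionException, so a failed split is not silently lost.
                total += future.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Size the pool to the number of CPU cores, as the Semaphore example recommends.
        int threads = Runtime.getRuntime().availableProcessors();
        System.out.println("total rows: " + scanAll(4, threads)); // prints "total rows: 10"
    }
}
```

With this design, the pool queues the splits that exceed the pool size, so no thread is created until a worker is free.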