If you do not need query results in a specific order, you can use parallel scan to obtain query results quickly.

Background information

The search index feature allows you to call the Search API operation to query data, sort data in a specific order, and aggregate data.

In some cases, query speed matters more than the order of query results. Examples include connecting Tablestore to a cluster computing environment such as Spark or Presto, and querying a specified group of objects. To improve query speed, Tablestore provides the ParallelScan API operation for the search index feature.
Note Tablestore SDK for Java 5.6.0 or later supports the parallel scan feature.

Compared with the Search operation, the ParallelScan operation supports all query features but does not provide analytics capabilities such as sorting and aggregation. In exchange, query speed improves by more than five times. You can call the ParallelScan operation to export hundreds of millions of rows within a minute, and export capacity scales horizontally without an upper limit.

Each ParallelScan call can return more rows than each Search call. The Search operation returns up to 100 rows per call, whereas the ParallelScan operation returns up to 2,000 rows per call. Parallel scan also lets you use multiple threads to send requests in parallel within one session, which further accelerates data export.
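To make the difference in page size concrete, the following minimal Java sketch computes the minimum number of calls needed to read a given number of rows, assuming that every call returns a full page. It only does the arithmetic; the class CallCount and its method are illustrative and are not part of the Tablestore SDK.

```java
// Illustrative helper, not part of the Tablestore SDK.
class CallCount {

    // Maximum number of rows per call for each operation, as stated above.
    static final int SEARCH_MAX_ROWS_PER_CALL = 100;
    static final int PARALLEL_SCAN_MAX_ROWS_PER_CALL = 2000;

    // Minimum number of calls needed to read totalRows rows when each call returns at most rowsPerCall rows.
    static long callsNeeded(long totalRows, int rowsPerCall) {
        if (totalRows < 0 || rowsPerCall <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and rowsPerCall must be > 0");
        }
        return (totalRows + rowsPerCall - 1) / rowsPerCall; // Ceiling division.
    }

    public static void main(String[] args) {
        long rows = 1_000_000L;
        System.out.println("Search calls: " + callsNeeded(rows, SEARCH_MAX_ROWS_PER_CALL));
        System.out.println("ParallelScan calls: " + callsNeeded(rows, PARALLEL_SCAN_MAX_ROWS_PER_CALL));
    }
}
```

For 1,000,000 rows, this prints 10000 Search calls and 500 ParallelScan calls. With multiple parallel scan tasks, those ParallelScan calls are further divided among the threads.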

Scenarios

  • If you want to sort or aggregate query results, or the query request is sent from an end user, use the Search operation.
  • If you do not need to sort query results and want to quickly return all matched results, or the data is pulled by a computing environment such as Spark or Presto, use the ParallelScan operation.

Features

The differences between the ParallelScan operation and the Search operation lie in the following aspects:

  • Stable results

    Parallel scan tasks are stateful. In a session, the result set is determined by the state of the data when the first request is sent. Data that is inserted or modified after the first request does not affect the result set.

  • Sessions
    Parallel scan operations use sessions. A session ID identifies the result set of the scanned data. To obtain and use a session ID, perform the following steps:
    1. Call the ComputeSplits operation to query the maximum number of parallel scan tasks and obtain the current session ID.
    2. Send multiple parallel scan requests to read data. In each request, specify the current session ID and a parallel scan task ID.

    If you cannot obtain a session ID, you can send a ParallelScan request without one. In this case, however, there is a very small chance that the result set contains duplicate data.

    If a network exception, a thread exception, a dynamic schema modification, or an index switchover occurs during a parallel scan, the scan stops and Tablestore returns the OTSSessionExpired error code. In this case, you must start a new parallel scan to scan the data again.

  • Maximum number of parallel scan tasks in a single request

    The maximum number of parallel scan tasks for a single ParallelScan request is the value returned by the ComputeSplits request. A larger volume of data results in more parallel scan tasks per session.

    A single request is defined by one query statement. For example, if you use the Search operation to query rows in which the value of city is Hangzhou, all matching rows are returned in one result. If you use the ParallelScan operation with two parallel scan tasks in the session, each task returns half of the matching rows, and the complete result set is the union of the two partial result sets.

  • Maximum number of rows that can be returned by each ParallelScan call

    The default value and the maximum value of limit are both 2000. Specifying a value greater than 2000 does not further improve performance.

  • Performance

    A ParallelScan request that uses a single parallel scan task is five times faster than a Search request. The query speed of parallel scan increases with the number of parallel scan tasks in a session. For example, with eight parallel scan tasks in a session, the query speed can be four times that of a single task.

  • Cost

    ParallelScan requests consume fewer resources and are offered at a lower price. To export large amounts of data, we recommend that you use the ParallelScan operation.

  • Columns to return

    ParallelScan can return only columns in the search index. You can set the ReturnType parameter to RETURN_ALL_INDEX or RETURN_SPECIFIED, but not to RETURN_ALL.

    ParallelScan can return the values of ARRAY and GEOPOINT columns, but the returned values are normalized and may differ from the values that were written to the data table. For example, if you write [1,2, 3, 4] to an ARRAY column, ParallelScan returns [1,2,3,4]. If you write 10,50 to a GEOPOINT column, ParallelScan returns 10.0,50.0.

  • Limits

    The maximum number of parallel scan tasks is 10. This limit can be adjusted based on your business requirements. Parallel requests that have the same session ID and the same ScanQuery value count as one task. A parallel scan task starts when you send the first ParallelScan request and ends when all data is scanned or the token expires.
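The following Java sketch models several of the behaviors above entirely in memory: a result set that is fixed when the session starts, one share of the data per parallel scan task, and token-based paging with limit. It is a hypothetical simulation, not the Tablestore SDK. The class ParallelScanModel, the rule that assigns row i to task i % maxParallel, the use of an integer offset as the token, and the capping of limit at 2000 are all assumptions for illustration; the real split boundaries and token format are determined by the server.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a parallel scan session; not the Tablestore API.
class ParallelScanModel {

    static final int MAX_LIMIT = 2000;

    // One page of results plus the token for the next page (null when the task has no more data).
    static final class Page {
        final List<Integer> rows;
        final Integer nextToken;

        Page(List<Integer> rows, Integer nextToken) {
            this.rows = rows;
            this.nextToken = nextToken;
        }
    }

    private final List<Integer> snapshot; // Copied when the session starts, so later writes are not visible.
    private final int maxParallel;

    ParallelScanModel(List<Integer> data, int maxParallel) {
        this.snapshot = new ArrayList<>(data);
        this.maxParallel = maxParallel;
    }

    // Returns one page for task currentParallelId. A null token means the first page.
    Page scan(int currentParallelId, int limit, Integer token) {
        if (currentParallelId < 0 || currentParallelId >= maxParallel) {
            throw new IllegalArgumentException("currentParallelId must be in [0, maxParallel)");
        }
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be > 0");
        }
        int effectiveLimit = Math.min(limit, MAX_LIMIT); // Assumption: values above 2000 behave like 2000.
        // Assumed split rule for illustration only: row i belongs to task i % maxParallel.
        List<Integer> share = new ArrayList<>();
        for (int i = currentParallelId; i < snapshot.size(); i += maxParallel) {
            share.add(snapshot.get(i));
        }
        int from = token == null ? 0 : token;
        if (from < 0 || from > share.size()) {
            throw new IllegalArgumentException("invalid token");
        }
        int to = Math.min(from + effectiveLimit, share.size());
        Integer next = to < share.size() ? to : null;
        return new Page(new ArrayList<>(share.subList(from, to)), next);
    }

    // Drains every task page by page, as each thread would, and merges the partial result sets.
    List<Integer> scanAll(int limit) {
        List<Integer> all = new ArrayList<>();
        for (int id = 0; id < maxParallel; id++) {
            Integer token = null;
            do {
                Page page = scan(id, limit, token);
                all.addAll(page.rows);
                token = page.nextToken;
            } while (token != null);
        }
        return all;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            data.add(i);
        }
        ParallelScanModel model = new ParallelScanModel(data, 2);
        System.out.println("rows scanned: " + model.scanAll(2000).size());
    }
}
```

Because the model copies the data when the session is created, rows written afterward do not appear in the result, which mirrors the stable-results behavior; and because each row belongs to exactly one task, merging the pages of all tasks yields every matching row once.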

API operations

You can call the following API operations to use the parallel scan feature:
  • ComputeSplits: You can call this operation to create a session and query the maximum number of parallel scan tasks for a single ParallelScan request.
  • ParallelScan: You can call this operation to export data.

Use Tablestore SDKs

You can use Tablestore SDKs to scan data in parallel. The following parameter descriptions and examples use Tablestore SDK for Java.

Parameters

  • tableName: The name of the data table.
  • indexName: The name of the search index.
  • scanQuery: The scan query configuration, which contains the following parameters:
      • query: The query statement for the search index. Term query, fuzzy query, range query, geo query, and nested query are supported, similar to the Search operation.
      • limit: The maximum number of rows that each ParallelScan call can return.
      • maxParallel: The maximum number of parallel scan tasks per request. This value depends on the data volume: a larger volume of data results in more parallel scan tasks per request. You can call the ComputeSplits operation to query this value.
      • currentParallelId: The ID of the parallel scan task in the request. Valid values: [0, maxParallel).
      • token: The token that is used to paginate query results. Each ParallelScan response contains the token for the next page, which you can use to retrieve that page.
      • aliveTime: The validity period of the current parallel scan task, which is also the validity period of the token. Unit: seconds. Default value: 60. We recommend that you use the default value. If you do not send the next request within the validity period, you cannot query more data. The validity period of the token is refreshed each time you send a request.
        Note The server cleans up expired tasks asynchronously. A task does not expire within its validity period, but Tablestore does not guarantee that the task expires as soon as the validity period ends.
  • columnsToGet: The columns to return. Parallel scan reads data only from search indexes. To use parallel scan on a search index, you must set store to true when you create the search index.
  • sessionId: The session ID of the parallel scan task. You can call the ComputeSplits operation to create a session and query the maximum number of parallel scan tasks that the parallel scan request supports.
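The aliveTime rules above (a 60-second default, and a validity period that each request refreshes) can be tracked on the client. The following Java sketch is hypothetical: AliveTimeTracker is not part of the Tablestore SDK, and it only estimates the window in which the server guarantees that the task is alive. Because the server expires tasks asynchronously, the sketch conservatively treats the task as expired as soon as that window ends, even though the server may keep the task a little longer.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical client-side tracker for the aliveTime window of a parallel scan token; not part of the Tablestore SDK.
class AliveTimeTracker {

    static final Duration DEFAULT_ALIVE_TIME = Duration.ofSeconds(60); // Documented default for aliveTime.

    private final Duration aliveTime;
    private Instant lastRequest;

    AliveTimeTracker(Duration aliveTime, Instant firstRequest) {
        this.aliveTime = aliveTime;
        this.lastRequest = firstRequest;
    }

    // Call after each ParallelScan request: every request refreshes the validity period of the token.
    void onRequest(Instant now) {
        lastRequest = now;
    }

    // True while the next request falls inside the guaranteed validity period.
    boolean withinValidity(Instant now) {
        return now.isBefore(lastRequest.plus(aliveTime));
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        AliveTimeTracker tracker = new AliveTimeTracker(DEFAULT_ALIVE_TIME, start);
        System.out.println(tracker.withinValidity(start.plusSeconds(30))); // true
    }
}
```

For example, a consumer that pauses between pages could check withinValidity before the next ParallelScan call and, if the window has passed, start a new scan instead of sending a token that may no longer be valid.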

Examples

The following examples show how to scan data by using a single thread and by using multiple threads at the same time:

  • Scan data by using a single thread

    With parallel scan, single-threaded code is simpler than multithreaded code: a single-threaded request does not need the currentParallelId or maxParallel parameter. A single-threaded ParallelScan request provides higher throughput than a Search request, but lower throughput than a multithreaded ParallelScan request. For more information about how to scan data by using multiple threads at the same time, see the "Scan data by using multiple threads" section.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
    import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
    import com.alicloud.openservices.tablestore.model.search.query.Query;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    
    public class Test {
    
        public static List<Row> scanQuery(final SyncClient client) {
            String tableName = "<TableName>";
            String indexName = "<IndexName>";
            // Query the session ID and the maximum number of parallel scan tasks supported by the request. 
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            byte[] sessionId = computeSplitsResponse.getSessionId();
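            // The number of splits is not needed for a single-threaded scan. Multithreaded scans use it as maxParallel. 
            int splitsSize = computeSplitsResponse.getSplitsSize();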
            /*
             * Create a parallel scan request. 
             */
            ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
            parallelScanRequest.setTableName(tableName);
            parallelScanRequest.setIndexName(indexName);
            ScanQuery scanQuery = new ScanQuery();
            // This query determines the range of the data to scan. You can create a nested and complex query. 
            Query query = new MatchAllQuery();
            scanQuery.setQuery(query);
    
            // Specify the maximum number of rows that can be returned by each ParallelScan call. 
            scanQuery.setLimit(2000);
            parallelScanRequest.setScanQuery(scanQuery);
            ColumnsToGet columnsToGet = new ColumnsToGet();
            columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
            parallelScanRequest.setColumnsToGet(columnsToGet);
            parallelScanRequest.setSessionId(sessionId);
    
            /*
             * Use the builder to create a parallel scan request that is equivalent to the preceding request. 
             */
            ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
                .tableName(tableName)
                .indexName(indexName)
                .scanQuery(ScanQuery.newBuilder()
                    .query(QueryBuilders.matchAll())
                    .limit(2000)
                    .build())
                .addColumnsToGet("col_1", "col_2")
                .sessionId(sessionId)
                .build();
            List<Row> result = new ArrayList<>();
    
            /*
             * Use the native API operation to scan data. 
             */
            {
                ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
                // Obtain the token for the next request. 
                byte[] nextToken = parallelScanResponse.getNextToken();
                // Obtain the data. 
                List<Row> rows = parallelScanResponse.getRows();
                result.addAll(rows);
                while (nextToken != null) {
                    // Specify the token. 
                    parallelScanRequest.getScanQuery().setToken(nextToken);
                    // Continue to scan the data. 
                    parallelScanResponse = client.parallelScan(parallelScanRequest);
                    // Obtain the data. 
                    rows = parallelScanResponse.getRows();
                    result.addAll(rows);
                    nextToken = parallelScanResponse.getNextToken();
                }
            }
    
            /*
             * Recommended method. 
             * Use an iterator to scan all matched data. This method is as fast as the previous method but easier to use. 
             */
            {
                // Discard the rows collected by the previous method so that the same rows are not added twice. 
                result.clear();
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    result.add(row);
                    // Obtain specific column values. This assumes that every row contains col_1 and col_2. 
                    String col_1 = row.getLatestColumn("col_1").getValue().asString();
                    long col_2 = row.getLatestColumn("col_2").getValue().asLong();
                }
            }
    
            /*
             * If the operation fails, retry the operation. If the caller of this function has a retry mechanism or if you do not want to retry the failed operation, you can ignore this part. 
             * To ensure availability, we recommend that you start a new parallel scan task when exceptions occur. 
             * The following exceptions may occur when you send a ParallelScan request:
             * 1. A session exception occurs on the server side. The error code is OTSSessionExpired. 
             * 2. An exception such as a network exception occurs on the client side. 
             */
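            try {
                // Execute the processing logic. Discard the rows collected by the preceding methods so that rows are not added twice. 
                result.clear();
                {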
                    RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                    while (iterator.hasNext()) {
                        Row row = iterator.next();
                        // Process rows of data. If you have sufficient memory resources, you can add the rows to a list. 
                        result.add(row);
                    }
                }
            } catch (Exception ex) {
                // Retry the processing logic. 
                {
                    result.clear();
                    RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                    while (iterator.hasNext()) {
                        Row row = iterator.next();
                        // Process rows of data. If you have sufficient memory resources, you can add the rows to a list. 
                        result.add(row);
                    }
                }
            }
            return result;
        }
    }
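  • Scan data by using multiple threads

    Each thread runs one parallel scan task. All threads use the session ID returned by ComputeSplits, and each thread uses a different currentParallelId value in the range [0, maxParallel).
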
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class Test {
    
        public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
            // Query the number of CPU cores on the client. 
            final int cpuProcessors = Runtime.getRuntime().availableProcessors();
            // Specify the number of threads that can run at the same time on the client. We recommend that you set it to the number of CPU cores on the client to avoid degrading query performance. 
            final Semaphore semaphore = new Semaphore(cpuProcessors);
    
            // Query the session ID and the maximum number of parallel scan tasks supported by the request. 
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            final byte[] sessionId = computeSplitsResponse.getSessionId();
            final int maxParallel = computeSplitsResponse.getSplitsSize();
    
            // Create an AtomicLong object if you need to obtain the row count for your business. 
            AtomicLong rowCount = new AtomicLong(0);
            /*
             * To scan in multiple threads from within a method, you can define a local class that extends Thread, as shown here. 
             * You can also define the class outside the method to organize the code. 
             */
            final class ThreadForScanQuery extends Thread {
                private final int currentParallelId;
    
                private ThreadForScanQuery(int currentParallelId) {
                    this.currentParallelId = currentParallelId;
                    this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // Specify the thread name. 
                }
    
                @Override
                public void run() {
                    System.out.println("start thread:" + this.getName());
                    try {
                        // Execute the processing logic. 
                        {
                            ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                    .tableName(tableName)
                                    .indexName(indexName)
                                    .scanQuery(ScanQuery.newBuilder()
                                            .query(QueryBuilders.range("col_long").lessThan(100_000)) // Specify the data to query. 
                                            .limit(2000)
                                            .currentParallelId(currentParallelId)
                                            .maxParallel(maxParallel)
                                            .build())
                                    .addColumnsToGet("col_long", "col_keyword", "col_bool")  // Specify the fields to return from the search index. To return all fields from the search index, set returnAllColumnsFromIndex to true. 
                                    //.returnAllColumnsFromIndex(true)
                                    .sessionId(sessionId)
                                    .build();
                            // Use an iterator to obtain all the data. 
                            RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                            long count = 0;
                            while (ltr.hasNext()) {
                                Row row = ltr.next();
                                // Add custom processing logic here. This sample only counts the rows. 
                                count++;
                            }
                            rowCount.addAndGet(count);
                            System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
                        }
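                    } catch (Exception ex) {
                        // If exceptions occur, you can retry the processing logic. At minimum, log the failure so that it is not lost. 
                        System.err.println("thread[" + this.getName() + "] failed: " + ex);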
                    } finally {
                        semaphore.release();
                    }
                }
            }
    
            // Create one thread per parallel scan task. Valid values of currentParallelId: [0, maxParallel). 
            List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
            for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
                ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
                threadList.add(thread);
            }
    
            // Start the threads. 
            for (ThreadForScanQuery thread : threadList) {
                // Specify a value for semaphore to limit the number of threads that can be initiated at the same time to prevent bottlenecks on the client. 
                semaphore.acquire();
                thread.start();
            }
    
            // The main thread is blocked until all threads are complete. 
            for (ThreadForScanQuery thread : threadList) {
                thread.join();
            }
            System.out.println("all thread finished! total rows:" + rowCount.get());
        }
    }
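The multithreaded example uses Thread and Semaphore, and a failed thread is not reported back to the caller. As an alternative design, the following Java sketch runs one task per split on a fixed-size thread pool from java.util.concurrent and propagates the first failure to the caller. The scanSplit callback is a hypothetical placeholder: in real code it would build a ParallelScanRequest with the shared session ID, maxParallel, and the given currentParallelId, drain client.createParallelScanIterator, and return the row count. SplitRunner itself is not part of the Tablestore SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntToLongFunction;

// Sketch: one scan task per split on a bounded thread pool instead of Thread + Semaphore.
class SplitRunner {

    // scanSplit is a hypothetical callback: given currentParallelId, it scans that split and returns the row count.
    static long scanAllSplits(int maxParallel, int poolSize, IntToLongFunction scanSplit)
            throws InterruptedException, ExecutionException {
        // The pool size limits how many splits run at once, which is the role of the semaphore in the example above.
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int id = 0; id < maxParallel; id++) {
                final int currentParallelId = id; // Valid values: [0, maxParallel).
                Callable<Long> task = () -> scanSplit.applyAsLong(currentParallelId);
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> future : futures) {
                // get() rethrows a failure from any split as ExecutionException, so errors reach the caller.
                total += future.get();
            }
            return total;
        } finally {
            pool.shutdownNow(); // Stops the remaining splits if one of them failed.
        }
    }

    public static void main(String[] args) throws Exception {
        int cpuProcessors = Runtime.getRuntime().availableProcessors();
        // Fake split scanner for illustration: split i "returns" 10 * (i + 1) rows.
        long total = scanAllSplits(4, cpuProcessors, id -> 10L * (id + 1));
        System.out.println("total rows: " + total); // 100
    }
}
```

Because future.get() throws ExecutionException if any split fails, the caller can catch that exception and restart the whole scan, for example with a new session from ComputeSplits, which matches the recommendation to start a new parallel scan when exceptions occur.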