This topic describes the ParallelScan API of search index.

API description

The Search API of search indexes provides a full set of query features and analysis capabilities, such as sorting and aggregation, to return query results in a specified order.

However, in some cases you may care more about how quickly all matched data can be obtained than about the order of the query results, for example, when you integrate a cluster computing system such as Apache Spark or Presto with Tablestore, or when you select a computing engine. To meet such requirements, Tablestore provides the ParallelScan API.

Compared with the Search API, the ParallelScan API supports all the query features but does not provide analysis capabilities such as sorting and aggregation. This improves the query speed by more than five times. You can call the ParallelScan API to export hundreds of millions of data records within one minute, and the data export capability can be horizontally scaled without limits.

The maximum number of rows that a single ParallelScan request can return is greater than that of a single Search request. You can call the Search API to query up to 100 rows in a single request, whereas you can call the ParallelScan API to query up to 2,000 rows in a single request. The limit for ParallelScan requests will be raised to 5,000 rows in the next version. To further accelerate data exports, search indexes support initiating requests from multiple threads.

Concurrent data export is implemented through the following operations.
  • ComputeSplits: You can call this operation to query the maximum number of concurrent queries that a single ParallelScan task supports and to obtain the current session ID.
  • ParallelScan: You can call this operation to export data.

SDKs 5.7.0 and later support the ParallelScan API.
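
A minimal sketch of the first of these two steps is shown below. It uses the same Java SDK classes as the complete examples later in this topic and assumes that client is an initialized SyncClient; the table and index names are placeholders. The ParallelScan step itself is covered in detail in the Java examples in this topic.

    // Minimal sketch: call ComputeSplits to obtain the session ID and the maximum concurrency
    // that a ParallelScan task on this search index supports.
    ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
    computeSplitsRequest.setTableName("<TableName>");
    computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions("<IndexName>"));
    ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
    byte[] sessionId = computeSplitsResponse.getSessionId();    // Pass this value to each ParallelScan request.
    int maxParallel = computeSplitsResponse.getSplitsSize();    // Upper bound for maxParallel and currentParallelId.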

Features

Compared with the Search API, the ParallelScan API has the following characteristics.

  • Stable results

    ParallelScan tasks are subject to the data status. Within a session, the result set of the scanned data is determined by the data status at the time the first request is initiated. If you insert new data or modify existing data after you send the first request, the result set is not affected.

  • Sessions
    ParallelScan-related operations use sessions. You can specify session IDs to ensure that the returned result set is stable. Perform the following steps to use a session ID to export data:
    1. Use the ComputeSplits operation to query the maximum number of concurrent queries and obtain the current session ID.
    2. Initiate multiple concurrent ParallelScan requests to read data. You must specify the current session ID and the concurrency ID in these requests.

    You can use ParallelScan to query data without specifying the session ID. However, there is a very low probability that you may obtain a result set that contains duplicate data if you do not specify the session ID.

    If a network exception, thread exception, dynamic schema modification, or A/B testing occurs when you perform the ParallelScan operation, the server returns the OTSSessionExpired error. In this case, you cannot continue to scan data, and you must initiate a new ParallelScan task to query data from the beginning, as shown in the error handling sketch after this list.

  • Maximum number of concurrent queries

    The maximum number of concurrent queries in a single ParallelScan request is determined by the return value of ComputeSplits. The larger the data volume, the more concurrent queries are supported. A single request corresponds to one query statement. For example, if you use the Search API to query results where the value of city is Hangzhou, all data that matches this condition is returned in the result. However, if you use the ParallelScan API and the return value of ComputeSplits is 2, each concurrent query returns half of the results, and the complete result set consists of the two concurrent result sets.

  • Rows to return for each request

    The default value of limit is 2,000, and the maximum value of limit is 5,000. When the number of rows to return exceeds 5,000, changes to the value of limit do not affect the query performance.

  • Performance

    The performance of the ParallelScan API is five times that of the Search API. As the number of concurrent queries increases, the performance of ParallelScan requests improves linearly. For example, the performance of a ParallelScan request can be improved fourfold when there are eight concurrent queries.

  • Costs

    ParallelScan requests consume less resources and are offered at a lower price. To export a large amount of data, we recommend that you use the ParallelScan API.

  • Columns to return

    You can only query indexed columns in search indexes. The ReturnType parameter can be set to RETURN_ALL_INDEX or RETURN_SPECIFIED, but not RETURN_ALL.

    In addition, arrays, geographical locations, and nested fields cannot be returned. If you want to query data of these three types, submit a ticket or contact us through DingTalk. We will prioritize your needs.

  • Limits

    The number of concurrent ParallelScan tasks is limited to 10. This limit will be adjusted to meet your requirements in the future. Concurrent tasks with the same session ID and the same ScanQuery parameter value are considered as one task. A ParallelScan task starts when the ParallelScan request is sent for the first time, and ends when all data is scanned or the token expires.
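
To illustrate the restart behavior described in the Sessions section above, the following is a minimal sketch that re-runs a whole ParallelScan task when the session expires. It assumes that client is an initialized SyncClient and that the SDK surfaces the error as a TableStoreException (from the com.alicloud.openservices.tablestore package) whose error code is OTSSessionExpired; the table, index, and column names are placeholders.

    // Minimal sketch: restart the ParallelScan task from the beginning when the session expires.
    while (true) {
        try {
            // Obtain a fresh session ID for a new ParallelScan task.
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName("<TableName>");
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions("<IndexName>"));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            // Scan all matched data within this session. A single concurrent query is used for brevity.
            RowIterator iterator = client.createParallelScanIterator(ParallelScanRequest.newBuilder()
                .tableName("<TableName>")
                .indexName("<IndexName>")
                .scanQuery(ScanQuery.newBuilder()
                    .query(QueryBuilders.matchAll())
                    .limit(2000)
                    .build())
                .addColumnsToGet("col_1")
                .sessionId(computeSplitsResponse.getSessionId())
                .build());
            while (iterator.hasNext()) {
                Row row = iterator.next();
                // Add your processing logic here. Keep it idempotent because a restart scans the data again.
            }
            break;  // All data has been scanned.
        } catch (TableStoreException e) {  // Assumption: the error surfaces as this exception type.
            if (!"OTSSessionExpired".equals(e.getErrorCode())) {
                throw e;  // Only restart the task when the session has expired.
            }
            // The session expired. Loop again to start a new ParallelScan task from the beginning.
        }
    }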

Scenarios

  • Use the Search API if you want to sort or aggregate query results, or if the query request is sent from an end user.
  • Use the ParallelScan API if you do not need to sort query results but want to obtain all matched results quickly, or if the data is queried by a computing system such as Apache Spark or Presto.

Parameters

  • tableName: The name of the base table.
  • indexName: The name of the search index.
  • scanQuery: The scan query, which contains the following sub-parameters:
    • query: The query statement of the search index. The operation supports exact query, fuzzy query, query by range, query by geographical location, and nested query.
    • limit: The maximum number of data entries to return at a time.
    • maxParallel: The maximum number of concurrent queries for the request, which varies with the data volume. The larger the data volume, the more concurrent queries are supported. You can use the ComputeSplits operation to query the maximum number of concurrent queries.
    • currentParallelId: The ID of the concurrent query. Valid values: [0, value of maxParallel).
    • token: The token used for paging. The response to a ParallelScan request contains the token for the next page. You can use the token to continue querying data from the end of the previous page.
    • aliveTime: The validity period of the current ParallelScan task, which is also the validity period of the token. We recommend that you use the default value. If the next request is not initiated within the validity period, no more data can be queried. The validity period of the token is refreshed each time you send a request.
  • columnsToGet: The columns to return. You can use ParallelScan to scan only data in search indexes. To use ParallelScan, you must set store to true for the columns when you create the search index. You cannot use ParallelScan to scan data of the following types: Array, Nested, and GeoPoint.
  • sessionId: The session ID of the ParallelScan task. You can call the ComputeSplits operation to create a session and query the maximum number of concurrent queries that the task supports.
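
As a minimal sketch of how these parameters map to the Java SDK, the following snippet uses only the builder methods that appear in the examples below. It assumes that sessionId and maxParallel were obtained from a ComputeSplits call; the table, index, and column names are placeholders, and token and aliveTime are omitted because the token is demonstrated in the single-request example below.

    // Minimal sketch: map the parameters above to a ParallelScanRequest.
    ParallelScanRequest request = ParallelScanRequest.newBuilder()
        .tableName("<TableName>")                      // tableName
        .indexName("<IndexName>")                      // indexName
        .scanQuery(ScanQuery.newBuilder()
            .query(QueryBuilders.matchAll())           // scanQuery.query: the query statement
            .limit(2000)                               // scanQuery.limit: rows to return at a time
            .maxParallel(maxParallel)                  // scanQuery.maxParallel: obtained from ComputeSplits
            .currentParallelId(0)                      // scanQuery.currentParallelId: in [0, maxParallel)
            .build())
        .addColumnsToGet("col_1", "col_2")             // columnsToGet: indexed columns only
        .sessionId(sessionId)                          // sessionId: obtained from ComputeSplits
        .build();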

Java examples

  • Scan data by a single request

    When you use ParallelScan, the code for a single request is simpler than the code for requests that use multiple concurrent queries: the currentParallelId and maxParallel parameters are not required for a single request.

    import java.util.Arrays;
    import java.util.List;
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
    import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
    import com.alicloud.openservices.tablestore.model.search.query.Query;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    public class Test {
        public static void boolQuery(SyncClient client) {
            String tableName = "<TableName>";
            String indexName = "<IndexName>";
            // Query the session ID and the maximum number of concurrent queries supported by this request.
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            byte[] sessionId = computeSplitsResponse.getSessionId();
            int splitsSize = computeSplitsResponse.getSplitsSize();
            /*
             *  Create a ParallelScan request.
             */
            ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
            parallelScanRequest.setTableName(tableName);
            parallelScanRequest.setIndexName(indexName);
            ScanQuery scanQuery = new ScanQuery();
            // This query determines the range of the data to be scanned. You can create a nested and complex query.
            Query query = new MatchAllQuery();
            scanQuery.setQuery(query);
            // Set the number of rows to return for a single request.
            scanQuery.setLimit(2000);
            parallelScanRequest.setScanQuery(scanQuery);
            ColumnsToGet columnsToGet = new ColumnsToGet();
            columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
            parallelScanRequest.setColumnsToGet(columnsToGet);
            parallelScanRequest.setSessionId(sessionId);
            /*
             * You can create a ParallelScan request in the builder mode, which serves the same function as the request created by using the previous code.
             */
            ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
                .tableName(tableName)
                .indexName(indexName)
                .scanQuery(ScanQuery.newBuilder()
                    .query(QueryBuilders.matchAll())
                    .limit(2000)
                    .build())
                .addColumnsToGet("col_1", "col_2")
                .sessionId(sessionId)
                .build();
            /*
             * Use the native API operations to scan data.
             */
            ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
            // Query the token of ScanQuery for the next request.
            byte[] nextToken = parallelScanResponse.getNextToken();
            // Obtain the data.
            List<Row> rows = parallelScanResponse.getRows();
            while (nextToken != null) {
                // Set the token.
                parallelScanRequest.getScanQuery().setToken(nextToken);
                // Continue to scan data.
                parallelScanResponse = client.parallelScan(parallelScanRequest);
                // Obtain the data.
                rows = parallelScanResponse.getRows();
                nextToken = parallelScanResponse.getNextToken();
            }
            /*
             *  Use an iterator to scan all matched data.         
             *  This method takes the same amount of time as the preceding method but is easier to implement. Therefore, we recommend that you use this method.
             */        
            RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
            while (iterator.hasNext()) {
                Row row = iterator.next();
                // Obtain the specific values.            
                String col_1 = row.getLatestColumn("col_1").getValue().asString();
                long col_2 = row.getLatestColumn("col_2").getValue().asLong();
            }
        } 
    }
  • Scan data by using multiple concurrent threads
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
    import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
    import com.alicloud.openservices.tablestore.model.search.query.Query;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    public class Test {
        public static void scanQueryWithMultiThread(final SyncClient client) throws InterruptedException {
            final String tableName = "yourTableName";
            final String indexName = "yourIndexName";
            // Query the session ID and the maximum number of concurrent queries supported by this request.
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            final byte[] sessionId = computeSplitsResponse.getSessionId();
            final int maxParallel = computeSplitsResponse.getSplitsSize();
            /*
             *  To keep the multithreaded logic in one method, this example uses an inner class that extends Thread.
             *  In your own code, you can also implement the same logic as a regular top-level class.
             */
            final class ThreadForScanQuery extends Thread {
                private ParallelScanRequest parallelScanRequest;
                private ThreadForScanQuery(int currentParallelId) {
                    this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // Set the thread name.                this.parallelScanRequest = ParallelScanRequest.newBuilder()
                        .tableName(tableName)
                        .indexName(indexName)
                        .scanQuery(ScanQuery.newBuilder()
                            .query(QueryBuilders.range("col_long").lessThan(123))  // Specify what data is queried.
                            .limit(2000)
                            .currentParallelId(currentParallelId)
                            .maxParallel(maxParallel)
                            .build())
                        .addColumnsToGet("col_1", "col_2", "col_3")  // You can only query fields in the search index. Alternatively, you can set returnAllColumnsFromIndex to true to obtain data in all indexes.                    //.returnAllColumnsFromIndex(true)
                        .sessionId(sessionId)
                        .build();
                }
                @Override
                public void run() {
                    // Use an iterator to query all the data.
                    RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                    System.out.println("thread name:" + this.getName());
                    long count = 0;
                    while (ltr.hasNext()) {
                        Row row = ltr.next();
                        // Add your processing logic.
                        count++;
                    }
                    System.out.println("thread name:" + this.getName() + ", total data records obtained by the thread:" + count);
                }
            }
            // Execute multiple threads at a time. Valid values of currentParallelId: [0, Value of maxParallel).
            List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
            for (int i = 0; i < maxParallel; i++) {
                ThreadForScanQuery thread = new ThreadForScanQuery(i);
                threadList.add(thread);
            }
            // Initiate all threads.
            for (ThreadForScanQuery thread : threadList) {
                thread.start();
            }
            // Block the main thread until all threads are complete.
            for (ThreadForScanQuery thread : threadList) {
                thread.join();
            }
            System.out.println("all thread done!") ;
        }
     }