全部产品
Search
文档中心

并发导出数据

更新时间: 2022-01-26

当使用场景中不关心整个结果集的顺序时,可以使用并发导出数据(ParallelScan)功能以更快的速度将匹配的数据全部返回。

前提条件

  • 已初始化Client。具体操作,请参见初始化

  • 已创建数据表并写入数据。

  • 已在数据表上创建多元索引。具体操作,请参见创建多元索引

参数

参数

说明

TableName

数据表名称。

IndexName

多元索引名称。

ScanQuery

Query

多元索引的查询语句。支持精确查询、模糊查询、范围查询、地理位置查询、嵌套查询等,功能和Search接口一致。

Limit

扫描数据时一次能返回的数据行数。

MaxParallel

最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大,支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。

CurrentParallelID

当前并发ID。取值范围为[0, MaxParallel)。

Token

用于翻页功能。ParallelScan请求结果中有下一次进行翻页的Token,使用该Token可以接着上一次的结果继续读取数据。

AliveTime

ParallelScan的当前任务有效时间,也是Token的有效时间。默认值为60,建议使用默认值,单位为秒。如果在有效时间内没有发起下一次请求,则不能继续读取数据。持续发起请求会刷新Token有效时间。

说明

由于服务端采用异步方式清理过期任务,因此当前任务只保证在设置的有效时间内不会过期,但不能保证有效时间之后一定过期。

ColumnsToGet

ParallelScan目前仅可以扫描多元索引中的数据,需要在创建多元索引时设置附加存储(即Store=true)。

SessionId

本次并发扫描数据任务的SessionId。您可以通过ComputeSplits API创建Session,同时获得本次任务支持的最大并发数。

示例

单并发扫描数据和多线程并发扫描数据的代码示例如下:

  • 单并发扫描数据

    /**
     * ParallelScan单并发扫描数据。
     */
    func ParallelScanSingleConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
            computeSplitsResp, err := computeSplits(client, tableName, indexName)
            if err != nil {
                    fmt.Printf("%#v", err)
                    return
            }
    
            query := search.NewScanQuery().SetQuery(&search.MatchAllQuery{}).SetLimit(2)
    
            req := &tablestore.ParallelScanRequest{}
            req.SetTableName(tableName).
                    SetIndexName(indexName).
                    SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
                    SetScanQuery(query).
                    SetSessionId(computeSplitsResp.SessionId)
    
            res, err := client.ParallelScan(req)
            if err != nil {
                    fmt.Printf("%#v", err)
                    return
            }
    
            total := len(res.Rows)
            for res.NextToken != nil {
                    req.SetScanQuery(query.SetToken(res.NextToken))
                    res, err = client.ParallelScan(req)
                    if err != nil {
                            fmt.Printf("%#v", err)
                            return
                    }
    
                    total += len(res.Rows) //process rows each loop
            }
            fmt.Println("total: ", total)
    }
    
  • 多线程并发扫描数据

    /**
     * ParallelScan多并发扫描数据。
     */
    func ParallelScanMultiConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
            computeSplitsResp, err := computeSplits(client, tableName, indexName)
            if err != nil {
                    fmt.Printf("%#v", err)
                    return
            }
    
            var wg sync.WaitGroup
            wg.Add(int(computeSplitsResp.SplitsSize))
    
            for i := int32(0); i < computeSplitsResp.SplitsSize; i++ {
                    current := i
                    go func() {
                            defer wg.Done()
                            query := search.NewScanQuery().
                                    SetQuery(&search.MatchAllQuery{}).
                                    SetCurrentParallelID(current).
                                    SetMaxParallel(computeSplitsResp.SplitsSize).
                                    SetLimit(2)
    
                            req := &tablestore.ParallelScanRequest{}
                            req.SetTableName(tableName).
                                    SetIndexName(indexName).
                                    SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
                                    SetScanQuery(query).
                                    SetSessionId(computeSplitsResp.SessionId)
    
                            res, err := client.ParallelScan(req)
                            if err != nil {
                                    fmt.Printf("%#v", err)
                                    return
                            }
    
                            total := len(res.Rows)
                            for res.NextToken != nil {
                                    req.SetScanQuery(query.SetToken(res.NextToken))
                                    res, err = client.ParallelScan(req)
                                    if err != nil {
                                            fmt.Printf("%#v", err)
                                            return
                                    }
    
                                    total += len(res.Rows) //process rows each loop
                            }
                            fmt.Println("total: ", total)
                    }()
            }
            wg.Wait()
    }