全部產品
Search
文件中心

Tablestore:並發匯出資料

更新時間:May 12, 2022

當使用情境中不關心整個結果集的順序時,可以使用並發匯出資料(ParallelScan)功能以更快的速度將匹配的資料全部返回。

前提條件

  • 已初始化Client。具體操作,請參見初始化

  • 已建立資料表並寫入資料。

  • 已在資料表上建立多元索引。具體操作,請參見建立多元索引

參數

參數

說明

TableName

資料表名稱。

IndexName

多元索引名稱。

ScanQuery

Query

多元索引的查詢語句。支援精確查詢、模糊查詢、範圍查詢、地理位置查詢、巢狀查詢等,功能和Search介面一致。

Limit

掃描資料時一次能返回的資料行數。

MaxParallel

最大並發數。請求支援的最大並發數由使用者資料量決定。資料量越大,支援的並發數越多,每次任務前可以通過ComputeSplits API進行擷取。

CurrentParallelID

當前並發ID。取值範圍為[0, MaxParallel)。

Token

用於翻頁功能。ParallelScan請求結果中有下一次進行翻頁的Token,使用該Token可以接著上一次的結果繼續讀取資料。

AliveTime

ParallelScan的當前任務有效時間,也是Token的有效時間。預設值為60,建議使用預設值,單位為秒。如果在有效時間內沒有發起下一次請求,則不能繼續讀取資料。持續發起請求會重新整理Token有效時間。

說明

由於服務端採用非同步方式清理到期任務,因此當前任務只保證在設定的有效時間內不會到期,但不能保證有效時間之後一定到期。

ColumnsToGet

ParallelScan目前僅可以掃描多元索引中的資料,需要在建立多元索引時設定附加儲存(即Store=true)。

SessionId

本次並發掃描資料任務的SessionId。您可以通過ComputeSplits API建立Session,同時獲得本次任務支援的最大並發數。

樣本

單並發掃描資料和多線程並發掃描資料的程式碼範例如下:

  • 單並發掃描資料

    /**
     * ParallelScan單並發掃描資料。
     */
    func ParallelScanSingleConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
            computeSplitsResp, err := computeSplits(client, tableName, indexName)
            if err != nil {
                    fmt.Printf("%#v", err)
                    return
            }
    
            query := search.NewScanQuery().SetQuery(&search.MatchAllQuery{}).SetLimit(2)
    
            req := &tablestore.ParallelScanRequest{}
            req.SetTableName(tableName).
                    SetIndexName(indexName).
                    SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
                    SetScanQuery(query).
                    SetSessionId(computeSplitsResp.SessionId)
    
            res, err := client.ParallelScan(req)
            if err != nil {
                    fmt.Printf("%#v", err)
                    return
            }
    
            total := len(res.Rows)
            for res.NextToken != nil {
                    req.SetScanQuery(query.SetToken(res.NextToken))
                    res, err = client.ParallelScan(req)
                    if err != nil {
                            fmt.Printf("%#v", err)
                            return
                    }
    
                    total += len(res.Rows) //process rows each loop
            }
            fmt.Println("total: ", total)
    }
    
  • 多線程並發掃描資料

    /**
     * ParallelScan多並發掃描資料。
     */
    func ParallelScanMultiConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
            computeSplitsResp, err := computeSplits(client, tableName, indexName)
            if err != nil {
                    fmt.Printf("%#v", err)
                    return
            }
    
            var wg sync.WaitGroup
            wg.Add(int(computeSplitsResp.SplitsSize))
    
            for i := int32(0); i < computeSplitsResp.SplitsSize; i++ {
                    current := i
                    go func() {
                            defer wg.Done()
                            query := search.NewScanQuery().
                                    SetQuery(&search.MatchAllQuery{}).
                                    SetCurrentParallelID(current).
                                    SetMaxParallel(computeSplitsResp.SplitsSize).
                                    SetLimit(2)
    
                            req := &tablestore.ParallelScanRequest{}
                            req.SetTableName(tableName).
                                    SetIndexName(indexName).
                                    SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
                                    SetScanQuery(query).
                                    SetSessionId(computeSplitsResp.SessionId)
    
                            res, err := client.ParallelScan(req)
                            if err != nil {
                                    fmt.Printf("%#v", err)
                                    return
                            }
    
                            total := len(res.Rows)
                            for res.NextToken != nil {
                                    req.SetScanQuery(query.SetToken(res.NextToken))
                                    res, err = client.ParallelScan(req)
                                    if err != nil {
                                            fmt.Printf("%#v", err)
                                            return
                                    }
    
                                    total += len(res.Rows) //process rows each loop
                            }
                            fmt.Println("total: ", total)
                    }()
            }
            wg.Wait()
    }