當使用情境中不關心整個結果集的順序時,可以使用並發匯出資料(ParallelScan)功能以更快的速度將匹配的資料全部返回。
前提條件
參數
參數 | 說明 | |
---|---|---|
TableName | 資料表名稱。 | |
IndexName | 多元索引名稱。 | |
ScanQuery | Query | 多元索引的查詢語句。支援精確查詢、模糊查詢、範圍查詢、地理位置查詢、巢狀查詢等,功能和Search介面一致。 |
Limit | 掃描資料時一次能返回的資料行數。 | |
MaxParallel | 最大並發數。請求支援的最大並發數由使用者資料量決定。資料量越大,支援的並發數越多,每次任務前可以通過ComputeSplits API進行擷取。 | |
CurrentParallelID | 當前並發ID。取值範圍為[0, MaxParallel)。 | |
Token | 用於翻頁功能。ParallelScan請求結果中有下一次進行翻頁的Token,使用該Token可以接著上一次的結果繼續讀取資料。 | |
AliveTime | ParallelScan的當前任務有效時間,也是Token的有效時間。預設值為60,建議使用預設值,單位為秒。如果在有效時間內沒有發起下一次請求,則不能繼續讀取資料。持續發起請求會重新整理Token有效時間。 說明 由於服務端採用非同步方式清理到期任務,因此當前任務只保證在設定的有效時間內不會到期,但不能保證有效時間之後一定到期。 | |
ColumnsToGet | ParallelScan目前僅可以掃描多元索引中的資料,需要在建立多元索引時設定附加儲存(即Store=true)。 | |
SessionId | 本次並發掃描資料任務的SessionId。您可以通過ComputeSplits API建立Session,同時獲得本次任務支援的最大並發數。 |
樣本
單並發掃描資料和多線程並發掃描資料的程式碼範例如下:
單並發掃描資料
/** * ParallelScan單並發掃描資料。 */ func ParallelScanSingleConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) { computeSplitsResp, err := computeSplits(client, tableName, indexName) if err != nil { fmt.Printf("%#v", err) return } query := search.NewScanQuery().SetQuery(&search.MatchAllQuery{}).SetLimit(2) req := &tablestore.ParallelScanRequest{} req.SetTableName(tableName). SetIndexName(indexName). SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}). SetScanQuery(query). SetSessionId(computeSplitsResp.SessionId) res, err := client.ParallelScan(req) if err != nil { fmt.Printf("%#v", err) return } total := len(res.Rows) for res.NextToken != nil { req.SetScanQuery(query.SetToken(res.NextToken)) res, err = client.ParallelScan(req) if err != nil { fmt.Printf("%#v", err) return } total += len(res.Rows) //process rows each loop } fmt.Println("total: ", total) }
多線程並發掃描資料
/** * ParallelScan多並發掃描資料。 */ func ParallelScanMultiConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) { computeSplitsResp, err := computeSplits(client, tableName, indexName) if err != nil { fmt.Printf("%#v", err) return } var wg sync.WaitGroup wg.Add(int(computeSplitsResp.SplitsSize)) for i := int32(0); i < computeSplitsResp.SplitsSize; i++ { current := i go func() { defer wg.Done() query := search.NewScanQuery(). SetQuery(&search.MatchAllQuery{}). SetCurrentParallelID(current). SetMaxParallel(computeSplitsResp.SplitsSize). SetLimit(2) req := &tablestore.ParallelScanRequest{} req.SetTableName(tableName). SetIndexName(indexName). SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}). SetScanQuery(query). SetSessionId(computeSplitsResp.SessionId) res, err := client.ParallelScan(req) if err != nil { fmt.Printf("%#v", err) return } total := len(res.Rows) for res.NextToken != nil { req.SetScanQuery(query.SetToken(res.NextToken)) res, err = client.ParallelScan(req) if err != nil { fmt.Printf("%#v", err) return } total += len(res.Rows) //process rows each loop } fmt.Println("total: ", total) }() } wg.Wait() }