當使用情境中不關心整個結果集的順序時,可以使用並發匯出資料功能以更快的速度將命中的資料全部返回。
背景
多元索引中提供了Search介面,Search介面支援全功能集,包括所有的查詢功能,以及排序、統計彙總等分析能力,其結果會按照指定的順序返回。
ParallelScan介面相對於Search介面,保留了所有的查詢功能,但是捨棄了排序、統計彙總等分析功能,帶來了5倍以上的效能提升,因此可以實現1分鐘內上億層級資料行的匯出能力,匯出能力可以水平擴充,不存在上限。
在介面的實現中,單次請求的limit限制更寬鬆。目前Search介面的limit最大允許100行,但是ParallelScan介面的limit最大允許2000行。同時支援多個線程一起並發請求,因此匯出速度會極快。
情境
- 如果請求關心排序、統計彙總,或者是終端客戶的直接查詢請求,需要用Search介面。
- 如果請求不關心排序,只關心把所有合格資料儘快返回,或者是計算系統(Spark、Presto等)拉取資料,可以考慮ParallelScan介面。
特點
ParallelScan介面相對於Search介面的典型特徵如下:
- 結果穩定性
ParallelScan任務是有狀態的。在一個Session請求中,擷取到的結果集是確定的,由發起第一次請求時的資料狀態決定。如果發起第一次請求後插入了新的資料或者修改了原有的資料不會對結果集造成影響。
- 新增會話(Session)概念
在ParallelScan系列介面中新增了Session概念。使用sessionId能夠保證擷取到的結果集是穩定的,具體流程如下:
- 通過ComputeSplits介面擷取最大並發數和當前sessionId。
- 通過發起多個並發ParallelScan請求讀取資料,請求中需要指定當前的sessionId和當前並發ID。
在某些不易擷取sessionId的情境中,ParallelScan也支援不攜帶sessionId發起請求,但是不使用sessionId可能會有極小的機率導致擷取到的結果集中有重複資料。
如果在ParallelScan過程中發生網路異常、線程異常、動態Schema修改、索引切換等情況,導致ParallelScan不能繼續掃描資料,服務端會返回“OTSSessionExpired”錯誤碼,此時需要重新開始一個新的ParallelScan任務,從最開始重新拉取資料。
- 最大並發數
ParallelScan支援的單請求的最大並發數由ComputeSplits的傳回值確定。資料越多,支援的並發數越大。
單請求指同一個查詢語句,例如查詢city=“杭州”的結果,如果使用Search介面,那麼Search請求的傳回值中會包括所有city=“杭州”的結果;如果使用ParallelScan介面且並發數是2,那麼每個ParallelScan請求返回50%的結果,然後將兩個並發的結果集合并在一起才是完整的結果集。
- 每次返回行數
limit預設為2000,最大可以到2000。超過2000後,limit的變化對效能基本無影響。
- 效能
ParallelScan介面單並發掃描資料的效能是Search介面的5倍。當增大並發數時,效能可以繼續線性提高,例如8並發時仍可以繼續提高4倍效能。
- 成本
由於ParallelScan請求對資源的消耗更少,價格會更便宜,所以對於巨量資料量的匯出類需求,強烈建議使用ParallelScan介面。
- 返回列
只支援返回多元索引中已建立索引的列,不能返回多元索引中沒有的列,可以支援的ReturnType是RETURN_ALL_INDEX或者RETURN_SPECIFIED,不支援RETURN_ALL。
目前已支援返回數組和地理位置,但是返回的欄位值會被格式化,可能和寫入資料表的值不一致。例如對於數群組類型,寫入資料表的值為"[1,2, 3, 4]",則通過ParallelScan介面匯出的值為"[1,2,3,4]";對於地理位置類型,寫入資料表的值為
10,50
,則通過ParallelScan介面匯出的值為10.0,50.0
。 - 限制項
同時存在的ParallelScan任務數量有一定的限制,當前為10個,後續會根據客戶需求繼續調整。同一個sessionId且ScanQuery相同的多個並發任務視為一個任務。一個ParallelScan任務的生命週期定義為:開始時間是第一次發出ParallelScan請求,結束時間是翻頁掃描完所有資料或者請求的token失效。
介面
- ComputeSplits:擷取當前ParallelScan單個請求的最大並發數。
- ParallelScan:執行具體的資料匯出功能。
使用
您可以使用如下語言的SDK並發匯出資料。
參數
參數 | 說明 | |
---|---|---|
tableName | 資料表名稱。 | |
indexName | 多元索引名稱。 | |
scanQuery | query | 多元索引的查詢語句。支援精確查詢、模糊查詢、範圍查詢、地理位置查詢、巢狀查詢等,功能和Search介面一致。 |
limit | 掃描資料時一次能返回的資料行數。 | |
maxParallel | 最大並發數。請求支援的最大並發數由使用者資料量決定。資料量越大,支援的並發數越多,每次任務前可以通過ComputeSplits API進行擷取。 | |
currentParallelId | 當前並發ID。取值範圍為[0, maxParallel)。 | |
token | 用於翻頁功能。ParallelScan請求結果中有下一次進行翻頁的token,使用該token可以接著上一次的結果繼續讀取資料。 | |
aliveTime | ParallelScan的當前任務有效時間,也是token的有效時間。預設值為60,建議使用預設值,單位為秒。如果在有效時間內沒有發起下一次請求,則不能繼續讀取資料。持續發起請求會重新整理token有效時間。
说明 由於服務端採用非同步方式清理到期任務,因此當前任務只保證在設定的有效時間內不會到期,但不能保證有效時間之後一定到期。
|
|
columnsToGet | ParallelScan目前僅可以掃描多元索引中的資料,需要在建立多元索引時設定附加儲存(即store=true)。 | |
sessionId | 本次並發掃描資料任務的sessionId。建立Session可以通過ComputeSplits API來建立,同時獲得本次任務支援的最大並發數。 |
樣本
單並發掃描資料和多線程並發掃描資料的程式碼範例如下:
- 單並發掃描資料
相對於多並發掃描資料,單並發掃描資料的代碼更簡單,單並發代碼無需關心currentParallelId和maxParallel參數。單並發使用方式的整體吞吐比Search介面方式高,但是比多線程多並發使用方式的吞吐低,多線程多並發方式請參見最下方的“多線程並發掃描資料”範例程式碼。
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import com.alicloud.openservices.tablestore.SyncClient; import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest; import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse; import com.alicloud.openservices.tablestore.model.Row; import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions; import com.alicloud.openservices.tablestore.model.iterator.RowIterator; import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest; import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse; import com.alicloud.openservices.tablestore.model.search.ScanQuery; import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet; import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery; import com.alicloud.openservices.tablestore.model.search.query.Query; import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders; public class Test { public static List<Row> scanQuery(final SyncClient client) { String tableName = "<TableName>"; String indexName = "<IndexName>"; //擷取sessionId和本次請求支援的最大並發數。 ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest(); computeSplitsRequest.setTableName(tableName); computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName)); ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest); byte[] sessionId = computeSplitsResponse.getSessionId(); int splitsSize = computeSplitsResponse.getSplitsSize(); /* * 建立並發掃描資料請求。 */ ParallelScanRequest parallelScanRequest = new ParallelScanRequest(); parallelScanRequest.setTableName(tableName); parallelScanRequest.setIndexName(indexName); ScanQuery scanQuery = new ScanQuery(); //該query決定了掃描出的資料範圍,可用於構建嵌套的複雜的query。 Query query = new MatchAllQuery(); scanQuery.setQuery(query); //設定單次請求返回的資料行數。 scanQuery.setLimit(2000); parallelScanRequest.setScanQuery(scanQuery); ColumnsToGet columnsToGet = new ColumnsToGet(); columnsToGet.setColumns(Arrays.asList("col_1", "col_2")); parallelScanRequest.setColumnsToGet(columnsToGet); parallelScanRequest.setSessionId(sessionId); /* * 使用builder模式建立並發掃描資料請求,功能與前面一致。 */ ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder() .tableName(tableName) .indexName(indexName) .scanQuery(ScanQuery.newBuilder() .query(QueryBuilders.matchAll()) .limit(2000) .build()) .addColumnsToGet("col_1", "col_2") .sessionId(sessionId) .build(); List<Row> result = new ArrayList<>(); /* * 使用原生的API掃描資料。 */ { ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest); //下次請求的ScanQuery的token。 byte[] nextToken = parallelScanResponse.getNextToken(); //擷取資料。 List<Row> rows = parallelScanResponse.getRows(); result.addAll(rows); while (nextToken != null) { //設定token。 parallelScanRequest.getScanQuery().setToken(nextToken); //繼續掃描資料。 parallelScanResponse = client.parallelScan(parallelScanRequest); //擷取資料。 rows = parallelScanResponse.getRows(); result.addAll(rows); nextToken = parallelScanResponse.getNextToken(); } } /* * 推薦方式。 * 使用iterator方式掃描所有匹配資料。使用方式上更簡單,速度和前面方法一致。 */ { RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder); while (iterator.hasNext()) { Row row = iterator.next(); result.add(row); //擷取具體的值。 String col_1 = row.getLatestColumn("col_1").getValue().asString(); long col_2 = row.getLatestColumn("col_2").getValue().asLong(); } } /* * 關於失敗重試的問題,如果本函數的外部調用者有重試機制或者不需要考慮失敗重試問題,可以忽略此部分內容。 * 為了保證可用性,遇到任何異常均推薦進行任務層級的重試,重新開始一個新的ParallelScan任務。 * 異常分為如下兩種: * 1、服務端Session異常OTSSessionExpired。 * 2、調用者用戶端網路等異常。 */ try { //正常處理邏輯。 { RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder); while (iterator.hasNext()) { Row row = iterator.next(); //處理row,記憶體足夠大時可直接放到list中。 result.add(row); } } } catch (Exception ex) { //重試。 { result.clear(); RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder); while (iterator.hasNext()) { Row row = iterator.next(); //處理row,記憶體足夠大時可直接放到list中。 result.add(row); } } } return result; } }
- 多線程並發掃描資料
import com.alicloud.openservices.tablestore.SyncClient; import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest; import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse; import com.alicloud.openservices.tablestore.model.Row; import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions; import com.alicloud.openservices.tablestore.model.iterator.RowIterator; import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest; import com.alicloud.openservices.tablestore.model.search.ScanQuery; import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; public class Test { public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException { // 擷取機器的CPU數量。 final int cpuProcessors = Runtime.getRuntime().availableProcessors(); // 指定用戶端多線程的並發數量。建議和用戶端的CPU核心數一致,避免用戶端壓力太大,影響查詢效能。 final Semaphore semaphore = new Semaphore(cpuProcessors); // 擷取sessionId和本次請求支援的最大並發數。 ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest(); computeSplitsRequest.setTableName(tableName); computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName)); ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest); final byte[] sessionId = computeSplitsResponse.getSessionId(); final int maxParallel = computeSplitsResponse.getSplitsSize(); // 業務統計行數使用。 AtomicLong rowCount = new AtomicLong(0); /* * 為了使用一個函數實現多線程功能,此處構建一個內部類繼承Thread來使用多線程。 * 您也可以構建一個正常的外部類,使代碼更有條理。 */ final class ThreadForScanQuery extends Thread { private final int currentParallelId; private ThreadForScanQuery(int currentParallelId) { this.currentParallelId = currentParallelId; this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId); // 設定線程名稱。 } @Override public void run() { System.out.println("start thread:" + this.getName()); try { // 正常處理邏輯。 { ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder() .tableName(tableName) .indexName(indexName) .scanQuery(ScanQuery.newBuilder() .query(QueryBuilders.range("col_long").lessThan(10_0000)) // 此處的query決定了擷取什麼資料。 .limit(2000) .currentParallelId(currentParallelId) .maxParallel(maxParallel) .build()) .addColumnsToGet("col_long", "col_keyword", "col_bool") // 設定要返回的多元索引中的部分欄位,或者使用下行注釋的內容擷取多元索引中全部資料。 //.returnAllColumnsFromIndex(true) .sessionId(sessionId) .build(); // 使用Iterator形式擷取所有資料。 RowIterator ltr = client.createParallelScanIterator(parallelScanRequest); long count = 0; while (ltr.hasNext()) { Row row = ltr.next(); // 增加自訂的處理邏輯,此處代碼以統計行數為例介紹。 count++; } rowCount.addAndGet(count); System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count); } } catch (Exception ex) { // 如果有異常,此處需要考慮重試正常處理邏輯。 } finally { semaphore.release(); } } } // 多個線程同時執行,currentParallelId取值範圍為[0, maxParallel)。 List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>(); for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) { ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId); threadList.add(thread); } // 同時啟動。 for (ThreadForScanQuery thread : threadList) { // 利用semaphore限制同時啟動的線程數量,避免用戶端瓶頸。 semaphore.acquire(); thread.start(); } // 主線程阻塞等待所有線程完成任務。 for (ThreadForScanQuery thread : threadList) { thread.join(); } System.out.println("all thread finished! total rows:" + rowCount.get()); } }