TableStoreReader 是Table Store基於 Java SDK 實現的簡單易用、高效能的資料讀取工具類,它封裝了用於高並發、高吞吐率資料讀取的介面,可以實現資料的並發讀取,同時支援行層級回調以及自訂配置功能。本文介紹如何使用 TableStoreReader 並發讀取資料。
前提條件
為阿里雲帳號或具有Table Store存取權限的 RAM 使用者建立AccessKey。
操作步驟
步驟一:安裝 Tablestore SDK
如果您使用的是Maven專案,請在專案的pom.xml檔案中添加如下依賴:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.17.4</version>
</dependency> 更多關於安裝Tablestore SDK的資訊,請參見安裝Tablestore SDK。
步驟二:初始化
初始化 TableStoreReader 前,需要先建立Table Store Client 串連執行個體,您可以自訂TableStoreReader 的配置參數和回呼函數,初始化範例程式碼如下所示。
使用多線程時,建議共用一個 TableStoreReader 對象。
public static TableStoreReader createReader() {
// 初始化Table Store Client
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
// TableStoreReader 配置
TableStoreReaderConfig config = new TableStoreReaderConfig();
// 線程池
executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// 回呼函數
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
succeedRows.incrementAndGet();
System.out.println(rowReadResult.getRowResult());
}
@Override
public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
failedRows.incrementAndGet();
System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
}
};
return new DefaultTableStoreReader(client, config, executorService, callback);
}如果不需要回呼函數,可以在初始化方法中將回呼函數參數設定為 null。
return new DefaultTableStoreReader(client, config, executorService, null);
步驟三:查詢資料
使用 TableStoreReader 查詢資料前,您需要將行資料的主鍵資訊添加到緩衝區中。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder() .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1")) .build(); tableStoreReader.addPrimaryKey("test_table", primaryKey);如果需要擷取查詢後的行資料,您可以使用
addPrimaryKeyWithFuture方法。Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);您也可以指定查詢參數,例如最大版本數、資料版本範圍、過濾器等。
RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version"); // 設定最大讀取版本數 rowQueryCriteria.setMaxVersions(1); // 設定讀取資料版本範圍 rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis())); // 設定返回的屬性列 rowQueryCriteria.addColumnsToGet("col1"); // 設定過濾條件 SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1")); rowQueryCriteria.setFilter(singleColumnValueFilter); // 添加查詢條件 tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
主鍵資訊添加到緩衝區中後,TableStoreReader 按照設定的自動發送時間間隔(預設為 10 秒)將緩衝區中的資料發送到Table Store進行查詢,您也可以手動發送緩衝區資料。
同步發送
tableStoreReader.flush();非同步發送
tableStoreReader.send();
步驟四:關閉資源
資料查詢完成後,如果不需要進行其它操作,在不影響業務系統啟動並執行情況下,您可以關閉資源。
tableStoreReader.close();
client.shutdown();
executorService.shutdown();完整範例程式碼
以下範例程式碼使用 TableStoreReader 並發查詢 test_table 表中的 200 行資料,並在回呼函數中列印查詢結果。
public class TableStoreReaderExample {
private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
private static final String instanceName = "n01k********";
private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
private static AsyncClientInterface client;
private static ExecutorService executorService;
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 建立 TableStoreReader
TableStoreReader tableStoreReader = createReader();
// 添加查詢資料主鍵
for(int i=0; i<200; i++) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
.build();
tableStoreReader.addPrimaryKey("test_table", primaryKey);
}
// 發送緩衝區中的資料
tableStoreReader.flush();
// 等待回呼函數執行完成
Thread.sleep(1000L);
System.out.println("Succeed Rows Count: " + succeedRows.get());
System.out.println("Failed Rows Count: " + failedRows.get());
// 關閉資源
tableStoreReader.close();
client.shutdown();
executorService.shutdown();
}
public static TableStoreReader createReader() {
// 初始化Table Store Client
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
// TableStoreReader 參數配置
TableStoreReaderConfig config = new TableStoreReaderConfig();
// 線程池
executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// 回呼函數
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
succeedRows.incrementAndGet();
System.out.println(rowReadResult.getRowResult());
}
@Override
public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
failedRows.incrementAndGet();
System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
}
};
return new DefaultTableStoreReader(client, config, executorService, callback);
}
}