全部產品
Search
文件中心

Tablestore:使用 TableStoreReader 並發讀取資料

更新時間:Jun 27, 2025

TableStoreReader 是Table Store基於 Java SDK 實現的簡單易用、高效能的資料讀取工具類,它封裝了用於高並發、高吞吐率資料讀取的介面,可以實現資料的並發讀取,同時支援行層級回調以及自訂配置功能。本文介紹如何使用 TableStoreReader 並發讀取資料。

前提條件

為阿里雲帳號或具有Table Store存取權限的 RAM 使用者建立AccessKey

操作步驟

步驟一:安裝 Tablestore SDK

如果您使用的是Maven專案,請在專案的pom.xml檔案中添加如下依賴:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

更多關於安裝Tablestore SDK的資訊,請參見安裝Tablestore SDK

步驟二:初始化

初始化 TableStoreReader 前,需要先建立Table Store Client 串連執行個體,您可以自訂TableStoreReader 的配置參數和回呼函數,初始化範例程式碼如下所示。

說明

使用多線程時,建議共用一個 TableStoreReader 對象。

public static TableStoreReader createReader() {
        // 初始化Table Store Client
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // TableStoreReader 配置
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // 線程池
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // 回呼函數
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }

TableStoreReaderConfig 參數說明

名稱

類型

說明

checkTableMeta

boolean

是否開啟 Schema 檢查,預設值為 true。開啟後,資料寫入緩衝區之前,TableStoreReader將會進行以下檢查。

  • 資料表是否存在。

  • 查詢資料的主鍵 Schema 是否與表的主鍵相同。

bufferSize

int

緩衝區隊列大小,必須為 2 的指數倍,預設值為 1024。

concurrency

int

將緩衝區資料發送到Table Store時的最大請求並發數,預設值為 10。

maxBatchRowsCount

int

一次批量請求讀取的最大行數,預設值為 100,最大值為 100。

defaultMaxVersions

int

讀取的資料版本數量,預設值為 1,即唯讀取最新版本的資料。

flushInterval

int

自動發送緩衝區資料到Table Store的時間間隔,預設值為 10000,單位為毫秒。

logInterval

int

發送緩衝區資料到Table Store時,列印任務狀態的時間間隔,預設值為 10000,單位為毫秒。

bucketCount

int

桶數量,每個桶相當於一個緩衝區,預設值為 4。

參數設定範例程式碼如下。

// 設定是否開啟 Schema 檢查
config.setCheckTableMeta(false);
// 設定緩衝區隊列大小
config.setBufferSize(1024);
// 設定並發數
config.setConcurrency(10);
// 設定批量請求最大行數
config.setMaxBatchRowsCount(100);
// 設定讀取資料版本數量
config.setDefaultMaxVersions(1);
// 設定 flush 時間間隔
config.setFlushInterval(10000);
// 設定 log 時間間隔
config.setLogInterval(10000);
// 設定 bucket 數量
config.setBucketCount(4);
  • 如果不需要回呼函數,可以在初始化方法中將回呼函數參數設定為 null。

    return new DefaultTableStoreReader(client, config, executorService, null);

步驟三:查詢資料

  1. 使用 TableStoreReader 查詢資料前,您需要將行資料的主鍵資訊添加到緩衝區中。

    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1"))
            .build();
    tableStoreReader.addPrimaryKey("test_table", primaryKey);
    • 如果需要擷取查詢後的行資料,您可以使用 addPrimaryKeyWithFuture 方法。

      Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);
    • 您也可以指定查詢參數,例如最大版本數、資料版本範圍、過濾器等。

      RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version");
      // 設定最大讀取版本數
      rowQueryCriteria.setMaxVersions(1);
      // 設定讀取資料版本範圍
      rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis()));
      // 設定返回的屬性列
      rowQueryCriteria.addColumnsToGet("col1");
      // 設定過濾條件
      SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1"));
      rowQueryCriteria.setFilter(singleColumnValueFilter);
      // 添加查詢條件
      tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
  2. 主鍵資訊添加到緩衝區中後,TableStoreReader 按照設定的自動發送時間間隔(預設為 10 秒)將緩衝區中的資料發送到Table Store進行查詢,您也可以手動發送緩衝區資料。

    • 同步發送

      tableStoreReader.flush();
    • 非同步發送

      tableStoreReader.send();

步驟四:關閉資源

資料查詢完成後,如果不需要進行其它操作,在不影響業務系統啟動並執行情況下,您可以關閉資源。

tableStoreReader.close();
client.shutdown();
executorService.shutdown();

完整範例程式碼

以下範例程式碼使用 TableStoreReader 並發查詢 test_table 表中的 200 行資料,並在回呼函數中列印查詢結果。

public class TableStoreReaderExample {
    private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
    private static final String instanceName = "n01k********";
    private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static AsyncClientInterface client;
    private static ExecutorService executorService;
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 建立 TableStoreReader
        TableStoreReader tableStoreReader = createReader();

        // 添加查詢資料主鍵
        for(int i=0; i<200; i++) {
            PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
                    .build();
            tableStoreReader.addPrimaryKey("test_table", primaryKey);
        }

        // 發送緩衝區中的資料
        tableStoreReader.flush();

        // 等待回呼函數執行完成
        Thread.sleep(1000L);

        System.out.println("Succeed Rows Count: " + succeedRows.get());
        System.out.println("Failed Rows Count: " + failedRows.get());

        // 關閉資源
        tableStoreReader.close();
        client.shutdown();
        executorService.shutdown();
    }

    public static TableStoreReader createReader() {
        // 初始化Table Store Client
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // TableStoreReader 參數配置
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // 線程池
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // 回呼函數
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }
}