すべてのプロダクト
Search
ドキュメントセンター

Tablestore:TableStoreReader を使用した同時データ読み取り

最終更新日:Jul 01, 2025

TableStoreReader は、Tablestore が Java SDK に基づいて実装したシンプルで高性能なデータ読み取りユーティリティクラスです。 高い同時実行性と高いスループットのデータ読み取りのためのインターフェイスをカプセル化し、行レベルのコールバックとカスタム構成機能をサポートしながら、同時データ読み取りを可能にします。 このトピックでは、TableStoreReader を使用して同時データ読み取りを行う方法について説明します。

前提条件

Tablestore へのアクセス権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの アクセスキー を作成します。

手順

ステップ 1: Tablestore SDK をインストールする

Maven を使用して Java プロジェクトを管理する場合、pom.xml ファイルに次の依存関係を追加できます。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

詳細については、「Tablestore SDK をインストールする」をご参照ください。

ステップ 2: 初期化

TableStoreReader を初期化する前に、Tablestore Client 接続インスタンスを作成する必要があります。 TableStoreReader の構成パラメーターとコールバック関数をカスタマイズできます。 初期化のサンプルコードを以下に示します。

説明

複数のスレッドを使用する場合は、1 つの TableStoreReader オブジェクトを共有することをお勧めします。

public static TableStoreReader createReader() {
        // Tablestore Client を初期化します
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // TableStoreReader の構成
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // スレッドプール
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // コールバック関数
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage()); // 失敗した行
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }

TableStoreReaderConfig パラメーターの説明

名前

タイプ

説明

checkTableMeta

boolean

スキーマチェックを有効にするかどうかを指定します。 デフォルト値は true です。 有効にすると、TableStoreReader はデータがバッファーに書き込まれる前に次のチェックを実行します。

  • データテーブルが存在するかどうか。

  • クエリデータのプライマリキースキーマがテーブルのプライマリキーと同じかどうか。

bufferSize

int

バッファーキューのサイズ。 2 の累乗である必要があります。 デフォルト値は 1024 です。

concurrency

int

バッファーデータを Tablestore に送信する際の同時リクエストの最大数。 デフォルト値は 10 です。

maxBatchRowsCount

int

バッチリクエストで読み取る行の最大数。 デフォルト値は 100 です。 最大値は 100 です。

defaultMaxVersions

int

読み取るデータバージョンの数。 デフォルト値は 1 で、最新のデータバージョンのみが読み取られることを意味します。

flushInterval

int

バッファーデータを Tablestore に自動的に送信する時間間隔。 デフォルト値は 10000 です。 単位はミリ秒です。

logInterval

int

バッファーデータを Tablestore に送信する際のタスクステータスの出力時間間隔。 デフォルト値は 10000 です。 単位はミリ秒です。

bucketCount

int

バケットの数。 各バケットはバッファーに相当します。 デフォルト値は 4 です。

パラメーター設定のサンプルコードは次のとおりです。

// スキーマチェックを有効にするかどうかを設定します
config.setCheckTableMeta(false);
// バッファーキューサイズを設定します
config.setBufferSize(1024);
// 同時実行性を設定します
config.setConcurrency(10);
// バッチリクエストの最大行数を設定します
config.setMaxBatchRowsCount(100);
// 読み取るデータバージョンの数を設定します
config.setDefaultMaxVersions(1);
// フラッシュ間隔を設定します
config.setFlushInterval(10000);
// ログ間隔を設定します
config.setLogInterval(10000);
// バケット数を設定します
config.setBucketCount(4);
  • コールバック関数が不要な場合は、初期化メソッドでコールバック関数パラメーターを null に設定できます。

    return new DefaultTableStoreReader(client, config, executorService, null);

ステップ 3: データをクエリする

  1. TableStoreReader を使用してデータをクエリする前に、行データのプライマリキー情報をバッファーに追加する必要があります。

    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1"))
            .build();
    tableStoreReader.addPrimaryKey("test_table", primaryKey);
    • クエリ後に行データを取得する必要がある場合は、addPrimaryKeyWithFuture メソッドを使用できます。

      Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);
    • 最大バージョン番号、データバージョン範囲、フィルターなど、クエリパラメーターを指定することもできます。

      RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version");
      // 読み取る最大バージョン数を設定します
      rowQueryCriteria.setMaxVersions(1);
      // 読み取るデータバージョン範囲を設定します
      rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis()));
      // 返す属性列を設定します
      rowQueryCriteria.addColumnsToGet("col1");
      // フィルター条件を設定します
      SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1"));
      rowQueryCriteria.setFilter(singleColumnValueFilter);
      // クエリ条件を追加します
      tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
  2. プライマリキー情報がバッファーに追加されると、TableStoreReader は設定された自動送信時間間隔(デフォルトは 10 秒)に従って、バッファー内のデータをクエリのために Tablestore に送信します。 バッファーデータを手動で送信することもできます。

    • 同期送信

      tableStoreReader.flush();
    • 非同期送信

      tableStoreReader.send();

ステップ 4: リソースを無効にする

データクエリの完了後、他に操作が必要ない場合は、業務システムの動作に影響を与えることなくリソースを無効にできます。

tableStoreReader.close();
client.shutdown();
executorService.shutdown();

完全なサンプルコード

次のサンプルコードでは、TableStoreReader を使用して test_table テーブルの 200 行のデータを同時にクエリし、コールバック関数でクエリ結果を出力します。

public class TableStoreReaderExample {
    private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
    private static final String instanceName = "n01k********";
    private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static AsyncClientInterface client;
    private static ExecutorService executorService;
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // TableStoreReader を作成します
        TableStoreReader tableStoreReader = createReader();

        // クエリデータのプライマリキーを追加します
        for(int i=0; i<200; i++) {
            PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
                    .build();
            tableStoreReader.addPrimaryKey("test_table", primaryKey);
        }

        // バッファー内のデータを送信します
        tableStoreReader.flush();

        // コールバック関数が完了するまで待機します
        Thread.sleep(1000L);

        System.out.println("Succeed Rows Count: " + succeedRows.get()); // 成功した行数
        System.out.println("Failed Rows Count: " + failedRows.get()); // 失敗した行数

        // リソースを無効にします
        tableStoreReader.close();
        client.shutdown();
        executorService.shutdown();
    }

    public static TableStoreReader createReader() {
        // Tablestore Client を初期化します
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // TableStoreReader パラメーター構成
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // スレッドプール
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // コールバック関数
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage()); // 失敗した行
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }
}