TableStoreReader は、Tablestore が Java SDK に基づいて実装したシンプルで高性能なデータ読み取りユーティリティクラスです。 高い同時実行性と高いスループットのデータ読み取りのためのインターフェイスをカプセル化し、行レベルのコールバックとカスタム構成機能をサポートしながら、同時データ読み取りを可能にします。 このトピックでは、TableStoreReader を使用して同時データ読み取りを行う方法について説明します。
前提条件
Tablestore へのアクセス権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの アクセスキー を作成します。
手順
ステップ 1: Tablestore SDK をインストールする
Maven を使用して Java プロジェクトを管理する場合、pom.xml ファイルに次の依存関係を追加できます。
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.17.4</version>
</dependency> 詳細については、「Tablestore SDK をインストールする」をご参照ください。
ステップ 2: 初期化
TableStoreReader を初期化する前に、Tablestore Client 接続インスタンスを作成する必要があります。 TableStoreReader の構成パラメーターとコールバック関数をカスタマイズできます。 初期化のサンプルコードを以下に示します。
複数のスレッドを使用する場合は、1 つの TableStoreReader オブジェクトを共有することをお勧めします。
public static TableStoreReader createReader() {
// Tablestore Client を初期化します
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
// TableStoreReader の構成
TableStoreReaderConfig config = new TableStoreReaderConfig();
// スレッドプール
executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// コールバック関数
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
succeedRows.incrementAndGet();
System.out.println(rowReadResult.getRowResult());
}
@Override
public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
failedRows.incrementAndGet();
System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage()); // 失敗した行
}
};
return new DefaultTableStoreReader(client, config, executorService, callback);
}コールバック関数が不要な場合は、初期化メソッドでコールバック関数パラメーターを null に設定できます。
return new DefaultTableStoreReader(client, config, executorService, null);
ステップ 3: データをクエリする
TableStoreReader を使用してデータをクエリする前に、行データのプライマリキー情報をバッファーに追加する必要があります。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder() .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1")) .build(); tableStoreReader.addPrimaryKey("test_table", primaryKey);クエリ後に行データを取得する必要がある場合は、
addPrimaryKeyWithFutureメソッドを使用できます。Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);最大バージョン番号、データバージョン範囲、フィルターなど、クエリパラメーターを指定することもできます。
RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version"); // 読み取る最大バージョン数を設定します rowQueryCriteria.setMaxVersions(1); // 読み取るデータバージョン範囲を設定します rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis())); // 返す属性列を設定します rowQueryCriteria.addColumnsToGet("col1"); // フィルター条件を設定します SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1")); rowQueryCriteria.setFilter(singleColumnValueFilter); // クエリ条件を追加します tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
プライマリキー情報がバッファーに追加されると、TableStoreReader は設定された自動送信時間間隔(デフォルトは 10 秒)に従って、バッファー内のデータをクエリのために Tablestore に送信します。 バッファーデータを手動で送信することもできます。
同期送信
tableStoreReader.flush();非同期送信
tableStoreReader.send();
ステップ 4: リソースを無効にする
データクエリの完了後、他に操作が必要ない場合は、業務システムの動作に影響を与えることなくリソースを無効にできます。
tableStoreReader.close();
client.shutdown();
executorService.shutdown();完全なサンプルコード
次のサンプルコードでは、TableStoreReader を使用して test_table テーブルの 200 行のデータを同時にクエリし、コールバック関数でクエリ結果を出力します。
public class TableStoreReaderExample {
private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
private static final String instanceName = "n01k********";
private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
private static AsyncClientInterface client;
private static ExecutorService executorService;
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws InterruptedException, ExecutionException {
// TableStoreReader を作成します
TableStoreReader tableStoreReader = createReader();
// クエリデータのプライマリキーを追加します
for(int i=0; i<200; i++) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
.build();
tableStoreReader.addPrimaryKey("test_table", primaryKey);
}
// バッファー内のデータを送信します
tableStoreReader.flush();
// コールバック関数が完了するまで待機します
Thread.sleep(1000L);
System.out.println("Succeed Rows Count: " + succeedRows.get()); // 成功した行数
System.out.println("Failed Rows Count: " + failedRows.get()); // 失敗した行数
// リソースを無効にします
tableStoreReader.close();
client.shutdown();
executorService.shutdown();
}
public static TableStoreReader createReader() {
// Tablestore Client を初期化します
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
// TableStoreReader パラメーター構成
TableStoreReaderConfig config = new TableStoreReaderConfig();
// スレッドプール
executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// コールバック関数
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
succeedRows.incrementAndGet();
System.out.println(rowReadResult.getRowResult());
}
@Override
public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
failedRows.incrementAndGet();
System.out.println("Failed Rows: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage()); // 失敗した行
}
};
return new DefaultTableStoreReader(client, config, executorService, callback);
}
}