TableStoreReader adalah utilitas pembacaan data sederhana dan berperforma tinggi yang diimplementasikan oleh Tablestore berdasarkan Java SDK. Utilitas ini mengenkapsulasi antarmuka untuk pembacaan data dengan konkurensi dan throughput tinggi, memungkinkan pembacaan data secara bersamaan serta mendukung callback tingkat baris dan fitur konfigurasi kustom. Topik ini menjelaskan cara menggunakan TableStoreReader untuk pembacaan data secara bersamaan.
Prasyarat
Sebuah pasangan AccessKey telah dibuat untuk akun Alibaba Cloud Anda atau pengguna RAM yang diberi izin untuk mengakses Tablestore.
Prosedur
Langkah 1: Instal Tablestore SDK untuk Java
Jika Anda menggunakan Maven untuk mengelola proyek Java, tambahkan dependensi berikut ke file pom.xml:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.17.4</version>
</dependency> Untuk informasi lebih lanjut, lihat Instal Tablestore SDK untuk Java.
Langkah 2: Inisialisasi
Sebelum menginisialisasi TableStoreReader, buat instance koneksi klien Tablestore. Anda dapat menyesuaikan parameter dan fungsi callback untuk TableStoreReader. Contoh kode inisialisasi ditunjukkan di bawah ini.
Saat menggunakan beberapa thread, disarankan agar thread berbagi satu objek TableStoreReader.
public static TableStoreReader createReader() {
// Inisialisasi klien Tablestore.
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
// Konfigurasi TableStoreReader
TableStoreReaderConfig config = new TableStoreReaderConfig();
// Thread pool
executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// Fungsi callback
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
succeedRows.incrementAndGet();
System.out.println(rowReadResult.getRowResult());
}
@Override
public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
failedRows.incrementAndGet();
System.out.println("Baris Gagal: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
}
};
return new DefaultTableStoreReader(client, config, executorService, callback);
}Jika Anda tidak memerlukan fungsi callback, atur parameter fungsi callback menjadi null dalam metode inisialisasi.
return new DefaultTableStoreReader(client, config, executorService, null);
Langkah 3: Query data
Sebelum menggunakan TableStoreReader untuk query data, tambahkan informasi kunci utama dari data baris ke buffer.
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder() .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1")) .build(); tableStoreReader.addPrimaryKey("test_table", primaryKey);Jika Anda perlu mengambil data baris setelah query, gunakan metode
addPrimaryKeyWithFuture.Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);Anda juga dapat menentukan parameter query, seperti nomor versi maksimum, rentang versi data, dan filter.
RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version"); // Tentukan jumlah maksimum versi yang dibaca. rowQueryCriteria.setMaxVersions(1); // Tentukan rentang versi data yang dibaca. rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis())); // Tentukan kolom atribut yang dikembalikan. rowQueryCriteria.addColumnsToGet("col1"); // Tentukan kondisi filter. SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1")); rowQueryCriteria.setFilter(singleColumnValueFilter); // Tambahkan kriteria query. tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
Setelah informasi kunci utama ditambahkan ke buffer, TableStoreReader secara otomatis mengirimkan data di buffer ke Tablestore untuk query sesuai dengan interval yang ditentukan (interval default: 10 detik). Anda juga dapat mengirimkan data buffer secara manual.
Transmisi sinkron
tableStoreReader.flush();Transmisi asinkron
tableStoreReader.send();
Langkah 4: Nonaktifkan sumber daya
Setelah query data selesai, jika tidak ada operasi lain yang diperlukan, nonaktifkan sumber daya tanpa memengaruhi operasi sistem bisnis.
tableStoreReader.close();
client.shutdown();
executorService.shutdown();Kode contoh lengkap
Kode contoh berikut menggunakan TableStoreReader untuk query 200 baris data secara bersamaan di tabel test_table dan mencetak hasil query di fungsi callback.
public class TableStoreReaderExample {
private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
private static final String instanceName = "n01k********";
private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
private static AsyncClientInterface client;
private static ExecutorService executorService;
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws InterruptedException, ExecutionException {
// Buat TableStoreReader.
TableStoreReader tableStoreReader = createReader();
// Tambahkan kunci utama untuk query data.
for(int i=0; i<200; i++) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
.build();
tableStoreReader.addPrimaryKey("test_table", primaryKey);
}
// Kirim data di buffer.
tableStoreReader.flush();
// Tunggu fungsi callback selesai.
Thread.sleep(1000L);
System.out.println("Jumlah Baris Berhasil: " + succeedRows.get());
System.out.println("Jumlah Baris Gagal: " + failedRows.get());
// Nonaktifkan sumber daya.
tableStoreReader.close();
client.shutdown();
executorService.shutdown();
}
public static TableStoreReader createReader() {
// Inisialisasi klien Tablestore.
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
// Konfigurasi parameter TableStoreReader
TableStoreReaderConfig config = new TableStoreReaderConfig();
// Thread pool
executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// Fungsi callback
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
succeedRows.incrementAndGet();
System.out.println(rowReadResult.getRowResult());
}
@Override
public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
failedRows.incrementAndGet();
System.out.println("Baris Gagal: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
}
};
return new DefaultTableStoreReader(client, config, executorService, callback);
}
}