全部产品
Search
文档中心

Tablestore:Gunakan TableStoreReader untuk membaca data secara bersamaan

更新时间:Jul 06, 2025

TableStoreReader adalah utilitas pembacaan data sederhana dan berperforma tinggi yang diimplementasikan oleh Tablestore berdasarkan Java SDK. Utilitas ini mengenkapsulasi antarmuka untuk pembacaan data dengan konkurensi dan throughput tinggi, memungkinkan pembacaan data secara bersamaan serta mendukung callback tingkat baris dan fitur konfigurasi kustom. Topik ini menjelaskan cara menggunakan TableStoreReader untuk pembacaan data secara bersamaan.

Prasyarat

Sebuah pasangan AccessKey telah dibuat untuk akun Alibaba Cloud Anda atau pengguna RAM yang diberi izin untuk mengakses Tablestore.

Prosedur

Langkah 1: Instal Tablestore SDK untuk Java

Jika Anda menggunakan Maven untuk mengelola proyek Java, tambahkan dependensi berikut ke file pom.xml:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

Untuk informasi lebih lanjut, lihat Instal Tablestore SDK untuk Java.

Langkah 2: Inisialisasi

Sebelum menginisialisasi TableStoreReader, buat instance koneksi klien Tablestore. Anda dapat menyesuaikan parameter dan fungsi callback untuk TableStoreReader. Contoh kode inisialisasi ditunjukkan di bawah ini.

Catatan

Saat menggunakan beberapa thread, disarankan agar thread berbagi satu objek TableStoreReader.

public static TableStoreReader createReader() {
        // Inisialisasi klien Tablestore.
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // Konfigurasi TableStoreReader
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // Thread pool
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // Fungsi callback
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Baris Gagal: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }

Parameter TableStoreReaderConfig

Parameter

Tipe

Deskripsi

checkTableMeta

boolean

Menentukan apakah akan mengaktifkan pemeriksaan skema. Nilai defaultnya adalah true. Saat diaktifkan, TableStoreReader melakukan pemeriksaan berikut sebelum data ditulis ke buffer.

  • Apakah tabel data ada.

  • Apakah skema kunci utama dari data yang ingin Anda query sama dengan kunci utama tabel.

bufferSize

int

Ukuran antrian buffer. Ukurannya harus merupakan pangkat dua. Nilai defaultnya adalah 1024.

concurrency

int

Jumlah maksimum permintaan bersamaan saat mengirim data buffer ke Tablestore. Nilai defaultnya adalah 10.

maxBatchRowsCount

int

Jumlah maksimum baris yang dibaca dalam permintaan batch. Nilai defaultnya adalah 100. Nilai maksimumnya adalah 100.

defaultMaxVersions

int

Jumlah versi data yang dibaca. Nilai defaultnya adalah 1, yang berarti hanya versi terbaru dari data yang dibaca.

flushInterval

int

Interval waktu untuk mengirim data buffer ke Tablestore secara otomatis. Nilai defaultnya adalah 10000. Satuannya adalah milidetik.

logInterval

int

Interval waktu untuk mencetak status tugas saat mengirim data buffer ke Tablestore. Nilai defaultnya adalah 10000. Satuannya adalah milidetik.

bucketCount

int

Jumlah bucket. Setiap bucket setara dengan sebuah buffer. Nilai defaultnya adalah 4.

Contoh kode berikut menunjukkan cara mengonfigurasi parameter.

// Tentukan apakah akan mengaktifkan pemeriksaan skema.
config.setCheckTableMeta(false);
// Tentukan ukuran antrian buffer.
config.setBufferSize(1024);
// Tentukan konkurensi.
config.setConcurrency(10);
// Tentukan jumlah maksimum baris untuk permintaan batch.
config.setMaxBatchRowsCount(100);
// Tentukan jumlah versi data yang dibaca.
config.setDefaultMaxVersions(1);
// Tentukan interval flush.
config.setFlushInterval(10000);
// Tentukan interval log.
config.setLogInterval(10000);
// Tentukan jumlah bucket.
config.setBucketCount(4);
  • Jika Anda tidak memerlukan fungsi callback, atur parameter fungsi callback menjadi null dalam metode inisialisasi.

    return new DefaultTableStoreReader(client, config, executorService, null);

Langkah 3: Query data

  1. Sebelum menggunakan TableStoreReader untuk query data, tambahkan informasi kunci utama dari data baris ke buffer.

    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row1"))
            .build();
    tableStoreReader.addPrimaryKey("test_table", primaryKey);
    • Jika Anda perlu mengambil data baris setelah query, gunakan metode addPrimaryKeyWithFuture.

      Future<ReaderResult> readerResult = tableStoreReader.addPrimaryKeyWithFuture("test_table", primaryKey);
    • Anda juga dapat menentukan parameter query, seperti nomor versi maksimum, rentang versi data, dan filter.

      RowQueryCriteria rowQueryCriteria = new RowQueryCriteria("test_version");
      // Tentukan jumlah maksimum versi yang dibaca.
      rowQueryCriteria.setMaxVersions(1);
      // Tentukan rentang versi data yang dibaca.
      rowQueryCriteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 86400*1000, System.currentTimeMillis()));
      // Tentukan kolom atribut yang dikembalikan.
      rowQueryCriteria.addColumnsToGet("col1");
      // Tentukan kondisi filter.
      SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("col1", SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromString("val1"));
      rowQueryCriteria.setFilter(singleColumnValueFilter);
      // Tambahkan kriteria query.
      tableStoreReader.setRowQueryCriteria(rowQueryCriteria);
  2. Setelah informasi kunci utama ditambahkan ke buffer, TableStoreReader secara otomatis mengirimkan data di buffer ke Tablestore untuk query sesuai dengan interval yang ditentukan (interval default: 10 detik). Anda juga dapat mengirimkan data buffer secara manual.

    • Transmisi sinkron

      tableStoreReader.flush();
    • Transmisi asinkron

      tableStoreReader.send();

Langkah 4: Nonaktifkan sumber daya

Setelah query data selesai, jika tidak ada operasi lain yang diperlukan, nonaktifkan sumber daya tanpa memengaruhi operasi sistem bisnis.

tableStoreReader.close();
client.shutdown();
executorService.shutdown();

Kode contoh lengkap

Kode contoh berikut menggunakan TableStoreReader untuk query 200 baris data secara bersamaan di tabel test_table dan mencetak hasil query di fungsi callback.

public class TableStoreReaderExample {
    private static final String endpoint = "https://n01k********.cn-hangzhou.ots.aliyuncs.com";
    private static final String instanceName = "n01k********";
    private static final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static AsyncClientInterface client;
    private static ExecutorService executorService;
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Buat TableStoreReader.
        TableStoreReader tableStoreReader = createReader();

        // Tambahkan kunci utama untuk query data.
        for(int i=0; i<200; i++) {
            PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("id", PrimaryKeyValue.fromString("row" + i))
                    .build();
            tableStoreReader.addPrimaryKey("test_table", primaryKey);
        }

        // Kirim data di buffer.
        tableStoreReader.flush();

        // Tunggu fungsi callback selesai.
        Thread.sleep(1000L);

        System.out.println("Jumlah Baris Berhasil: " + succeedRows.get());
        System.out.println("Jumlah Baris Gagal: " + failedRows.get());

        // Nonaktifkan sumber daya.
        tableStoreReader.close();
        client.shutdown();
        executorService.shutdown();
    }

    public static TableStoreReader createReader() {
        // Inisialisasi klien Tablestore.
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // Konfigurasi parameter TableStoreReader
        TableStoreReaderConfig config = new TableStoreReaderConfig();

        // Thread pool
        executorService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

        // Fungsi callback
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable primaryKeyWithTable, RowReadResult rowReadResult) {
                succeedRows.incrementAndGet();
                System.out.println(rowReadResult.getRowResult());
            }

            @Override
            public void onFailed(PrimaryKeyWithTable primaryKeyWithTable, Exception e) {
                failedRows.incrementAndGet();
                System.out.println("Baris Gagal: " + primaryKeyWithTable.getTableName() + " | " + primaryKeyWithTable.getPrimaryKey() + " | " + e.getMessage());
            }
        };

        return new DefaultTableStoreReader(client, config, executorService, callback);
    }
}