All Products
Search
Document Center

Tablestore:Gunakan TableStoreWriter untuk menulis data secara bersamaan

Last Updated:Jul 06, 2025

Topik ini menjelaskan cara menggunakan TableStoreWriter untuk menulis data ke Tablestore dengan konkurensi tinggi.

Informasi latar belakang

Dalam skenario seperti pencatatan log dan pelacakan IoT, sistem menghasilkan sejumlah besar data dalam waktu singkat dan menulis data tersebut ke database. Database perlu menyediakan throughput penulisan yang tinggi hingga puluhan ribu atau bahkan jutaan baris per detik. Namun, operasi BatchWriteRow dari Tablestore hanya dapat menulis hingga 200 baris dalam satu batch.

TableStoreWriter adalah alat impor data berperforma tinggi dan mudah digunakan yang disediakan oleh Tablestore SDK untuk Java. Kelas ini membungkus operasi yang digunakan untuk mengimpor data dengan konkurensi dan throughput tinggi. Anda dapat menggunakan TableStoreWriter untuk menulis data ke tabel Tablestore dengan konkurensi tinggi serta menentukan callback tingkat baris dan konfigurasi kustom. Untuk informasi lebih lanjut, lihat Lampiran 1: Prinsip Kerja TableStoreWriter.

Catatan

TableStoreWriter hanya cocok untuk model Wide Column.

Skenario

Jika Anda memiliki persyaratan bisnis berikut, Anda dapat menggunakan TableStoreWriter untuk menulis data ke Tablestore dalam skenario seperti penyimpanan log, pesan instan, dan antrian terdistribusi:

  • Throughput tinggi diperlukan untuk mendukung konkurensi tinggi aplikasi.

  • Tidak ada persyaratan untuk latensi penulisan pada baris data tunggal.

  • Data dapat ditulis secara asinkron (mode produsen-konsumen dapat digunakan).

  • Baris data yang sama dapat ditulis ulang.

Prasyarat

  • Tablestore telah diaktifkan dan sebuah instance telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Tablestore dan Buat Instance.

  • Pasangan AccessKey dari akun Alibaba Cloud Anda atau Pengguna Resource Access Management (RAM) telah diperoleh. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Prosedur

Langkah 1: Instal Tablestore SDK untuk Java

Jika Anda menggunakan Maven untuk mengelola proyek Java, tambahkan dependensi berikut ke file pom.xml:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

Untuk informasi tentang cara menginstal Tablestore SDK untuk Java, lihat Instal Tablestore SDK untuk Java.

Langkah 2: Inisialisasi TableStoreWriter

Saat menginisialisasi TableStoreWriter, Anda harus menentukan informasi instance dan tabel serta informasi autentikasi identitas. Anda juga dapat menentukan parameter TableStoreWriter kustom dan fungsi callback. Kami merekomendasikan agar beberapa thread berbagi objek TableStoreWriter yang sama. Contoh kode berikut memberikan contoh cara menginisialisasi TableStoreWriter.

Untuk informasi tentang parameter dan fungsi callback yang didukung oleh TableStoreWriter, lihat Lampiran 2: Parameter TableStoreWriter dan Lampiran 3: Fungsi Callback TableStoreWriter.
private static TableStoreWriter createTablesStoreWriter() {
    
    /**
     * Dalam kebanyakan kasus, Anda dapat mempertahankan nilai default untuk parameter TableStoreWriter. Anda juga dapat mengatur parameter tersebut ke nilai lain berdasarkan kebutuhan bisnis Anda. 
     * Untuk informasi lebih lanjut tentang parameter, lihat Lampiran 2: Parameter TableStoreWriter. 
     * */
    WriterConfig config = new WriterConfig();
    // Tentukan jumlah maksimum baris yang dapat ditulis ke Tablestore dalam permintaan penulisan batch. Nilai default: 200. 
    config.setMaxBatchRowsCount(200); 
    // Tentukan jumlah maksimum permintaan paralel yang digunakan TableStoreWriter untuk menulis data dalam buffer ke Tablestore. Nilai default: 10. Kami merekomendasikan agar Anda mempertahankan nilai default.                          
    config.setConcurrency(10);    
            
    /**
     * Konfigurasikan callback tingkat baris. 
     * Dalam contoh ini, callback dikonfigurasikan untuk menghitung jumlah baris yang berhasil ditulis ke Tablestore dan jumlah baris yang gagal ditulis ke Tablestore. 
     * */
    TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
        @Override
        public void onCompleted(RowChange rowChange, RowWriteResult cc) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(RowChange rowChange, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    /** Konfigurasikan kredensial akses.   **/
    ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

    /**
     * Kami merekomendasikan agar Anda menggunakan thread pools dan client bawaan, yang lebih mudah digunakan dan mengisolasi logika inisialisasi dan pelepasan. 
     * */
    DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
        endpoint, credentials, instanceName, tableName, config, resultCallback);

    return writer;
}

Langkah 3: Menulis data

Anda dapat membuat RowChanges berdasarkan operasi tambah, hapus, dan modifikasi yang berbeda, lalu tambahkan RowChanges ke TableStoreWriter.

Tulis satu baris data sekaligus

Contoh kode berikut memberikan contoh cara menulis 1.000 baris data ke tabel dengan menulis satu baris data sekaligus:

public void writeSingleRowWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Mulai]");
    System.out.println("Tulis Satu Baris dengan Future");
    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
        futures.add(future);
    }

    System.out.println("Thread penulisan selesai.");
    // Flush data dalam buffer ke Tablestore. TableStoreWriter menentukan kapan data dalam buffer diflush ke Tablestore berdasarkan parameter flushInterval dan maxBatchSize. Parameter flushInterval menentukan interval di mana data dalam buffer diflush ke Tablestore. Parameter maxBatchSize menentukan apakah data dalam buffer diflush ke Tablestore berdasarkan jumlah data dalam buffer. 
    writer.flush();
    
    // Tampilkan objek Future. 
    // printFutureResult(futures);

    System.out.println("=========================================================[Selesai]");
}

Tulis beberapa baris data sekaligus

Contoh kode berikut memberikan contoh cara menulis 1.000 baris ke tabel dengan menulis beberapa baris data sekaligus:

public void writeRowListWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Mulai]");
    System.out.println("Tulis Daftar Baris dengan Future");

    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    List<RowChange> rowChanges = new LinkedList<RowChange>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        rowChanges.add(rowChange);
        if (Math.random() > 0.995 || index == rowsCount - 1) {
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
            futures.add(future);
            rowChanges.clear();
        }
    }

    System.out.println("Thread penulisan selesai.");
    // Flush data dalam buffer ke Tablestore. TableStoreWriter menentukan kapan data dalam buffer diflush ke Tablestore berdasarkan parameter flushInterval dan maxBatchSize. Parameter flushInterval menentukan interval di mana data dalam buffer diflush ke Tablestore. Parameter maxBatchSize menentukan apakah data dalam buffer diflush ke Tablestore berdasarkan jumlah data dalam buffer. 
    writer.flush();
    
    // Tampilkan objek Future. 
    // printFutureResult(futures);
    
    System.out.println("=========================================================[Selesai]");
}

Langkah 4: Matikan TableStoreWriter

Kami merekomendasikan agar Anda mematikan TableStoreWriter secara manual sebelum keluar dari aplikasi. Sebelum Anda mematikan TableStoreWriter, sistem akan me-flush semua data dalam buffer ke Tablestore.

Catatan

Jika Anda memanggil metode addRowChange untuk menulis data ke buffer saat Anda sedang mematikan TableStoreWriter atau setelah Anda mematikan TableStoreWriter, data mungkin tidak akan ditulis ke Tablestore.

// Matikan TableStoreWriter secara proaktif. Setelah semua data dalam antrian ditulis ke Tablestore, sistem secara otomatis mematikan client dan thread pool internal. 
writer.close();

Kode sampel lengkap

Contoh kode berikut memberikan contoh cara membuat tabel dan menulis data secara bersamaan ke tabel:

import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;

import com.aliyuncs.exceptions.ClientException;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

public class TableStoreWriterDemo {

    // Tentukan nama instance.
    private static String instanceName = "yourInstanceName";
    // Tentukan endpoint instance.
    private static String endpoint = "yourEndpoint";
    // Dapatkan ID AccessKey dan rahasia AccessKey dari variabel lingkungan.
    private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static String tableName = "<TABLE_NAME>";

    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ClientException {
        TableStoreWriterDemo sample = new TableStoreWriterDemo();

        /**
         * Pastikan tabel sudah ada sebelum Anda menggunakan TableStoreWriter. 
         * 1. TableStoreWriter memverifikasi apakah tabel ada.
         * 2. Periksa apakah bidang dan tipe bidang data yang ingin Anda tulis sama dengan bidang dan tipe bidang tabel. 
         * */
        sample.tryCreateTable();

        /**
         * Kami merekomendasikan agar Anda menggunakan DefaultTablestoreWriter untuk menginisialisasi TableStoreWriter. 
         * DefaultTableStoreWriter(
         *      String endpoint,                                                   // Endpoint instance. 
         *      ServiceCredentials credentials,                                    // Kredensial akses, termasuk pasangan AccessKey. Token didukung.
         *      String instanceName,                                               // Nama instance. 
         *      String tableName,                                                  // Nama tabel. TableStoreWriter dapat menulis data ke satu tabel saja. 
         *      WriterConfig config,                                               // Konfigurasi TableStoreWriter. 
         *      TableStoreCallback<RowChange, RowWriteResult> resultCallback       // Callback tingkat baris. 
         * )
         * */
        TableStoreWriter writer = sample.createTablesStoreWriter();

        /**
         * Metode untuk menulis satu baris data sekaligus digunakan dalam objek Future.
         * */
        sample.writeSingleRowWithFuture(writer);
        /**
         * Metode untuk menulis beberapa baris data sekaligus digunakan dalam objek Future.
         * */   
        //sample.writeRowListWithFuture(writer);

        System.out.println("Hitungan oleh TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Hitungan oleh WriterStatics: " + writer.getWriterStatistics());

        /**
         * Matikan TableStoreWriter secara proaktif. Setelah semua data dalam antrian ditulis ke Tablestore, sistem secara otomatis mematikan client dan thread pool internal. 
         * */
        writer.close();
    }

    private static TableStoreWriter createTablesStoreWriter() {

        WriterConfig config = new WriterConfig();
        // Tentukan jumlah maksimum baris yang dapat ditulis ke Tablestore dalam permintaan penulisan batch. Nilai default: 200. Jika Anda ingin menulis lebih dari 200 baris data dalam permintaan penulisan batch, tentukan nilai yang lebih besar untuk parameter ini. 
        config.setMaxBatchRowsCount(200); 
        // Tentukan jumlah maksimum permintaan paralel yang digunakan TableStoreWriter untuk menulis data dalam buffer ke Tablestore. Nilai default: 10. Kami merekomendasikan agar Anda mempertahankan nilai default.                          
        config.setConcurrency(10);                                   

        /**
         * Konfigurasikan callback tingkat baris. 
         * Dalam contoh ini, callback dikonfigurasikan untuk menghitung jumlah baris yang berhasil ditulis ke Tablestore dan jumlah baris yang gagal ditulis ke Tablestore. 
         * */
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);


        /**
         * Kami merekomendasikan agar Anda menggunakan thread pools dan client bawaan, yang lebih mudah digunakan dan mengisolasi logika inisialisasi dan pelepasan. 
         * */
        DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
                endpoint, credentials, instanceName, tableName, config, resultCallback);

        return writer;
    }


    private static void tryCreateTable() throws ClientException {
        SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        try {
            ots.deleteTable(new DeleteTableRequest(tableName));
        } catch (Exception e) {
        }

        TableMeta tableMeta = new TableMeta(tableName);
        tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(
                tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));

        try {
            CreateTableResponse res = ots.createTable(request);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    public static void writeSingleRowWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Mulai]");
        System.out.println("Tulis Satu Baris dengan Future");
        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
            futures.add(future);
        }

        System.out.println("Thread penulisan selesai.");
        writer.flush();
        // Tampilkan objek Future. 
        // printFutureResult(futures);

        System.out.println("=========================================================[Selesai]");
    }
    
    public void writeRowListWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Mulai]");
        System.out.println("Tulis Daftar Baris dengan Future");

        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        List<RowChange> rowChanges = new LinkedList<RowChange>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                    .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                    .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                    .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            rowChanges.add(rowChange);
            if (Math.random() > 0.995 || index == rowsCount - 1) {
                Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
                futures.add(future);
                rowChanges.clear();
            }
    }

    System.out.println("Thread penulisan selesai.");
    writer.flush();
    // Tampilkan objek Future. 
    // printFutureResult(futures);
    System.out.println("=========================================================[Selesai]");
    }


    private static void printFutureResult(List<Future<WriterResult>> futures) {
        int totalRow = 0;

        for (int index = 0; index < futures.size(); index++) {
            try {
                WriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] selesai:\tgagal: %d\tberhasil: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

Dalam contoh ini, hasil berikut dikembalikan:

=========================================================[Mulai]
Tulis Satu Baris dengan Future
Thread penulisan selesai.
=========================================================[Selesai]
Hitungan oleh TablestoreCallback: failedRow=0, succeedRow=1000
Hitungan oleh WriterStatics: WriterStatistics: {
    totalRequestCount=6,
    totalRowsCount=1000,
    totalSucceedRowsCount=1000,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

FAQ

Apa yang harus saya lakukan jika kesalahan "The count of attribute columns exceeds the maximum:128" dilaporkan ketika saya menggunakan Tablestore SDK untuk Java untuk menulis data ke tabel Tablestore?

Lampiran

Lampiran 1: Prinsip kerja TableStoreWriter

TableStoreWriter adalah kelas alat yang direenkapsulasi berdasarkan operasi pada lapisan pemrosesan SDK. TableStoreWriter bergantung pada operasi berikut:

  • AsyncClient, digunakan untuk menginisialisasi TableStoreWriter.

  • BatchWriteRow, digunakan untuk mengimpor data ke Tablestore dengan TableStoreWriter.

  • RetryStrategy, digunakan untuk mencoba kembali oleh TableStoreWriter ketika terjadi kegagalan penulisan.

Gambar berikut menunjukkan arsitektur hierarki kode.

EN

TableStoreWriter dioptimalkan dalam hal performa dan kemudahan penggunaan dan menyediakan fitur-fitur berikut:

  • Operasi Asinkron: Lebih sedikit thread digunakan untuk menyediakan konkurensi yang lebih tinggi.

  • Aggregasi Data Otomatis: Antrian buffer digunakan dalam memori untuk memaksimalkan jumlah permintaan penulisan yang dikirim ke Tablestore pada saat yang sama. Ini meningkatkan throughput penulisan.

  • Mode Produsen-Konsumen: Mode ini memfasilitasi pemrosesan asinkron dan agregasi data.

  • Antrian Pertukaran Data Berperforma Tinggi: Disruptor RingBuffer memberikan performa yang lebih baik dalam mode multi-produsen dan single-konsumen.

  • Penyembunyian Enkapsulasi Permintaan Kompleks: Detail pemanggilan operasi BatchWriteRow disembunyikan. Data kotor disaring menggunakan pemeriksaan awal, dan batasan permintaan, seperti batasan pada jumlah atau ukuran baris yang dapat diimpor dalam batch, diproses secara otomatis. Data kotor mengacu pada baris yang skemanya berbeda dari tabel, ukurannya melebihi batas atas, atau di mana jumlah kolom melebihi batas atas.

  • Callback Tingkat Baris: Dibandingkan dengan callback tingkat permintaan yang disediakan oleh Tablestore SDK untuk Java, callback tingkat baris yang disediakan oleh TableStoreWriter memungkinkan logika bisnis memproses data pada tingkat baris.

  • Retries Tingkat Baris: Jika retries tingkat permintaan gagal, retries tingkat baris dilakukan untuk kode kesalahan tertentu untuk memastikan tingkat keberhasilan penulisan tertinggi yang mungkin.

Gambar berikut menunjukkan mekanisme kerja dan detail enkapsulasi TableStoreWriter.

EN2

Lampiran 2: Parameter TableStoreWriter

Saat Anda menginisialisasi TableStoreWriter, Anda dapat memodifikasi WriterConfig untuk menentukan parameter TableStoreWriter berdasarkan kebutuhan bisnis Anda.

Parameter

Tipe

Deskripsi

bucketCount

Integer

Jumlah bucket di TableStoreWriter. Nilai default: 3. Bucket setara dengan buffer yang digunakan untuk menyimpan data sementara. Anda dapat menentukan parameter ini untuk meningkatkan jumlah permintaan penulisan berurutan paralel. Jika bottleneck mesin belum tercapai, jumlah bucket berkorelasi positif dengan laju penulisan.

Catatan

Jika mode penulisan bucket adalah penulisan bersamaan, pertahankan nilai default.

bufferSize

Integer

Jumlah maksimum baris yang dapat disertakan dalam antrian buffer di memori. Nilai default: 1024. Nilai parameter ini harus merupakan kelipatan eksponensial dari 2.

enableSchemaCheck

Boolean

Menentukan apakah akan memeriksa skema saat data diflush ke buffer. Nilai default: true.

  • Nilai true menentukan bahwa sebelum data baris diflush ke buffer, TableStoreWriter melakukan pemeriksaan berikut pada data baris. Jika data baris gagal dalam pemeriksaan sebelumnya, TableStoreWriter menganggap data baris sebagai data kotor. Data kotor tidak diflush ke buffer.

    • Periksa apakah skema kunci utama baris sama dengan tabel.

    • Periksa apakah ukuran nilai setiap kolom kunci utama atau kolom atribut dalam baris melebihi batas atas.

    • Periksa apakah jumlah kolom atribut dalam baris melebihi batas atas.

    • Periksa apakah nama kolom atribut sama dengan kolom kunci utama dalam baris.

    • Periksa apakah ukuran baris melebihi jumlah maksimum data yang dapat diimpor sekaligus menggunakan permintaan.

  • Nilai false menentukan bahwa jika data baris tertentu dalam buffer adalah data kotor, TableStoreWriter tidak dapat menulis data baris ke Tablestore.

dispatchMode

DispatchMode

Mode di mana data didistribusikan ke bucket saat data diflush ke buffer. Nilai yang valid:

  • HASH_PARTITION_KEY: mendistribusikan data ke bucket berdasarkan nilai hash dari partition key. Data yang memiliki nilai hash partition key yang sama ditulis secara berurutan ke bucket yang sama. Ini adalah nilai default.

  • HASH_PRIMARY_KEY: mendistribusikan data ke bucket berdasarkan nilai hash dari primary key. Data yang memiliki nilai hash primary key yang sama ditulis secara berurutan ke bucket yang sama.

  • ROUND_ROBIN: melintasi setiap bucket untuk mendistribusikan data dalam siklus. Data didistribusikan secara acak ke bucket yang berbeda.

Penting

Parameter ini hanya berlaku jika jumlah bucket lebih besar dari atau sama dengan 2.

batchRequestType

BatchRequestType

Tipe permintaan yang digunakan TableStoreWriter untuk menulis data dalam buffer ke Tablestore. Nilai yang valid:

  • BATCH_WRITE_ROW: BatchWriteRowRequest. Ini adalah nilai default.

  • BULK_IMPORT: BulkImportRequest.

concurrency

Integer

Jumlah maksimum permintaan paralel yang digunakan TableStoreWriter untuk menulis data dalam buffer ke Tablestore. Nilai default: 10.

writeMode

WriteMode

Mode di mana data dalam bucket ditulis ke Tablestore ketika TableStoreWriter menulis data dalam buffer ke Tablestore. Nilai yang valid:

  • PARALLEL: menulis data secara paralel dari bucket ke Tablestore dan secara bersamaan dari setiap bucket ke Tablestore. Ini adalah nilai default.

  • SEQUENTIAL: menulis data secara paralel dari bucket ke Tablestore dan secara berurutan dari setiap bucket ke Tablestore.

allowDuplicatedRowInBatchRequest

Boolean

Menentukan apakah baris yang memiliki nilai kunci utama yang sama diizinkan ketika TableStoreWriter membuat permintaan penulisan batch. Nilai default: true.

Penting

Jika indeks sekunder dibuat untuk tabel data, Tablestore mengabaikan pengaturan parameter ini dan tidak mengizinkan baris yang memiliki nilai kunci utama yang sama. Dalam hal ini, TableStoreWriter menambahkan baris yang memiliki nilai kunci utama yang sama ke permintaan yang berbeda ketika TableStoreWriter membuat permintaan.

maxBatchSize

Integer

Jumlah maksimum data yang dapat ditulis ke Tablestore dalam permintaan penulisan batch. Unit: byte. Secara default, hingga 4 MB data dapat ditulis ke Tablestore dalam permintaan penulisan batch.

maxBatchRowsCount

Integer

Jumlah maksimum baris yang dapat ditulis ke Tablestore dalam permintaan penulisan batch. Nilai default: 200. Nilai maksimum: 200.

callbackThreadCount

Integer

Jumlah thread dalam thread pool yang menjalankan callback di dalam TableStoreWriter. Nilai default parameter ini adalah jumlah Prosesor.

callbackThreadPoolQueueSize

Integer

Ukuran antrian thread pool yang menjalankan callback di dalam TableStoreWriter. Nilai default: 1024.

maxColumnsCount

Integer

Jumlah maksimum kolom dalam satu baris saat data diflush ke buffer. Nilai default: 128.

maxAttrColumnSize

Integer

Ukuran maksimum nilai dari satu kolom atribut saat data diflush ke buffer. Unit: byte. Secara default, nilai setiap kolom atribut dapat mencapai hingga 2 MB.

maxPKColumnSize

Integer

Ukuran maksimum nilai dari satu kolom kunci utama saat data diflush ke buffer. Unit: byte. Secara default, nilai setiap kolom kunci utama dapat mencapai hingga 1 KB.

flushInterval

Integer

Interval di mana TableStoreWriter secara otomatis menulis data dalam buffer ke Tablestore. Nilai default: 10000. Unit: milidetik.

logInterval

Integer

Interval di mana status tugas ditampilkan secara otomatis ketika TableStoreWriter menulis data dalam buffer ke Tablestore. Nilai default: 10000. Unit: milidetik.

clientMaxConnections

Integer

Jumlah maksimum koneksi yang digunakan saat client dibangun secara internal. Nilai default: 300.

writerRetryStrategy

WriterRetryStrategy

Kebijakan retry yang digunakan saat client dibangun secara internal. Nilai yang valid:

  • CERTAIN_ERROR_CODE_NOT_RETRY: tidak melakukan retry untuk kode kesalahan tertentu dan melakukan retry untuk kode kesalahan lainnya. Ini adalah nilai default. Kode kesalahan untuk mana tidak dilakukan retry:

    • OTSParameterInvalid

    • OTSConditionCheckFail

    • OTSRequestBodyTooLarge

    • OTSInvalidPK

    • OTSOutOfColumnCountLimit

    • OTSOutOfRowSizeLimit

  • CERTAIN_ERROR_CODE_RETRY: melakukan retry untuk kode kesalahan tertentu dan tidak melakukan retry untuk kode kesalahan lainnya. Kode kesalahan untuk mana dilakukan retry:

    • OTSInternalServerError

    • OTSRequestTimeout

    • OTSPartitionUnavailable

    • OTSTableNotReady

    • OTSRowOperationConflict

    • OTSTimeout

    • OTSServerUnavailable

    • OTSServerBusy

Contoh konfigurasi

WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);

Lampiran 3: Fungsi callback TableStoreWriter

TableStoreWriter menggunakan callback untuk melaporkan keberhasilan atau kegagalan penulisan. Jika baris data berhasil ditulis ke Tablestore, TableStoreWriter memanggil fungsi onCompleted. Jika baris data gagal ditulis ke Tablestore, TableStoreWriter memanggil fungsi onFailed berdasarkan kategori pengecualian.

Contoh kode berikut memberikan contoh cara menggunakan callback untuk menghitung jumlah baris yang berhasil ditulis ke Tablestore dan jumlah baris yang gagal ditulis ke Tablestore:

private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
    @Override
    public void onCompleted(RowChange rowChange, RowWriteResult cc) {
        // Hitung jumlah baris yang berhasil ditulis ke Tablestore. 
        succeedRows.incrementAndGet();
    }

    @Override
    public void onFailed(RowChange rowChange, Exception ex) {
        // Hitung jumlah baris yang gagal ditulis ke Tablestore. 
        failedRows.incrementAndGet();
    }
};