Tablestore: Use TableStoreWriter to concurrently write data

Last Updated: Mar 27, 2025

This topic describes how to use TableStoreWriter to write data to Tablestore with high concurrency.

Background information

In scenarios such as logging and IoT tracing, the system generates a large amount of data in a short period of time and writes the data to databases. The databases need to provide high write concurrency and a write throughput of tens of thousands or even millions of rows per second. However, the BatchWriteRow operation of Tablestore can write at most 200 rows per request.
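
To make the 200-row limit concrete, the following minimal sketch splits a list of rows into batches of at most 200 rows and counts the requests that would be needed. It uses only the Java standard library. The class name BatchSplitSketch and the use of String values as rows are illustrative assumptions and are not part of Tablestore SDK for Java.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSplitSketch {
    // Row limit per BatchWriteRow request, as stated in this topic.
    static final int MAX_ROWS_PER_BATCH = 200;

    /** Splits rows into consecutive batches of at most maxRows rows each. */
    static <T> List<List<T>> split(List<T> rows, int maxRows) {
        if (maxRows <= 0) {
            throw new IllegalArgumentException("maxRows must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += maxRows) {
            int to = Math.min(from + maxRows, rows.size());
            // Copy the view so that each batch is independent of the source list.
            batches.add(new ArrayList<>(rows.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add("row-" + i); // placeholder for a real row
        }
        List<List<String>> batches = split(rows, MAX_ROWS_PER_BATCH);
        System.out.println(batches.size() + " requests for " + rows.size() + " rows");
    }
}
```

With 1,000 rows, this split needs at least five requests. TableStoreWriter handles this kind of aggregation for you, and the number of requests that it actually sends can be higher because batches are also limited by size and flushed on a timer.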

TableStoreWriter is an easy-to-use and high-performance data import tool class provided by Tablestore SDK for Java. This class encapsulates the operations used to import data with high concurrency and high throughput. You can use TableStoreWriter to write data to Tablestore data tables with high concurrency and specify row-level callbacks and custom configurations. For more information, see Appendix 1: Working principles of TableStoreWriter.

Note

TableStoreWriter is suitable only for the Wide Column model.

Scenarios

If you have the following business requirements, you can use TableStoreWriter to write data to Tablestore in scenarios such as log storage, instant messaging, and distributed queues:

  • High throughput is required to support the high concurrency of applications.

  • Low write latency for a single row of data is not required.

  • Data can be asynchronously written (the producer-consumer mode can be used).

  • Writing the same row of data more than once is acceptable.

Prerequisites

Procedure

Step 1: Install Tablestore SDK for Java

If you use Maven to manage Java projects, add the following dependency to the pom.xml file:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.17.4</version>
</dependency>                 

For information about how to install Tablestore SDK for Java, see Install Tablestore SDK for Java.

Step 2: Initialize TableStoreWriter

When you initialize TableStoreWriter, you must specify the instance and table information and the identity authentication information. You can also specify custom TableStoreWriter parameters and callback functions. We recommend that multiple threads share the same TableStoreWriter object. The following sample code provides an example on how to initialize TableStoreWriter.

For information about the parameters and callback functions supported by TableStoreWriter, see Appendix 2: TableStoreWriter parameters and Appendix 3: TableStoreWriter callback functions.
private static TableStoreWriter createTablesStoreWriter() {
    
    /**
     * In most cases, you can retain the default values for the TableStoreWriter parameters. You can also set them to other values based on your business requirements. 
     * For more information about the parameters, see Appendix 2: TableStoreWriter parameters. 
     * */
    WriterConfig config = new WriterConfig();
    // Specify the maximum number of rows that can be written to Tablestore in a batch write request. Default value: 200. 
    config.setMaxBatchRowsCount(200); 
    // Specify the maximum number of parallel requests that TableStoreWriter uses to write data in the buffer to Tablestore. Default value: 10. We recommend that you retain the default value.                          
    config.setConcurrency(10);    
            
    /**
     * Configure row-level callbacks. 
     * In this example, callbacks are configured to count the number of rows that are written to Tablestore and the number of rows that fail to be written to Tablestore. 
     * */
    TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
        @Override
        public void onCompleted(RowChange rowChange, RowWriteResult cc) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(RowChange rowChange, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    /** Configure access credentials.   **/
    ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

    /**
     * We recommend that you use the built-in thread pools and client, which are easier to use and isolate the initialization and release logic. 
     * */
    DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
        endpoint, credentials, instanceName, tableName, config, resultCallback);

    return writer;
}

Step 3: Write data

Construct a RowChange object for each add, delete, or modify operation, and then add the RowChange objects to TableStoreWriter. The following examples use RowUpdateChange.

Write a single row of data at a time

The following sample code provides an example on how to write 1,000 rows of data to a data table by writing a single row of data at a time:

public void writeSingleRowWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Single Row With Future");
    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
        futures.add(future);
    }

    System.out.println("Write thread finished.");
    // Flush data in the buffer to Tablestore. Without an explicit flush, TableStoreWriter decides when to flush the buffer based on the flushInterval parameter (the interval between automatic flushes) and the maxBatchSize parameter (the amount of data in the buffer). 
    writer.flush();
    
    // Display the Future object. 
    // printFutureResult(futures);

    System.out.println("=========================================================[Finish]");
}

Write multiple rows of data at the same time

The following sample code provides an example on how to write 1,000 rows to a data table by writing multiple rows of data at the same time:

public void writeRowListWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Row List With Future");

    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    List<RowChange> rowChanges = new LinkedList<RowChange>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        rowChanges.add(rowChange);
        if (Math.random() > 0.995 || index == rowsCount - 1) {
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
            futures.add(future);
            rowChanges.clear();
        }
    }

    System.out.println("Write thread finished.");
    // Flush data in the buffer to Tablestore. Without an explicit flush, TableStoreWriter decides when to flush the buffer based on the flushInterval parameter (the interval between automatic flushes) and the maxBatchSize parameter (the amount of data in the buffer). 
    writer.flush();
    
    // Display the Future object. 
    // printFutureResult(futures);
    
    System.out.println("=========================================================[Finish]");
}

Step 4: Shut down TableStoreWriter

We recommend that you manually shut down TableStoreWriter before you exit the application. When you shut down TableStoreWriter, the system first flushes all data in the buffer to Tablestore.

Note

If you call the addRowChange method to write data to the buffer while you are shutting down TableStoreWriter or after you shut down TableStoreWriter, the data may not be written to Tablestore.

// Proactively shut down TableStoreWriter. After all data in the queues is written to Tablestore, the system automatically shuts down the client and the internal thread pools. 
writer.close();
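
One way to avoid adding rows during or after shutdown is to route all writes through a small guard that refuses new rows once close has started. The following sketch illustrates the idea with the Java standard library only. RowSink is a hypothetical stand-in for the add and close calls of a writer and is not an SDK type; in a real application, its two methods would delegate to addRowChange and close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class GuardedWriterSketch {
    /** Hypothetical stand-in for the writer calls used here; not an SDK type. */
    interface RowSink {
        void add(String row);
        void close();
    }

    private final RowSink sink;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean closed; // guarded by lock

    GuardedWriterSketch(RowSink sink) {
        this.sink = sink;
    }

    /** Adds a row, or returns false if shutdown has started. Many threads may add at once. */
    boolean tryAdd(String row) {
        lock.readLock().lock();
        try {
            if (closed) {
                return false; // tell the caller instead of silently losing the row
            }
            sink.add(row);
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Waits for in-flight adds to finish, then closes the sink exactly once. */
    void close() {
        lock.writeLock().lock();
        try {
            if (closed) {
                return;
            }
            closed = true;
            sink.close();
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        GuardedWriterSketch writer = new GuardedWriterSketch(new RowSink() {
            @Override public void add(String row) { stored.add(row); }
            @Override public void close() { System.out.println("closed with " + stored.size() + " row(s)"); }
        });
        System.out.println("before close: " + writer.tryAdd("row-1"));
        writer.close();
        System.out.println("after close: " + writer.tryAdd("row-2"));
    }
}
```

The read lock lets producer threads add rows concurrently, and the write lock in close ensures that no add is in progress when shutdown begins.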

Complete sample code

The following sample code provides an example on how to create a data table and concurrently write data to the table:

import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;

import com.aliyuncs.exceptions.ClientException;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

public class TableStoreWriterDemo {

    // Specify the name of the instance.
    private static String instanceName = "yourInstanceName";
    // Specify the endpoint of the instance.
    private static String endpoint = "yourEndpoint";
    // Obtain the AccessKey ID and AccessKey secret from the environment variables.
    private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    private static String tableName = "<TABLE_NAME>";

    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ClientException {
        TableStoreWriterDemo sample = new TableStoreWriterDemo();

        /**
         * Make sure that the table already exists before you use TableStoreWriter. 
         * 1. TableStoreWriter verifies whether the table exists.
         * 2. The fields and field types of the data that you want to write must be the same as the fields and field types of the table. 
         * */
        sample.tryCreateTable();

        /**
         * We recommend that you use DefaultTableStoreWriter to initialize TableStoreWriter. 
         * DefaultTableStoreWriter(
         *      String endpoint,                                                   // The endpoint of the instance. 
         *      ServiceCredentials credentials,                                    // The access credentials, including the AccessKey pair. Tokens are supported.
         *      String instanceName,                                               // The name of the instance. 
         *      String tableName,                                                  // The name of the table. TableStoreWriter can write data to only one table. 
         *      WriterConfig config,                                               // The configurations of TableStoreWriter. 
         *      TableStoreCallback<RowChange, RowWriteResult> resultCallback       // Row-level callbacks. 
         * )
         * */
        TableStoreWriter writer = sample.createTablesStoreWriter();

        /**
         * Write a single row of data at a time. A Future object is returned for each row.
         * */
        sample.writeSingleRowWithFuture(writer);
        /**
         * Write multiple rows of data at the same time. A Future object is returned for each list of rows.
         * */   
        //sample.writeRowListWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());

        /**
         * Proactively shut down TableStoreWriter. After all data in the queues is written to Tablestore, the system automatically shuts down the client and the internal thread pools. 
         * */
        writer.close();
    }

    private static TableStoreWriter createTablesStoreWriter() {

        WriterConfig config = new WriterConfig();
        // Specify the maximum number of rows that can be written to Tablestore in a batch write request. Default value: 200. Maximum value: 200. 
        config.setMaxBatchRowsCount(200); 
        // Specify the maximum number of parallel requests that TableStoreWriter uses to write data in the buffer to Tablestore. Default value: 10. We recommend that you retain the default value.                          
        config.setConcurrency(10);                                   

        /**
         * Configure row-level callbacks. 
         * In this example, callbacks are configured to count the number of rows that are written to Tablestore and the number of rows that fail to be written to Tablestore. 
         * */
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);


        /**
         * We recommend that you use the built-in thread pools and client, which are easier to use and isolate the initialization and release logic. 
         * */
        DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
                endpoint, credentials, instanceName, tableName, config, resultCallback);

        return writer;
    }


    private static void tryCreateTable() throws ClientException {
        SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

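        // Delete any existing table that has the same name so that the demo starts from an empty table.
        // Caution: this removes all data in that table.
        try {
            ots.deleteTable(new DeleteTableRequest(tableName));
        } catch (Exception e) {
            // Ignored: in most cases, the table does not exist yet.
        }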

        TableMeta tableMeta = new TableMeta(tableName);
        tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(
                tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));

        try {
            CreateTableResponse res = ots.createTable(request);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    public static void writeSingleRowWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Single Row With Future");
        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
            futures.add(future);
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // Display the Future object. 
        // printFutureResult(futures);

        System.out.println("=========================================================[Finish]");
    }
    
    public void writeRowListWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Row List With Future");

        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        List<RowChange> rowChanges = new LinkedList<RowChange>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                    .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                    .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                    .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            rowChanges.add(rowChange);
            if (Math.random() > 0.995 || index == rowsCount - 1) {
                Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
                futures.add(future);
                rowChanges.clear();
            }
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // Display the Future object. 
        // printFutureResult(futures);
        System.out.println("=========================================================[Finish]");
    }


    private static void printFutureResult(List<Future<WriterResult>> futures) {
        int totalRow = 0;

        for (int index = 0; index < futures.size(); index++) {
            try {
                WriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

In this example, the following result is returned:

=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
    totalRequestCount=6,
    totalRowsCount=1000,
    totalSucceedRowsCount=1000,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

FAQ

What do I do if the error "The count of attribute columns exceeds the maximum:128" is reported when I use Tablestore SDK for Java to write data to a Tablestore data table?

Appendixes

Appendix 1: Working principles of TableStoreWriter

TableStoreWriter is a tool class that wraps the operations at the SDK processing layer. TableStoreWriter depends on the following operations:

  • AsyncClient, used to initialize TableStoreWriter.

  • BatchWriteRow, used to import data to Tablestore with TableStoreWriter.

  • RetryStrategy, used for retries by TableStoreWriter when a write failure occurs.

The following figure shows the code hierarchy architecture.

[Figure: code hierarchy architecture of TableStoreWriter]

TableStoreWriter is optimized in terms of performance and ease of use and provides the following features:

  • Asynchronous operations: Fewer threads are used to provide higher concurrency.

  • Automatic data aggregation: An in-memory buffer queue aggregates rows so that as much data as possible is sent to Tablestore in each write request. This improves the write throughput.

  • Producer-consumer mode: This mode facilitates asynchronous processing and data aggregation.

  • High-performance data exchange queues: Disruptor RingBuffer provides better performance in multi-producer and single-consumer mode.

  • Hiding of complex request encapsulation: The details of calling the BatchWriteRow operation are hidden. Dirty data is filtered out by using pre-checks, and the request limits, such as the limit on the number or size of rows that can be imported in a batch, are handled automatically. Dirty data refers to rows whose schema is different from that of the table, whose size exceeds the upper limit, or in which the number of columns exceeds the upper limit.

  • Row-level callbacks: Compared with the request-level callbacks provided by Tablestore SDK for Java, the row-level callbacks provided by TableStoreWriter allow the business logic to process data at the row level.

  • Row-level retries: If request-level retries fail, row-level retries are performed for specific error codes to ensure the highest possible write success rate.
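
The producer-consumer mode and the automatic data aggregation described in the list above can be pictured with the following minimal sketch. It is an illustration under stated assumptions and not the implementation of TableStoreWriter: a LinkedBlockingQueue stands in for the Disruptor RingBuffer, String values stand in for rows, and the class name ProducerConsumerSketch is hypothetical. Several producer threads enqueue rows, and a single consumer groups whatever is available into batches of at most 200 rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ProducerConsumerSketch {

    /** Runs producers that enqueue rows and one consumer that groups them into batches. */
    static List<List<String>> run(int producers, int rowsPerProducer, int maxBatchRows) {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>(1024); // bounded in-memory buffer
        int expected = producers * rowsPerProducer;

        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < rowsPerProducer; i++) {
                        buffer.put("p" + id + "-row" + i); // blocks while the buffer is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }

        List<List<String>> batches = new ArrayList<>();
        int consumed = 0;
        try {
            while (consumed < expected) {
                // Wait for one row, then drain the rows that are already queued, up to the batch limit.
                String first = buffer.poll(1, TimeUnit.SECONDS);
                if (first == null) {
                    continue; // nothing queued yet
                }
                List<String> batch = new ArrayList<>();
                batch.add(first);
                buffer.drainTo(batch, maxBatchRows - 1);
                consumed += batch.size();
                batches.add(batch); // a real writer would send this batch as one request
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<String>> batches = run(4, 500, 200);
        int rows = batches.stream().mapToInt(List::size).sum();
        System.out.println(rows + " rows in " + batches.size() + " batches");
    }
}
```

The number of batches varies from run to run because it depends on how many rows are queued each time the consumer wakes up. This is also why aggregation works best when producers are faster than a single request round trip.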

The following figure shows the working mechanisms and encapsulation details of TableStoreWriter.

[Figure: working mechanisms and encapsulation details of TableStoreWriter]

Appendix 2: TableStoreWriter parameters

When you initialize TableStoreWriter, you can modify WriterConfig to specify TableStoreWriter parameters based on your business requirements.

Parameter

Type

Description

bucketCount

Integer

The number of buckets in TableStoreWriter. Default value: 3. A bucket is equivalent to a buffer used to cache data. You can increase the value of this parameter to raise the number of sequential write streams that run in parallel. As long as the machine is not the bottleneck, the write rate increases with the number of buckets.

Note

If the writeMode parameter is set to PARALLEL, retain the default value.

bufferSize

Integer

The maximum number of rows that can be included in the buffer queue in the memory. Default value: 1024. The value of this parameter must be a power of 2.

enableSchemaCheck

Boolean

Specifies whether to check the schema when data is flushed to the buffer. Default value: true.

  • A value of true specifies that before the row data is flushed to the buffer, TableStoreWriter performs the following checks on the row data. If the row data fails any of the checks, TableStoreWriter considers the row data as dirty data. Dirty data is not flushed to the buffer.

    • Check whether the schema of the primary key of the row is the same as that of the table.

    • Check whether the value size of each primary key column or attribute column in the row exceeds the upper limit.

    • Check whether the number of attribute columns in the row exceeds the upper limit.

    • Check whether the name of an attribute column is the same as that of a primary key column in the row.

    • Check whether the size of the row exceeds the maximum amount of data that can be imported at the same time by using a request.

  • A value of false specifies that the row data is flushed to the buffer without these checks. If a row in the buffer is dirty data, TableStoreWriter fails to write the row to Tablestore.

dispatchMode

DispatchMode

The mode in which data is dispatched to buckets when data is flushed to the buffer. Valid values:

  • HASH_PARTITION_KEY: dispatches data to buckets based on the hash value of the partition key. Data whose partition key has the same hash value is sequentially written to the same bucket. This is the default value.

  • HASH_PRIMARY_KEY: dispatches data to buckets based on the hash value of the primary key. Data whose primary key has the same hash value is sequentially written to the same bucket.

  • ROUND_ROBIN: dispatches data to the buckets in turn, one bucket after another in a loop. Rows are spread across the buckets regardless of their key values.

Important

This parameter takes effect only if the number of buckets is greater than or equal to 2.

batchRequestType

BatchRequestType

The type of the request that TableStoreWriter uses to write data in the buffer to Tablestore. Valid values:

  • BATCH_WRITE_ROW: BatchWriteRowRequest. This is the default value.

  • BULK_IMPORT: BulkImportRequest.

concurrency

Integer

The maximum number of parallel requests that TableStoreWriter uses to write data in the buffer to Tablestore. Default value: 10.

writeMode

WriteMode

The mode in which data in buckets is written to Tablestore when TableStoreWriter writes data in the buffer to Tablestore. Valid values:

  • PARALLEL: The buckets write data to Tablestore in parallel, and the requests of each bucket are also sent concurrently. This is the default value.

  • SEQUENTIAL: The buckets write data to Tablestore in parallel, but the requests of each bucket are sent one after another.

allowDuplicatedRowInBatchRequest

Boolean

Specifies whether rows that have the same primary key value are allowed when TableStoreWriter creates a batch write request. Default value: true.

Important

If a secondary index is created for the data table, Tablestore ignores the setting of this parameter and does not allow rows that have the same primary key value. In this case, TableStoreWriter adds the rows that have the same primary key value to different requests when TableStoreWriter creates requests.

maxBatchSize

Integer

The maximum amount of data that can be written to Tablestore in a batch write request. Unit: bytes. By default, up to 4 MB of data can be written to Tablestore in a batch write request.

maxBatchRowsCount

Integer

The maximum number of rows that can be written to Tablestore in a batch write request. Default value: 200. Maximum value: 200.

callbackThreadCount

Integer

The number of threads in the thread pool that runs callbacks within TableStoreWriter. The default value of this parameter is the number of processors.

callbackThreadPoolQueueSize

Integer

The queue size of the thread pool that runs callbacks within TableStoreWriter. Default value: 1024.

maxColumnsCount

Integer

The maximum number of columns in a row when data is flushed to the buffer. Default value: 128.

maxAttrColumnSize

Integer

The maximum size of the value of a single attribute column when data is flushed to the buffer. Unit: bytes. By default, the value of each attribute column can be up to 2 MB in size.

maxPKColumnSize

Integer

The maximum size of the value of a single primary key column when data is flushed to the buffer. Unit: bytes. By default, the value of each primary key column can be up to 1 KB in size.

flushInterval

Integer

The interval at which TableStoreWriter automatically writes data in the buffer to Tablestore. Default value: 10000. Unit: milliseconds.

logInterval

Integer

The interval at which the task status is automatically displayed when TableStoreWriter writes data in the buffer to Tablestore. Default value: 10000. Unit: milliseconds.

clientMaxConnections

Integer

The maximum number of connections that are used when the client is built internally. Default value: 300.

writerRetryStrategy

WriterRetryStrategy

The retry policy that is used when the client is built internally. Valid values:

  • CERTAIN_ERROR_CODE_NOT_RETRY: does not perform retries for specific error codes and performs retries for other error codes. This is the default value. Error codes for which no retries are performed:

    • OTSParameterInvalid

    • OTSConditionCheckFail

    • OTSRequestBodyTooLarge

    • OTSInvalidPK

    • OTSOutOfColumnCountLimit

    • OTSOutOfRowSizeLimit

  • CERTAIN_ERROR_CODE_RETRY: performs retries for specific error codes and does not perform retries for other error codes. Error codes for which retries are performed:

    • OTSInternalServerError

    • OTSRequestTimeout

    • OTSPartitionUnavailable

    • OTSTableNotReady

    • OTSRowOperationConflict

    • OTSTimeout

    • OTSServerUnavailable

    • OTSServerBusy
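
The two writerRetryStrategy values act as a deny list and an allow list of error codes. The following sketch expresses that decision with the error codes listed above. It is a simplified illustration and not the SDK implementation: the Strategy enum is a local stand-in that only mirrors the names of the WriterRetryStrategy values, the class name RetryPolicySketch is hypothetical, and retry counts and backoff intervals are ignored.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class RetryPolicySketch {
    /** Local stand-in that mirrors the names of the two strategies; not the SDK enum. */
    enum Strategy { CERTAIN_ERROR_CODE_NOT_RETRY, CERTAIN_ERROR_CODE_RETRY }

    // Error codes that are never retried under CERTAIN_ERROR_CODE_NOT_RETRY.
    static final Set<String> NOT_RETRY_CODES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "OTSParameterInvalid", "OTSConditionCheckFail", "OTSRequestBodyTooLarge",
            "OTSInvalidPK", "OTSOutOfColumnCountLimit", "OTSOutOfRowSizeLimit")));

    // Error codes that are the only ones retried under CERTAIN_ERROR_CODE_RETRY.
    static final Set<String> RETRY_CODES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "OTSInternalServerError", "OTSRequestTimeout", "OTSPartitionUnavailable",
            "OTSTableNotReady", "OTSRowOperationConflict", "OTSTimeout",
            "OTSServerUnavailable", "OTSServerBusy")));

    /** Returns true if a failure with the given error code is eligible for a retry. */
    static boolean shouldRetry(Strategy strategy, String errorCode) {
        switch (strategy) {
            case CERTAIN_ERROR_CODE_NOT_RETRY:
                return !NOT_RETRY_CODES.contains(errorCode); // deny list: retry everything else
            case CERTAIN_ERROR_CODE_RETRY:
                return RETRY_CODES.contains(errorCode);      // allow list: retry only these
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " / OTSServerBusy -> " + shouldRetry(s, "OTSServerBusy"));
            System.out.println(s + " / OTSInvalidPK -> " + shouldRetry(s, "OTSInvalidPK"));
        }
    }
}
```

The practical difference shows up for error codes that appear in neither list: the default strategy retries them, whereas CERTAIN_ERROR_CODE_RETRY does not.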

Configuration example

WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);
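
To picture how bucketCount and dispatchMode interact in the configuration above, the following sketch maps rows to bucket indexes. It is only an illustration: the class name DispatchSketch is hypothetical, and String.hashCode is an assumed hash function because the hash that TableStoreWriter actually uses is not described in this topic. For HASH_PARTITION_KEY, the key would be the value of the partition key (the first primary key column). For HASH_PRIMARY_KEY, it would be the whole primary key.

```java
import java.util.concurrent.atomic.AtomicLong;

final class DispatchSketch {
    private final int bucketCount;
    private final AtomicLong counter = new AtomicLong();

    DispatchSketch(int bucketCount) {
        if (bucketCount <= 0) {
            throw new IllegalArgumentException("bucketCount must be positive");
        }
        this.bucketCount = bucketCount;
    }

    /** Hash-based dispatch: equal keys always land in the same bucket, which keeps their order. */
    int byHash(String key) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), bucketCount);
    }

    /** Round-robin dispatch: buckets are visited in turn, independent of the key. */
    int roundRobin() {
        return (int) (counter.getAndIncrement() % bucketCount);
    }

    public static void main(String[] args) {
        DispatchSketch dispatch = new DispatchSketch(3);
        for (String key : new String[] {"device-1", "device-2", "device-1"}) {
            System.out.println(key + " -> hash bucket " + dispatch.byHash(key)
                    + ", round-robin bucket " + dispatch.roundRobin());
        }
    }
}
```

With hash-based dispatch, both occurrences of "device-1" map to the same bucket, whereas round-robin dispatch sends them to different buckets. This is why the hash modes suit workloads that must preserve the write order of rows that share a key.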

Appendix 3: TableStoreWriter callback functions

TableStoreWriter uses callbacks to report write successes or failures. If a row of data is written to Tablestore, TableStoreWriter invokes the onCompleted function. If a row of data fails to be written to Tablestore, TableStoreWriter invokes the onFailed function based on the category of the exception.

The following sample code provides an example on how to use callbacks to count the number of rows that are written to Tablestore and the number of rows that fail to be written to Tablestore:

private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
    @Override
    public void onCompleted(RowChange rowChange, RowWriteResult cc) {
        // Count the number of rows that are written to Tablestore. 
        succeedRows.incrementAndGet();
    }

    @Override
    public void onFailed(RowChange rowChange, Exception ex) {
        // Count the number of rows that fail to be written to Tablestore. 
        failedRows.incrementAndGet();
    }
};
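
Beyond counting, a callback can also keep the failed rows so that the application can inspect or resubmit them later. The following sketch shows one possible shape. To stay self-contained, it declares RowCallback, a hypothetical interface with the same two methods as TableStoreCallback, and uses String values in place of RowChange and RowWriteResult. In real code, the same method bodies would go into a TableStoreCallback<RowChange, RowWriteResult> implementation. Thread-safe collections are used on the assumption that callbacks run on the callback thread pool of TableStoreWriter.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

final class FailureCollectorSketch {
    /** Hypothetical stand-in with the same two-method shape as TableStoreCallback. */
    interface RowCallback<REQ, RES> {
        void onCompleted(REQ row, RES result);
        void onFailed(REQ row, Exception ex);
    }

    /** Counts successes, keeps failed rows, and tallies failures by exception type. */
    static final class Collector<REQ, RES> implements RowCallback<REQ, RES> {
        final AtomicLong succeeded = new AtomicLong();
        final Queue<REQ> failedRows = new ConcurrentLinkedQueue<>();
        final Map<String, AtomicLong> failuresByType = new ConcurrentHashMap<>();

        @Override
        public void onCompleted(REQ row, RES result) {
            succeeded.incrementAndGet();
        }

        @Override
        public void onFailed(REQ row, Exception ex) {
            failedRows.add(row); // keep the row for later inspection or resubmission
            failuresByType
                    .computeIfAbsent(ex.getClass().getSimpleName(), k -> new AtomicLong())
                    .incrementAndGet();
        }
    }

    public static void main(String[] args) {
        Collector<String, String> collector = new Collector<>();
        collector.onCompleted("row-1", "ok");
        collector.onFailed("row-2", new IllegalStateException("simulated failure"));
        System.out.println("succeeded=" + collector.succeeded.get()
                + ", failed=" + collector.failedRows.size()
                + ", byType=" + collector.failuresByType.keySet());
    }
}
```

Keep the work inside a callback short, because slow callbacks occupy the threads that the callbackThreadCount parameter controls.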