This topic describes how to use TableStoreWriter to write data to Tablestore with high concurrency.
Background information
In scenarios such as logging and IoT tracing, a system generates a large amount of data in a short period of time and writes the data to a database. The database must support high write concurrency and a write throughput of tens of thousands or even millions of rows per second. However, a single call to the BatchWriteRow operation of Tablestore can write at most 200 rows.
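To illustrate what the 200-row limit means for client code, the following minimal sketch splits a list of rows into chunks of at most 200 rows. This is the kind of bookkeeping that a direct caller of BatchWriteRow would handle manually. The class name BatchSplitter and the use of plain integers as rows are illustrative assumptions and are not part of Tablestore SDK for Java.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits rows into chunks that respect a per-request row limit. */
class BatchSplitter {
    // Row limit of a single BatchWriteRow request, as stated in this topic.
    static final int MAX_ROWS_PER_BATCH = 200;

    static <T> List<List<T>> split(List<T> rows, int maxRowsPerBatch) {
        if (maxRowsPerBatch <= 0) {
            throw new IllegalArgumentException("maxRowsPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += maxRowsPerBatch) {
            int end = Math.min(start + maxRowsPerBatch, rows.size());
            // Copy the view so that each batch is independent of the source list.
            batches.add(new ArrayList<>(rows.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 450; i++) {
            rows.add(i);
        }
        List<List<Integer>> batches = split(rows, MAX_ROWS_PER_BATCH);
        int lastSize = batches.get(batches.size() - 1).size();
        // prints: 3 batches, last batch has 50 rows
        System.out.println(batches.size() + " batches, last batch has " + lastSize + " rows");
    }
}
```

With 1,000 rows, the same helper produces five full batches, which is the minimum number of requests needed under the limit.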
TableStoreWriter is an easy-to-use and high-performance data import tool class provided by Tablestore SDK for Java. This class encapsulates the operations used to import data with high concurrency and high throughput. You can use TableStoreWriter to write data to Tablestore data tables with high concurrency and specify row-level callbacks and custom configurations. For more information, see Appendix 1: Working principles of TableStoreWriter.
TableStoreWriter is suitable only for the Wide Column model.
Scenarios
If you have the following business requirements, you can use TableStoreWriter to write data to Tablestore in scenarios such as log storage, instant messaging, and distributed queues:
High throughput is required to support the high concurrency of applications.
The write latency of an individual row of data is not critical.
Data can be written asynchronously (the producer-consumer mode can be used).
Writing the same row of data more than once is acceptable.
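The producer-consumer mode mentioned in the preceding list can be sketched with a standard Java blocking queue: several producer threads enqueue rows and return immediately, and one consumer thread drains the queue into batches. This is only an illustration of the pattern under simple assumptions. It is not how TableStoreWriter is implemented, and the class name ProducerConsumerSketch and the string rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ProducerConsumerSketch {

    /** Starts the given number of producers and one consumer, and returns the batches the consumer built. */
    static List<List<String>> run(int producers, int rowsPerProducer, int maxBatchRows) {
        if (maxBatchRows < 1) {
            throw new IllegalArgumentException("maxBatchRows must be at least 1");
        }
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final int total = producers * rowsPerProducer;
        final List<List<String>> batches = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            int consumed = 0;
            try {
                while (consumed < total) {
                    List<String> batch = new ArrayList<>();
                    batch.add(queue.take());                // Block until at least one row is available.
                    queue.drainTo(batch, maxBatchRows - 1); // Add whatever else is ready, up to the limit.
                    consumed += batch.size();
                    batches.add(batch);                     // A real writer would send the batch here.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        List<Thread> producerThreads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread producer = new Thread(() -> {
                for (int i = 0; i < rowsPerProducer; i++) {
                    queue.add("row-" + id + "-" + i); // Producers do not wait for the write to finish.
                }
            });
            producerThreads.add(producer);
            producer.start();
        }
        try {
            for (Thread producer : producerThreads) {
                producer.join();
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for threads", e);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<String>> batches = run(4, 250, 200);
        int rows = 0;
        for (List<String> batch : batches) {
            rows += batch.size();
        }
        System.out.println(rows + " rows in " + batches.size() + " batches");
    }
}
```

The number of batches varies between runs because it depends on thread timing. This is why the pattern suits workloads in which the latency of an individual row is not critical.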
Prerequisites
Tablestore is activated and an instance is created. For more information, see Activate Tablestore and create an instance.
An AccessKey pair of your Alibaba Cloud account or a Resource Access Management (RAM) user is obtained. For more information, see Create an AccessKey pair.
Procedure
Step 1: Install Tablestore SDK for Java
If you use Maven to manage Java projects, add the following dependency to the pom.xml file:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.17.4</version>
</dependency>
For information about how to install Tablestore SDK for Java, see Install Tablestore SDK for Java.
Step 2: Initialize TableStoreWriter
When you initialize TableStoreWriter, you must specify the instance and table information and the identity authentication information. You can also specify custom TableStoreWriter parameters and callback functions. We recommend that multiple threads share the same TableStoreWriter object.
For information about the parameters and callback functions supported by TableStoreWriter, see Appendix 2: TableStoreWriter parameters and Appendix 3: TableStoreWriter callback functions.
The following sample code shows how to initialize TableStoreWriter:
private static TableStoreWriter createTablesStoreWriter() {
/**
* In most cases, you can retain the default values for the TableStoreWriter parameters. You can also set them to other values based on your business requirements.
* For more information about the parameters, see Appendix 2: TableStoreWriter parameters.
* */
WriterConfig config = new WriterConfig();
// Specify the maximum number of rows that can be written to Tablestore in a batch write request. Default value: 200.
config.setMaxBatchRowsCount(200);
// Specify the maximum number of parallel requests that TableStoreWriter uses to write data in the buffer to Tablestore. Default value: 10. We recommend that you retain the default value.
config.setConcurrency(10);
/**
* Configure row-level callbacks.
* In this example, callbacks are configured to count the number of rows that are written to Tablestore and the number of rows that fail to be written to Tablestore.
* */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
/** Configure access credentials. **/
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
* We recommend that you use the built-in thread pools and client, which are easier to use and isolate the initialization and release logic.
* */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}
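The recommendation that multiple threads share the same TableStoreWriter object can be pictured with the following sketch, in which several worker threads submit rows to one shared instance instead of each creating its own. CountingWriter is a hypothetical stand-in that only counts rows. It is not an SDK class, and the sketch makes no claim about the internals of TableStoreWriter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class SharedWriterSketch {

    /** Hypothetical thread-safe stand-in for a writer: counts rows instead of sending them. */
    static final class CountingWriter {
        private final AtomicLong accepted = new AtomicLong();

        void addRow(String row) {
            accepted.incrementAndGet();
        }

        long acceptedRows() {
            return accepted.get();
        }
    }

    /** Submits rows from several worker threads to ONE writer instance and returns the row count. */
    static long writeFromThreads(CountingWriter writer, int threads, int rowsPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < rowsPerThread; i++) {
                    writer.addRow("row-" + id + "-" + i);
                }
            });
        }
        pool.shutdown(); // Accept no new tasks, but let the submitted tasks finish.
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return writer.acceptedRows();
    }

    public static void main(String[] args) {
        CountingWriter shared = new CountingWriter();
        // prints: 1000
        System.out.println(writeFromThreads(shared, 4, 250));
    }
}
```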
Step 3: Write data
You can construct RowChanges based on different add, delete, and modify operations, and then add the RowChanges to TableStoreWriter.
Write a single row of data at a time
The following sample code provides an example on how to write 1,000 rows of data to a data table by writing a single row of data at a time:
public void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Single Row With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Write thread finished.");
// Flush data in the buffer to Tablestore immediately. TableStoreWriter otherwise decides when to flush based on two parameters: flushInterval, which specifies the interval between automatic flushes, and maxBatchSize, which triggers a flush based on the amount of data in the buffer.
writer.flush();
// Display the Future object.
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
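The comment before writer.flush() in the preceding sample describes two flush triggers: a time interval and the amount of buffered data. The following sketch models both triggers in a simplified, single-threaded form. The class FlushTriggerSketch, its fields, and the explicit nowMillis argument are assumptions made for illustration. A real writer would typically use a background timer rather than checking the clock when a row is added.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FlushTriggerSketch {
    private final long flushIntervalMillis;
    private final long maxBatchBytes;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMillis;

    FlushTriggerSketch(long flushIntervalMillis, long maxBatchBytes, long nowMillis) {
        this.flushIntervalMillis = flushIntervalMillis;
        this.maxBatchBytes = maxBatchBytes;
        this.lastFlushMillis = nowMillis;
    }

    /** Buffers a row, then flushes if the size threshold or the time threshold is reached. */
    void add(String row, long nowMillis) {
        buffer.add(row);
        bufferedBytes += row.getBytes(StandardCharsets.UTF_8).length;
        boolean sizeReached = bufferedBytes >= maxBatchBytes;
        boolean intervalElapsed = nowMillis - lastFlushMillis >= flushIntervalMillis;
        if (sizeReached || intervalElapsed) {
            flush(nowMillis);
        }
    }

    /** Moves buffered rows to the flushed list. An explicit call works like a manual flush. */
    void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            flushedBatches.add(new ArrayList<>(buffer));
            buffer.clear();
            bufferedBytes = 0;
        }
        lastFlushMillis = nowMillis;
    }

    int flushCount() {
        return flushedBatches.size();
    }

    int bufferedRows() {
        return buffer.size();
    }

    public static void main(String[] args) {
        // Flush every 10,000 ms or when 20 bytes are buffered, whichever comes first.
        FlushTriggerSketch sketch = new FlushTriggerSketch(10000, 20, 0);
        sketch.add("1234567890", 1); // 10 bytes buffered, no flush yet
        sketch.add("1234567890", 2); // 20 bytes buffered, size-based flush
        sketch.add("x", 10002);      // 10,000 ms since the last flush, time-based flush
        // prints: 2
        System.out.println(sketch.flushCount());
    }
}
```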
Write multiple rows of data at the same time
The following sample code provides an example on how to write 1,000 rows to a data table by writing multiple rows of data at the same time:
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Row List With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Write thread finished.");
// Flush data in the buffer to Tablestore immediately. TableStoreWriter otherwise decides when to flush based on two parameters: flushInterval, which specifies the interval between automatic flushes, and maxBatchSize, which triggers a flush based on the amount of data in the buffer.
writer.flush();
// Display the Future object.
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
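Both write samples build the first primary key column from md5Hex(index + ""), which in the complete sample code comes from Apache Commons Codec. Hashing the index presumably spreads sequential values across the key space, although this topic does not state the reason. If you prefer not to add that dependency, the following sketch computes the same lowercase hexadecimal MD5 string by using only the Java standard library. The class name Md5HexSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class Md5HexSketch {

    /** Returns the MD5 digest of the UTF-8 bytes of the input as a 32-character lowercase hex string. */
    static String md5Hex(String input) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to an unsigned value so that each byte yields exactly two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms that every Java platform is required to support.
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        for (long index = 0; index < 3; index++) {
            System.out.println(index + " -> " + md5Hex(index + ""));
        }
    }
}
```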
Step 4: Shut down TableStoreWriter
We recommend that you manually shut down TableStoreWriter before your application exits. During shutdown, TableStoreWriter first flushes all data in the buffer to Tablestore.
If you call the addRowChange method while TableStoreWriter is shutting down or after TableStoreWriter is shut down, the data may not be written to Tablestore.
// Proactively shut down TableStoreWriter. After all data in the queues is written to Tablestore, the system automatically shuts down the client and the internal thread pools.
writer.close();
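The shutdown behavior described in this step can be modeled with a small stand-in writer: closing it flushes the buffer first, and rows added after the close are rejected. The following sketch uses a try/finally block so that the writer is closed even if writing fails. BufferingWriter is a hypothetical class written for this illustration. It is not the SDK implementation, which may behave differently in detail.

```java
import java.util.ArrayList;
import java.util.List;

class ShutdownSketch {

    /** Hypothetical stand-in: buffers rows, flushes them on close, and rejects rows after close. */
    static final class BufferingWriter {
        private final List<String> buffer = new ArrayList<>();
        private final List<String> written = new ArrayList<>();
        private boolean closed = false;

        /** Returns false if the writer is already closed, in which case the row is not written. */
        synchronized boolean addRow(String row) {
            if (closed) {
                return false;
            }
            buffer.add(row);
            return true;
        }

        synchronized void flush() {
            written.addAll(buffer);
            buffer.clear();
        }

        /** Flushes the remaining rows, then stops accepting new rows. */
        synchronized void close() {
            if (!closed) {
                flush();
                closed = true;
            }
        }

        synchronized int writtenRows() {
            return written.size();
        }
    }

    /** Adds the given number of rows and always closes the writer, even if adding fails. */
    static int writeAndClose(int rows) {
        BufferingWriter writer = new BufferingWriter();
        try {
            for (int i = 0; i < rows; i++) {
                writer.addRow("row-" + i);
            }
        } finally {
            writer.close();
        }
        return writer.writtenRows();
    }

    public static void main(String[] args) {
        // prints: 1000
        System.out.println(writeAndClose(1000));
    }
}
```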
Complete sample code
The following sample code provides an example on how to create a data table and concurrently write data to the table:
import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;
import com.aliyuncs.exceptions.ClientException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.commons.codec.digest.DigestUtils.md5Hex;
public class TableStoreWriterDemo {
// Specify the name of the instance.
private static String instanceName = "yourInstanceName";
// Specify the endpoint of the instance.
private static String endpoint = "yourEndpoint";
// Obtain the AccessKey ID and AccessKey secret from the environment variables.
private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
private static String tableName = "<TABLE_NAME>";
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws ClientException {
TableStoreWriterDemo sample = new TableStoreWriterDemo();
/**
* Make sure that the table already exists before you use TableStoreWriter.
* 1. TableStoreWriter verifies whether the table exists.
* 2. The fields and field types of the data that you want to write must be the same as the fields and field types of the table.
* */
sample.tryCreateTable();
/**
* We recommend that you use DefaultTableStoreWriter to initialize TableStoreWriter.
* DefaultTableStoreWriter(
* String endpoint, // The endpoint of the instance.
* ServiceCredentials credentials, // The access credentials, including the AccessKey pair. Tokens are supported.
* String instanceName, // The name of the instance.
* String tableName, // The name of the table. TableStoreWriter can write data to only one table.
* WriterConfig config, // The configurations of TableStoreWriter.
* TableStoreCallback<RowChange, RowWriteResult> resultCallback // Row-level callbacks.
* )
* */
TableStoreWriter writer = sample.createTablesStoreWriter();
/**
* Write a single row of data at a time. A Future object is returned for each row.
* */
sample.writeSingleRowWithFuture(writer);
/**
* Write multiple rows of data at the same time. A Future object is returned for each list of rows.
* */
//sample.writeRowListWithFuture(writer);
System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());
/**
* Proactively shut down TableStoreWriter. After all data in the queues is written to Tablestore, the system automatically shuts down the client and the internal thread pools.
* */
writer.close();
}
private static TableStoreWriter createTablesStoreWriter() {
WriterConfig config = new WriterConfig();
// Specify the maximum number of rows that can be written to Tablestore in a batch write request. Default value: 200. Maximum value: 200.
config.setMaxBatchRowsCount(200);
// Specify the maximum number of parallel requests that TableStoreWriter uses to write data in the buffer to Tablestore. Default value: 10. We recommend that you retain the default value.
config.setConcurrency(10);
/**
* Configure row-level callbacks.
* In this example, callbacks are configured to count the number of rows that are written to Tablestore and the number of rows that fail to be written to Tablestore.
* */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
* We recommend that you use the built-in thread pools and client, which are easier to use and isolate the initialization and release logic.
* */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}
private static void tryCreateTable() throws ClientException {
SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
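// Delete the table if it already exists so that the example starts with an empty table. This removes all data in the table.
try {
ots.deleteTable(new DeleteTableRequest(tableName));
} catch (Exception e) {
// Ignore the exception. The table may not exist yet.
}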
TableMeta tableMeta = new TableMeta(tableName);
tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
TableOptions tableOptions = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(
tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));
try {
CreateTableResponse res = ots.createTable(request);
} catch (Exception e) {
throw new ClientException(e);
} finally {
ots.shutdown();
}
}
public static void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Single Row With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Write thread finished.");
writer.flush();
// Display the Future object.
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Row List With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Write thread finished.");
writer.flush();
// Display the Future object.
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
private static void printFutureResult(List<Future<WriterResult>> futures) {
int totalRow = 0;
for (int index = 0; index < futures.size(); index++) {
try {
WriterResult result = futures.get(index).get();
totalRow += result.getTotalCount();
System.out.println(String.format(
"Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
index, result.getFailedRows().size(), result.getSucceedRows().size(),
result.getTotalCount(), totalRow));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
In this example, the following result is returned:
=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
totalRequestCount=6,
totalRowsCount=1000,
totalSucceedRowsCount=1000,
totalFailedRowsCount=0,
totalSingleRowRequestCount=0,
}
Appendixes
Appendix 1: Working principles of TableStoreWriter
TableStoreWriter is a utility class that wraps the operations of the SDK processing layer. TableStoreWriter depends on the following components:
AsyncClient, used to initialize TableStoreWriter.
BatchWriteRow, used by TableStoreWriter to import data to Tablestore.
RetryStrategy, used by TableStoreWriter to retry failed writes.
The following figure shows the code hierarchy architecture.
TableStoreWriter is optimized in terms of performance and ease of use and provides the following features:
Asynchronous operations: Fewer threads are used to provide higher concurrency.
Automatic data aggregation: An in-memory buffer queue is used to pack as many rows as possible into each batch write request that is sent to Tablestore. This improves the write throughput.
Producer-consumer mode: This mode facilitates asynchronous processing and data aggregation.
High-performance data exchange queues: Disruptor RingBuffer provides better performance in multi-producer and single-consumer mode.
Masking of complex request encapsulation: The details of calling the BatchWriteRow operation are masked. Dirty data is filtered out by using pre-checks, and the request limits, such as the limit on the number or size of rows that can be imported in a batch, are automatically processed. Dirty data refers to rows whose schema is different from that of the table, whose size exceeds the upper limit, or in which the number of columns exceeds the upper limit.
Row-level callbacks: Compared with the request-level callbacks provided by Tablestore SDK for Java, the row-level callbacks provided by TableStoreWriter allow the business logic to process data at the row level.
Row-level retries: If request-level retries fail, row-level retries are performed for specific error codes to ensure the highest possible write success rate.
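The pre-check for dirty data described in the preceding list can be sketched as a validation step that runs before a row enters the buffer. The limits in the following sketch are taken from the default values listed in Appendix 2 (128 columns, 2 MB per attribute column value, 1 KB per primary key column value). Representing a row as two string maps, counting only attribute columns against the column limit, and omitting the schema comparison are simplifying assumptions. RowPreCheckSketch is a hypothetical class and does not reproduce the SDK logic.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class RowPreCheckSketch {
    // Default limits as listed in Appendix 2 of this topic.
    static final int MAX_COLUMNS = 128;
    static final int MAX_ATTR_COLUMN_BYTES = 2 * 1024 * 1024;
    static final int MAX_PK_COLUMN_BYTES = 1024;

    /** Returns a description of the first violated limit, or an empty Optional if the row passes. */
    static Optional<String> findViolation(Map<String, String> primaryKey, Map<String, String> attributes) {
        for (Map.Entry<String, String> pk : primaryKey.entrySet()) {
            if (utf8Length(pk.getValue()) > MAX_PK_COLUMN_BYTES) {
                return Optional.of("primary key column too large: " + pk.getKey());
            }
        }
        if (attributes.size() > MAX_COLUMNS) {
            return Optional.of("too many columns: " + attributes.size());
        }
        for (Map.Entry<String, String> attr : attributes.entrySet()) {
            if (utf8Length(attr.getValue()) > MAX_ATTR_COLUMN_BYTES) {
                return Optional.of("attribute column too large: " + attr.getKey());
            }
        }
        return Optional.empty();
    }

    private static int utf8Length(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        Map<String, String> pk = new LinkedHashMap<>();
        pk.put("pk_0", "abc");
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("column_0", "1234567890");
        // prints: row accepted
        System.out.println(findViolation(pk, attrs).orElse("row accepted"));
    }
}
```

Rejecting such rows before they are batched keeps one oversized row from causing the rest of its batch to fail.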
The following figure shows the working mechanisms and encapsulation details of TableStoreWriter.
Appendix 2: TableStoreWriter parameters
When you initialize TableStoreWriter, you can modify WriterConfig to specify TableStoreWriter parameters based on your business requirements.
Parameter | Type | Description |
bucketCount | Integer | The number of buckets in TableStoreWriter. Default value: 3. A bucket is equivalent to a buffer used to cache data. You can increase the value of this parameter to raise the number of sequential write requests that run in parallel. As long as the machine is not the bottleneck, the write rate increases with the number of buckets. Note: If the write mode of a bucket is concurrent write, retain the default value. |
bufferSize | Integer | The maximum number of rows that the in-memory buffer queue can hold. Default value: 1024. The value of this parameter must be a power of 2. |
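enableSchemaCheck | Boolean | Specifies whether to check the schema when data is flushed to the buffer. Default value: true. |
dispatchMode | DispatchMode | The mode in which data is dispatched to buckets when data is flushed to the buffer. Valid values are the constants of the DispatchMode enumeration, for example, DispatchMode.HASH_PARTITION_KEY, which is used in the configuration example below. Important: This parameter takes effect only if the number of buckets is greater than or equal to 2. |
batchRequestType | BatchRequestType | The type of the request that TableStoreWriter uses to write data in the buffer to Tablestore. Valid values are the constants of the BatchRequestType enumeration, for example, BatchRequestType.BATCH_WRITE_ROW, which is used in the configuration example below. |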
concurrency | Integer | The maximum number of parallel requests that TableStoreWriter uses to write data in the buffer to Tablestore. Default value: 10. |
writeMode | WriteMode | The mode in which data in buckets is written to Tablestore when TableStoreWriter writes data in the buffer to Tablestore. Valid values are the constants of the WriteMode enumeration, for example, WriteMode.PARALLEL, which is used in the configuration example below. |
allowDuplicatedRowInBatchRequest | Boolean | Specifies whether rows that have the same primary key value are allowed when TableStoreWriter creates a batch write request. Default value: true. Important: If a secondary index is created for the data table, Tablestore ignores the setting of this parameter and does not allow rows that have the same primary key value. In this case, TableStoreWriter adds the rows that have the same primary key value to different requests when TableStoreWriter creates requests. |
maxBatchSize | Integer | The maximum amount of data that can be written to Tablestore in a batch write request. Unit: bytes. By default, up to 4 MB of data can be written to Tablestore in a batch write request. |
maxBatchRowsCount | Integer | The maximum number of rows that can be written to Tablestore in a batch write request. Default value: 200. Maximum value: 200. |
callbackThreadCount | Integer | The number of threads in the thread pool that runs callbacks within TableStoreWriter. The default value of this parameter is the number of processors. |
callbackThreadPoolQueueSize | Integer | The queue size of the thread pool that runs callbacks within TableStoreWriter. Default value: 1024. |
maxColumnsCount | Integer | The maximum number of columns in a row when data is flushed to the buffer. Default value: 128. |
maxAttrColumnSize | Integer | The maximum size of the value of a single attribute column when data is flushed to the buffer. Unit: bytes. By default, the value of each attribute column can be up to 2 MB in size. |
maxPKColumnSize | Integer | The maximum size of the value of a single primary key column when data is flushed to the buffer. Unit: bytes. By default, the value of each primary key column can be up to 1 KB in size. |
flushInterval | Integer | The interval at which TableStoreWriter automatically writes data in the buffer to Tablestore. Default value: 10000. Unit: milliseconds. |
logInterval | Integer | The interval at which the task status is automatically displayed when TableStoreWriter writes data in the buffer to Tablestore. Default value: 10000. Unit: milliseconds. |
clientMaxConnections | Integer | The maximum number of connections that are used when the client is built internally. Default value: 300. |
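writerRetryStrategy | WriterRetryStrategy | The retry policy that is used when the client is built internally. Valid values are the constants of the WriterRetryStrategy enumeration, for example, WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY, which is used in the configuration example below. |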
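The bufferSize constraint in the preceding table (the value must be a power of 2, such as 512, 1024, or 2048) can be checked with a bit operation. Ring buffers commonly require such sizes because a sequence number can then be mapped to a slot with a bit mask instead of a modulo operation. The following sketch shows the check, the slot mapping, and a helper that rounds a value up to the next valid size. RingBufferSizeSketch and its methods are hypothetical helpers and are not part of the SDK or of Disruptor.

```java
class RingBufferSizeSketch {

    /** Returns true if n is a positive power of 2. */
    static boolean isPowerOfTwo(int n) {
        // A power of 2 has exactly one bit set, so clearing the lowest set bit leaves 0.
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Maps an ever-increasing sequence number to a slot index. Valid only for power-of-2 sizes. */
    static int slotFor(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of 2: " + size);
        }
        return (int) (sequence & (size - 1));
    }

    /** Rounds n up to the nearest power of 2. Supports values from 1 to 2^30. */
    static int nextPowerOfTwo(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n must be between 1 and 2^30: " + n);
        }
        if (n == 1) {
            return 1;
        }
        return Integer.highestOneBit(n - 1) << 1;
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(1024));   // prints: true
        System.out.println(isPowerOfTwo(1000));   // prints: false
        System.out.println(slotFor(1025, 1024));  // prints: 1
        System.out.println(nextPowerOfTwo(1000)); // prints: 1024
    }
}
```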
Configuration example
WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);
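The configuration example sets the dispatch mode to DispatchMode.HASH_PARTITION_KEY with three buckets. The following sketch shows one way in which hash-based dispatch to buckets could work: the partition key is hashed and reduced modulo the number of buckets, so rows that have the same partition key always land in the same bucket. The use of String.hashCode() is an assumption made for illustration. The hash function that the SDK uses is not described in this topic, and BucketDispatchSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

class BucketDispatchSketch {

    /** Returns a bucket index in the range [0, bucketCount) for the given partition key. */
    static int bucketFor(String partitionKey, int bucketCount) {
        if (bucketCount <= 0) {
            throw new IllegalArgumentException("bucketCount must be positive");
        }
        // floorMod keeps the result non-negative even if hashCode() is negative.
        return Math.floorMod(partitionKey.hashCode(), bucketCount);
    }

    /** Distributes the keys over the buckets and returns the content of each bucket. */
    static List<List<String>> dispatch(List<String> partitionKeys, int bucketCount) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < bucketCount; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String key : partitionKeys) {
            buckets.get(bucketFor(key, bucketCount)).add(key);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            keys.add("device-" + (i % 3)); // Three distinct partition keys, each repeated three times.
        }
        List<List<String>> buckets = dispatch(keys, 3);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("bucket " + i + ": " + buckets.get(i));
        }
    }
}
```

Because a key always maps to the same bucket, rows for that key keep their relative order within the bucket. This matters if buckets are written sequentially.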
Appendix 3: TableStoreWriter callback functions
TableStoreWriter uses callbacks to report write successes or failures. If a row of data is written to Tablestore, TableStoreWriter invokes the onCompleted function. If a row of data fails to be written to Tablestore, TableStoreWriter invokes the onFailed function based on the category of the exception.
The following sample code provides an example on how to use callbacks to count the number of rows that are written to Tablestore and the number of rows that fail to be written to Tablestore:
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
// Count the number of rows that are written to Tablestore.
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
// Count the number of rows that fail to be written to Tablestore.
failedRows.incrementAndGet();
}
};
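Counting failures is often not enough if the failed rows must be inspected or resubmitted later. The following sketch extends the counting idea by also collecting the failed rows in a thread-safe queue, because Appendix 2 indicates that callbacks run in a thread pool. To keep the sketch self-contained, it defines a hypothetical RowCallback interface that has the same two-method shape as the callback in the preceding sample, and it uses strings in place of RowChange and RowWriteResult. It does not use the SDK types.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class FailureCollectingCallbackSketch {

    /** Hypothetical interface with the same two-method shape as the callback in the sample. */
    interface RowCallback<REQ, RES> {
        void onCompleted(REQ request, RES result);

        void onFailed(REQ request, Exception ex);
    }

    /** Counts successful rows and keeps failed rows for later inspection or resubmission. */
    static final class CollectingCallback implements RowCallback<String, String> {
        final AtomicLong succeeded = new AtomicLong();
        final Queue<String> failedRows = new ConcurrentLinkedQueue<>();

        @Override
        public void onCompleted(String row, String result) {
            succeeded.incrementAndGet();
        }

        @Override
        public void onFailed(String row, Exception ex) {
            // Keep the work in the callback short. Only record the row here.
            failedRows.add(row);
        }
    }

    public static void main(String[] args) {
        CollectingCallback callback = new CollectingCallback();
        callback.onCompleted("row-1", "ok");
        callback.onCompleted("row-2", "ok");
        callback.onFailed("row-3", new RuntimeException("simulated write failure"));
        // prints: succeeded=2, failed=[row-3]
        System.out.println("succeeded=" + callback.succeeded.get() + ", failed=" + callback.failedRows);
    }
}
```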