通過本文您可以瞭解在Table Store中如何通過TableStoreWriter實現高並發資料寫入。
背景介紹
在日誌、物聯網(例如軌跡追蹤或溯源)等情境中,系統會在短時間內產生大量的資料,並將資料寫入到資料庫中。因此資料庫需要提供高並發、高吞吐率的寫入效能,滿足每秒上萬行甚至上百萬行的寫入吞吐率,而Table Store的 BatchWriteRow 介面限制單次只能寫入200行資料。
TableStoreWriter是Table Store基於Java SDK實現的簡單易用、高效能的資料匯入工具類,它封裝了用於高並發、高吞吐率資料匯入的介面,可以實現高並發寫入資料,同時支援行層級回調以及自訂配置功能。更多資訊,請參見附錄 1:TableStoreWriter 實現原理。
TableStoreWriter只適用於寬表模型。
適用情境
如果您的業務情境具備以下特點,則可以使用TableStoreWriter進行資料寫入,典型的應用情境有日誌儲存、即時通訊(IM)系統、分布式隊列等。
高並發,對吞吐率要求很高
對單條資料的寫入延遲沒有要求
寫入可非同步化(可採用生產者消費者模型)
同一條資料可重複寫入
準備工作
擷取阿里雲帳號或者RAM使用者的AccessKey。
操作步驟
步驟 1:安裝 Tablestore SDK
如果您使用的是Maven專案,請在專案的pom.xml檔案中添加如下依賴:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>5.17.4</version>
</dependency> 更多關於安裝Tablestore SDK的資訊,請參見安裝Tablestore SDK。
步驟 2:初始化 TableStoreWriter
初始化TableStoreWriter時,您需要配置執行個體和表的資訊、身份認證資訊,也可以自訂TableStoreWriter的配置參數和回呼函數。使用多線程時,建議共用一個TableStoreWriter對象,TableStoreWriter的初始化範例程式碼如下所示。
TableStoreWriter支援的參數配置和回呼函數,請參見附錄 2:TableStoreWriter 配置參數和附錄 3:TableStoreWriter 回呼函數。
private static TableStoreWriter createTablesStoreWriter() {
/**
* 一般情況下保持預設配置即可,您也可以按需自訂 TableStoreWriter 配置。
* 更多參數說明請參見“配置 TableStoreWriter”文檔。
* */
WriterConfig config = new WriterConfig();
// 配置一次大量匯入的行數上限,預設值為 200。
config.setMaxBatchRowsCount(200);
// 配置最大並發數,預設值為 10。建議保持預設。
config.setConcurrency(10);
/**
* 自訂行層級 Callback。
* 該樣本通過成功、失敗計數,簡單展示回調能力。
* */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
/** 配置訪問憑證。 **/
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
* 推薦使用內部構建的線程池與 Client,方便使用者使用,隔離初始化和釋放的邏輯。
* */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}步驟 3:寫入資料
您可以根據不同的增刪改操作構造RowChange,然後將RowChange添加到TableStoreWriter中。
單行寫入資料
以下範例程式碼使用單行寫入方式寫入1000行資料。
public void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Single Row With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Write thread finished.");
// 對緩衝區中的資料進行 flush。TableStoreWriter也會根據flushInterval和maxBatchSize的配置決定緩衝區的flush時機。其中flushInterval是根據時間定期進行flush,maxBatchSize是根據緩衝區的資料量決定是否進行flush。
writer.flush();
// 列印Future過程。
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}批量寫入資料
以下範例程式碼使用批量寫入方式寫入1000行資料。
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Row List With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Write thread finished.");
// 對緩衝區中的資料進行 flush。TableStoreWriter也會根據flushInterval和maxBatchSize的配置決定緩衝區的flush時機。其中flushInterval是根據時間定期進行flush,maxBatchSize是根據緩衝區的資料量決定是否進行flush。
writer.flush();
// 列印Future過程。
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}步驟 4:關閉 TableStoreWriter
建議您在退出應用程式前手動關閉TableStoreWriter。在TableStoreWriter關閉前,系統會先flush掉緩衝區中的所有資料。
如果在TableStoreWriter關閉過程中或者關閉之後仍然調用addRowChange方法向緩衝區中寫入資料,該部分資料不保證會寫入Table Store。
// 主動關閉Writer,內部等候所有隊列資料寫入後,自動關閉client與內部的線程池。
writer.close();完整範例程式碼
以下樣本用於建立一張新的資料表,並通過並發寫入的方式將資料寫入到資料表中。
import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;
import com.aliyuncs.exceptions.ClientException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.commons.codec.digest.DigestUtils.md5Hex;
public class TableStoreWriterDemo {
// yourInstanceName 填寫您的執行個體名稱
private static String instanceName = "yourInstanceName";
// yourEndpoint 填寫您的執行個體訪問地址
private static String endpoint = "yourEndpoint";
// 擷取環境變數裡的 AccessKey ID 和 AccessKey Secret
private static String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
private static String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
private static String tableName = "<TABLE_NAME>";
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws ClientException {
TableStoreWriterDemo sample = new TableStoreWriterDemo();
/**
* 使用Writer前確保表已存在。
* 1、writer會校正表的存在性.
* 2、校正寫入資料是否與表的欄位、類型一致。
* */
sample.tryCreateTable();
/**
* 初始化建議使用。
* DefaultTableStoreWriter(
* String endpoint, // 執行個體的服務地址。
* ServiceCredentials credentials, // 認證資訊:含 AK,也支援 token
* String instanceName, // 執行個體名。
* String tableName, // 表名:一個 writer 僅針對一個表。
* WriterConfig config, // writer 的配置。
* TableStoreCallback<RowChange, RowWriteResult> resultCallback // 行層級回調。
* )
* */
TableStoreWriter writer = sample.createTablesStoreWriter();
/**
* Future使用:單行寫
* */
sample.writeSingleRowWithFuture(writer);
/**
* Future使用:批量寫
* */
//sample.writeRowListWithFuture(writer);
System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());
/**
* 您需要主動關閉Writer,內部等候所有隊列資料寫入後,自動關閉 client 與內部的線程池。
* */
writer.close();
}
private static TableStoreWriter createTablesStoreWriter() {
WriterConfig config = new WriterConfig();
// 配置一次大量匯入的行數上限,預設值為 200。如果希望一次寫入超過 200 行資料,請調大該值。
config.setMaxBatchRowsCount(200);
// 配置最大並發數,預設值為 10。建議保持預設即可。
config.setConcurrency(10);
/**
* 自訂的行層級 Callback。
* 該樣本通過成功、失敗計數,簡單展示回調能力。
* */
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
failedRows.incrementAndGet();
}
};
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
/**
* 推薦使用內部構建的線程池與 client,方便使用者使用,隔離初始化和釋放的邏輯。
* */
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
endpoint, credentials, instanceName, tableName, config, resultCallback);
return writer;
}
private static void tryCreateTable() throws ClientException {
SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
try {
ots.deleteTable(new DeleteTableRequest(tableName));
} catch (Exception e) {
}
TableMeta tableMeta = new TableMeta(tableName);
tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
TableOptions tableOptions = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(
tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));
try {
CreateTableResponse res = ots.createTable(request);
} catch (Exception e) {
throw new ClientException(e);
} finally {
ots.shutdown();
}
}
public static void writeSingleRowWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Single Row With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
futures.add(future);
}
System.out.println("Write thread finished.");
writer.flush();
// 列印future過程。
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
public void writeRowListWithFuture(TableStoreWriter writer) {
System.out.println("=========================================================[Start]");
System.out.println("Write Row List With Future");
int rowsCount = 1000;
int columnsCount = 10;
String strValue = "1234567890";
AtomicLong rowIndex = new AtomicLong(-1);
List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
List<RowChange> rowChanges = new LinkedList<RowChange>();
for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
.addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
.addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
.build();
RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
for (int j = 0; j < columnsCount; j++) {
rowChange.put("column_" + j, ColumnValue.fromString(strValue));
}
rowChange.put("index", ColumnValue.fromLong(index));
rowChanges.add(rowChange);
if (Math.random() > 0.995 || index == rowsCount - 1) {
Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
futures.add(future);
rowChanges.clear();
}
}
System.out.println("Write thread finished.");
writer.flush();
// 列印future過程。
// printFutureResult(futures);
System.out.println("=========================================================[Finish]");
}
private static void printFutureResult(List<Future<WriterResult>> futures) {
int totalRow = 0;
for (int index = 0; index < futures.size(); index++) {
try {
WriterResult result = futures.get(index).get();
totalRow += result.getTotalCount();
System.out.println(String.format(
"Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
index, result.getFailedRows().size(), result.getSucceedRows().size(),
result.getTotalCount(), totalRow));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}執行結果如下所示。
=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
totalRequestCount=6,
totalRowsCount=1000,
totalSucceedRowsCount=1000,
totalFailedRowsCount=0,
totalSingleRowRequestCount=0,
}常見問題
使用Java SDK寫入資料時報錯:The count of attribute columns exceeds the maximum:128
附錄
附錄 1:TableStoreWriter 實現原理
TableStoreWriter是基於SDK處理層介面重新封裝的工具類,它依賴了以下介面。
TableStoreWriter初始化依賴AsyncClient非同步介面。
TableStoreWriter匯入資料使用BatchWriteRow介面。
TableStoreWriter異常重試依賴RetryStrategy介面。
代碼分層架構如下圖所示。

TableStoreWriter在介面效能和易用性上做了最佳化,具備以下特性:
使用非同步介面:使用更少的線程,提供更高的並發。
自動資料彙總:在記憶體中使用緩衝隊列,讓一次發給Table Store的批量寫請求盡量大,提高寫入吞吐率。
採用生產者消費者模式:更易於非同步化和資料聚集。
使用高效能資料交換隊列:選用Disruptor RingBuffer,採用多生產者單消費者的模型,提供更好的效能。
屏蔽複雜請求封裝:屏蔽調用BatchWriteRow介面細節,通過預檢查自動過濾髒資料(例如主鍵格式與表預定義的不符、行大小超限、行列數超限等),自動處理要求節流(例如一次批量的行數限制、一次批量的大小限制等)。
行層級Callback:相對於Table StoreJava SDK提供的請求層級的Callback,TableStoreWriter提供行層級的Callback,讓商務邏輯可以實現行層級資料處理。
行層級重試:請求層級重試失敗會根據特定的錯誤碼進行行層級重試,最大程度保證資料寫入成功率。
TableStoreWriter的實現和封裝細節如下圖所示。

附錄 2:TableStoreWriter 配置參數
初始化TableStoreWriter時,您可以通過修改 WriterConfig 自訂TableStoreWriter的配置參數。
參數 | 類型 | 說明 |
bucketCount | 整型 | Writer內部的分桶數,預設值為3。一個分桶相當於一個緩衝空間,用於緩衝部分資料,此參數可用於提升按序寫並發,當未達機器瓶頸時,分桶數與寫入速率正相關。 說明 當分桶內的寫入模式為並發寫時,保持預設配置即可。 |
bufferSize | 整型 | 記憶體中緩衝隊列的大小,預設值為1024。參數值必須為2的指數倍。 |
enableSchemaCheck | 布爾值 | 資料寫入到緩衝區時,是否進行schema檢查,預設值為true。
|
dispatchMode | DispatchMode | 資料寫入到緩衝區時,將資料分發到分桶內的模式。取值範圍如下:
重要 當分桶數大於等於2時,此參數才有效。 |
batchRequestType | BatchRequestType | Writer將緩衝區資料發送到Table Store時,構建的請求類型。取值範圍如下:
|
concurrency | 整型 | Writer將緩衝區資料發送到Table Store時的最大請求並發數,預設值為10。 |
writeMode | WriteMode | Writer將緩衝區資料寫入到Table Store時,每個分桶內資料寫入到Table Store中的模式。取值範圍如下:
|
allowDuplicatedRowInBatchRequest | 布爾值 | 構建批量請求將資料寫入Table Store時,是否允許有主鍵相同的行,預設值為true。 重要 當資料表上存在二級索引時,Table Store會忽略此參數的配置,不允許有主鍵相同的行。此時TableStoreWriter在構建請求時會將主鍵相同的行加入到不同請求中。 |
maxBatchSize | 整型 | 一次批量請求寫入Table Store的最巨量資料量,預設值為4 MB,單位為位元組。 |
maxBatchRowsCount | 整型 | 一次批量請求寫入Table Store的最大行數,預設值為200,最大值為200。 |
callbackThreadCount | 整型 | Writer內部Callback啟動並執行線程池線程數,預設值為處理器個數。 |
callbackThreadPoolQueueSize | 整型 | Writer內部Callback啟動並執行線程池隊列大小,預設值為1024。 |
maxColumnsCount | 整型 | 寫入資料到緩衝區時,一行資料的最大列數,預設值為128。 |
maxAttrColumnSize | 整型 | 寫入資料到緩衝區時,單一屬性列值的最大大小,預設值為2 MB,單位為位元組。 |
maxPKColumnSize | 整型 | 寫入資料到緩衝區時,單一主鍵列值的最大大小,預設值為1 KB,單位為位元組。 |
flushInterval | 整型 | Writer將緩衝區資料發送到Table Store時,自動flush緩衝區的時間間隔,預設值為10000,單位為毫秒。 |
logInterval | 整型 | Writer將緩衝區資料發送到Table Store時,自動列印任務狀態的時間間隔,預設值為10000,單位為毫秒。 |
clientMaxConnections | 整型 | 內部構建Client時使用的最大串連數配置,預設值為300。 |
writerRetryStrategy | WriterRetryStrategy | 內部構建Client時使用的重試策略,取值範圍如下:
|
配置樣本
WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);附錄 3:TableStoreWriter 回呼函數
TableStoreWriter通過Callback來反饋寫入行資料的成功或者失敗資訊。如果行資料寫入成功,則Writer會調用 onCompleted 函數,如果行資料寫入失敗,則Writer會根據異常的類別調用對應的 onFailed 函數。
以下樣本用於統計成功和失敗的行資料數量。
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
//統計成功行數。
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
//統計失敗行數。
failedRows.incrementAndGet();
}
};