Java SDK 在大数据离线场景下将多行数据批量写入数据表,支持插入、修改、删除三种行操作。
前提条件
安装 Tablestore Java SDK并初始化客户端。
功能说明
public BulkImportResponse bulkImport(BulkImportRequest bulkImportRequest) throws TableStoreException, ClientException
在单个请求中提交多行数据操作,由服务端按行独立写入并按行返回结果,适用于大数据场景下的离线批量导入。
-
通过
addRowChange(RowChange)或addRowChanges(List<RowChange>)添加行操作;支持RowPutChange(插入)、RowUpdateChange(修改)、RowDeleteChange(删除)三种子类,可在同一请求中混合使用。 -
通过
BulkImportResponse.getResult(succeedRows, failedRows)拿成功和失败的行;isAllSucceed()判断全量成功。每行结果独立,部分行失败不影响其他行写入。 -
BulkImportRequest.createRequestForRetry(failedRows)基于失败行生成只包含失败行的重试请求,用于按行级别重试。
以下示例将 5 行数据批量插入数据表 bulk_import_demo,并打印成功和失败行数。
String tableName = "bulk_import_demo";
BulkImportRequest request = new BulkImportRequest(tableName);
List<RowChange> rowChanges = new ArrayList<RowChange>();
for (int i = 0; i < 5; i++) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("row" + i))
.build();
RowPutChange put = new RowPutChange(tableName, pk);
put.addColumn(new Column("col1", ColumnValue.fromString("v" + i)));
rowChanges.add(put);
}
request.addRowChanges(rowChanges);
BulkImportResponse response = client.bulkImport(request);
// 通过 succeedRows / failedRows 拿成功和失败的行
List<BulkImportResponse.RowResult> succeedRows = new ArrayList<BulkImportResponse.RowResult>();
List<BulkImportResponse.RowResult> failedRows = new ArrayList<BulkImportResponse.RowResult>();
response.getResult(succeedRows, failedRows);
System.out.println("All succeed: " + response.isAllSucceed());
System.out.println("Succeed: " + succeedRows.size() + ", Failed: " + failedRows.size());
参数说明
|
名称 |
类型 |
说明 |
|
tableName(必选) |
String |
数据表名称。 |
|
rowChanges(必选) |
List<RowChange> |
行操作列表。每个元素为 |
场景示例
单请求混合三种行操作
在同一个 BulkImportRequest 中同时提交插入、修改、删除操作,服务端按行独立处理。
String tableName = "bulk_import_demo";
BulkImportRequest request = new BulkImportRequest(tableName);
// 插入新行
PrimaryKey pkPut = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("mixed_put"))
.build();
RowPutChange put = new RowPutChange(tableName, pkPut);
put.addColumn(new Column("col1", ColumnValue.fromString("put_value")));
request.addRowChange(put);
// 更新已有行(追加列)
PrimaryKey pkUpdate = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("row0"))
.build();
RowUpdateChange update = new RowUpdateChange(tableName, pkUpdate);
update.put(new Column("col2", ColumnValue.fromLong(100)));
request.addRowChange(update);
// 删除已有行
PrimaryKey pkDelete = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("row1"))
.build();
RowDeleteChange delete = new RowDeleteChange(tableName, pkDelete);
request.addRowChange(delete);
BulkImportResponse response = client.bulkImport(request);
System.out.println("Mixed all succeed: " + response.isAllSucceed());
失败行重试
通过 createRequestForRetry(failedRows) 基于失败行生成新请求,只重试失败部分,避免重复写入已成功的行。
String tableName = "bulk_import_demo";
BulkImportRequest request = new BulkImportRequest(tableName);
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("good"))
.build();
RowPutChange row = new RowPutChange(tableName, pk);
row.addColumn(new Column("col1", ColumnValue.fromString("ok")));
request.addRowChange(row);
BulkImportResponse response = client.bulkImport(request);
List<BulkImportResponse.RowResult> succeedRows = new ArrayList<BulkImportResponse.RowResult>();
List<BulkImportResponse.RowResult> failedRows = new ArrayList<BulkImportResponse.RowResult>();
response.getResult(succeedRows, failedRows);
// 仅当存在失败行时,基于失败行生成只包含失败行的重试请求
if (!failedRows.isEmpty()) {
BulkImportRequest retryRequest = request.createRequestForRetry(failedRows);
BulkImportResponse retryResponse = client.bulkImport(retryRequest);
System.out.println("Retry all succeed: " + retryResponse.isAllSucceed());
}