全部产品
Search
文档中心

表格存储:离线写入数据

更新时间:Jun 25, 2026

Java SDK 在大数据离线场景下将多行数据批量写入数据表,支持插入、修改、删除三种行操作。

前提条件

安装 Tablestore Java SDK并初始化客户端。

功能说明

public BulkImportResponse bulkImport(BulkImportRequest bulkImportRequest) throws TableStoreException, ClientException

在单个请求中提交多行数据操作,由服务端按行独立写入并按行返回结果,适用于大数据场景下的离线批量导入。

  • 通过 addRowChange(RowChange)addRowChanges(List<RowChange>) 添加行操作;支持 RowPutChange(插入)、RowUpdateChange(修改)、RowDeleteChange(删除)三种子类,可在同一请求中混合使用。

  • 通过 BulkImportResponse.getResult(succeedRows, failedRows) 拿成功和失败的行;isAllSucceed() 判断全量成功。每行结果独立,部分行失败不影响其他行写入。

  • BulkImportRequest.createRequestForRetry(failedRows) 基于失败行生成只包含失败行的重试请求,用于按行级别重试。

以下示例将 5 行数据批量插入数据表 bulk_import_demo,并打印成功和失败行数。

String tableName = "bulk_import_demo";

BulkImportRequest request = new BulkImportRequest(tableName);

List<RowChange> rowChanges = new ArrayList<RowChange>();
for (int i = 0; i < 5; i++) {
    PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("row" + i))
            .build();
    RowPutChange put = new RowPutChange(tableName, pk);
    put.addColumn(new Column("col1", ColumnValue.fromString("v" + i)));
    rowChanges.add(put);
}
request.addRowChanges(rowChanges);

BulkImportResponse response = client.bulkImport(request);

// 通过 succeedRows / failedRows 拿成功和失败的行
List<BulkImportResponse.RowResult> succeedRows = new ArrayList<BulkImportResponse.RowResult>();
List<BulkImportResponse.RowResult> failedRows = new ArrayList<BulkImportResponse.RowResult>();
response.getResult(succeedRows, failedRows);

System.out.println("All succeed: " + response.isAllSucceed());
System.out.println("Succeed: " + succeedRows.size() + ", Failed: " + failedRows.size());

参数说明

名称

类型

说明

tableName(必选)

String

数据表名称。

rowChanges(必选)

List<RowChange>

行操作列表。每个元素为 RowPutChangeRowUpdateChangeRowDeleteChange 之一,可在同一请求中混合三种操作。

场景示例

单请求混合三种行操作

在同一个 BulkImportRequest 中同时提交插入、修改、删除操作,服务端按行独立处理。

String tableName = "bulk_import_demo";

BulkImportRequest request = new BulkImportRequest(tableName);

// 插入新行
PrimaryKey pkPut = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("mixed_put"))
        .build();
RowPutChange put = new RowPutChange(tableName, pkPut);
put.addColumn(new Column("col1", ColumnValue.fromString("put_value")));
request.addRowChange(put);

// 更新已有行(追加列)
PrimaryKey pkUpdate = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("row0"))
        .build();
RowUpdateChange update = new RowUpdateChange(tableName, pkUpdate);
update.put(new Column("col2", ColumnValue.fromLong(100)));
request.addRowChange(update);

// 删除已有行
PrimaryKey pkDelete = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("row1"))
        .build();
RowDeleteChange delete = new RowDeleteChange(tableName, pkDelete);
request.addRowChange(delete);

BulkImportResponse response = client.bulkImport(request);
System.out.println("Mixed all succeed: " + response.isAllSucceed());

失败行重试

通过 createRequestForRetry(failedRows) 基于失败行生成新请求,只重试失败部分,避免重复写入已成功的行。

String tableName = "bulk_import_demo";

BulkImportRequest request = new BulkImportRequest(tableName);

PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("good"))
        .build();
RowPutChange row = new RowPutChange(tableName, pk);
row.addColumn(new Column("col1", ColumnValue.fromString("ok")));
request.addRowChange(row);

BulkImportResponse response = client.bulkImport(request);

List<BulkImportResponse.RowResult> succeedRows = new ArrayList<BulkImportResponse.RowResult>();
List<BulkImportResponse.RowResult> failedRows = new ArrayList<BulkImportResponse.RowResult>();
response.getResult(succeedRows, failedRows);

// 仅当存在失败行时,基于失败行生成只包含失败行的重试请求
if (!failedRows.isEmpty()) {
    BulkImportRequest retryRequest = request.createRequestForRetry(failedRows);
    BulkImportResponse retryResponse = client.bulkImport(retryRequest);
    System.out.println("Retry all succeed: " + retryResponse.isAllSucceed());
}