全部产品
Search
文档中心

表格存储:增量数据操作

更新时间:Jun 25, 2026

Java SDK 通过 Stream API 消费数据表的增量变更数据(插入、更新、删除),适用于离线导出、数据同步、变更通知等场景。

前提条件

功能说明

public ListStreamResponse listStream(ListStreamRequest request) throws TableStoreException, ClientException
public DescribeStreamResponse describeStream(DescribeStreamRequest request) throws TableStoreException, ClientException
public GetShardIteratorResponse getShardIterator(GetShardIteratorRequest request) throws TableStoreException, ClientException
public GetStreamRecordResponse getStreamRecord(GetStreamRecordRequest request) throws TableStoreException, ClientException

Stream 将数据表的增量变更按 Shard 组织,消费按照 list → describe → getIterator → 循环 getStreamRecord 的步骤进行:

  • listStream(ListStreamRequest) 列出实例下已开启 Stream 的所有表的 streamId

  • describeStream(DescribeStreamRequest) 查询 Stream 的描述信息(创建时间、过期时间、当前状态)以及包含的 Shard 列表。

  • getShardIterator(GetShardIteratorRequest) 获取指定 Shard 的读取迭代值(shardIterator),作为后续拉取增量数据的起点。

  • getStreamRecord(GetStreamRecordRequest) 通过 shardIterator 拉取一批增量记录(StreamRecord 列表),并返回 nextShardIterator 用于继续拉取。

以下示例端到端消费数据表 stream_test_demo 的 Stream,输出每条增量记录的类型和主键。

String demoTable = "stream_test_demo";

// 1. 列出实例下开启 Stream 的所有表,找到目标表的 streamId
ListStreamRequest listRequest = new ListStreamRequest(demoTable);
ListStreamResponse listResponse = client.listStream(listRequest);

String targetStreamId = null;
for (Stream stream : listResponse.getStreams()) {
    if (demoTable.equals(stream.getTableName())) {
        targetStreamId = stream.getStreamId();
        break;
    }
}
System.out.println("Stream ID: " + targetStreamId);

// 2. 查询 Stream 的所有 Shard
DescribeStreamRequest describeRequest = new DescribeStreamRequest(targetStreamId);
DescribeStreamResponse describeResponse = client.describeStream(describeRequest);
List<StreamShard> shards = describeResponse.getShards();
System.out.println("Shard count: " + shards.size());

if (!shards.isEmpty()) {
    String shardId = shards.get(0).getShardId();

    // 3. 拿 Shard 的初始读取迭代值
    GetShardIteratorRequest iterRequest =
            new GetShardIteratorRequest(targetStreamId, shardId);
    GetShardIteratorResponse iterResponse = client.getShardIterator(iterRequest);
    String shardIterator = iterResponse.getShardIterator();

    // 4. 用迭代值拉取 Shard 的增量记录
    GetStreamRecordRequest recordRequest = new GetStreamRecordRequest(shardIterator);
    recordRequest.setLimit(100);
    GetStreamRecordResponse recordResponse = client.getStreamRecord(recordRequest);

    List<StreamRecord> records = recordResponse.getRecords();
    System.out.println("Records fetched: " + records.size());
    for (StreamRecord record : records) {
        System.out.println("RecordType: " + record.getRecordType()
                + ", PK: " + record.getPrimaryKey());
    }

    // nextShardIterator 用于继续拉取后续增量
    System.out.println("Next iterator: "
            + (recordResponse.getNextShardIterator() != null ? "yes" : "no"));
}

参数说明

ListStreamRequest

名称

类型

说明

tableName(可选)

String

数据表名称。不指定时返回当前实例下所有开启 Stream 的表的 Stream 信息;指定时仅返回该表的 Stream 信息。

DescribeStreamRequest

名称

类型

说明

streamId(必选)

String

Stream 的唯一标识,由 listStream 返回。

inclusiveStartShardId(可选)

String

返回的 Shard 列表的起始 shardId,用于分页拿取大量 Shard。

shardLimit(可选)

int

本次返回的 Shard 数量上限。

GetShardIteratorRequest

名称

类型

说明

streamId(必选)

String

Stream 的唯一标识,由 describeStream 返回。

shardId(必选)

String

Shard 的唯一标识,由 describeStream 返回的 StreamShard 中获取。

timestamp(可选)

long

指定迭代起点的时间戳(微秒),用于从指定时间开始读取。不指定时从 Shard 起始位置读取。

GetStreamRecordRequest

名称

类型

说明

shardIterator(必选)

String

读取迭代值,由 getShardIterator 或上一次 getStreamRecordnextShardIterator 返回。

limit(可选)

int

本次返回的 StreamRecord 数量上限。

tableName(可选)

String

目标 Shard 所属的数据表名称。

场景示例

分页获取 Shard 列表

Stream 包含的 Shard 数量较多时,通过 inclusiveStartShardIdshardLimit 分批获取。nextShardIdnull 表示已遍历完所有 Shard。

String currentStreamId = "<your-stream-id>";
String startShardId = null;
int totalShards = 0;

while (true) {
    DescribeStreamRequest request = new DescribeStreamRequest(currentStreamId);
    if (startShardId != null) {
        request.setInclusiveStartShardId(startShardId);
    }
    request.setShardLimit(50);

    DescribeStreamResponse response = client.describeStream(request);
    totalShards += response.getShards().size();

    // nextShardId 为 null 表示已遍历完所有 Shard
    if (response.getNextShardId() == null) {
        break;
    }
    startShardId = response.getNextShardId();
}
System.out.println("Total shards: " + totalShards);

持续 polling 拉取增量数据

nextShardIterator 循环调用 getStreamRecord 持续拉取一个 Shard 的增量数据。nextShardIteratornull 表示当前 Shard 已读完。

String currentStreamId = "<your-stream-id>";
String shardId = "<your-shard-id>";

GetShardIteratorRequest iterRequest =
        new GetShardIteratorRequest(currentStreamId, shardId);
String shardIterator = client.getShardIterator(iterRequest).getShardIterator();

int totalRecords = 0;
while (shardIterator != null) {
    GetStreamRecordRequest recordRequest = new GetStreamRecordRequest(shardIterator);
    recordRequest.setLimit(100);
    GetStreamRecordResponse response = client.getStreamRecord(recordRequest);

    totalRecords += response.getRecords().size();
    shardIterator = response.getNextShardIterator();
}
System.out.println("Polling total records: " + totalRecords);