Java SDK 通过 Stream API 消费数据表的增量变更数据(插入、更新、删除),适用于离线导出、数据同步、变更通知等场景。
前提条件
安装 Tablestore Java SDK并初始化客户端。
数据表已开启 Stream 功能。建表时通过
StreamSpecification启用,详见创建数据表。
功能说明
public ListStreamResponse listStream(ListStreamRequest request) throws TableStoreException, ClientException
public DescribeStreamResponse describeStream(DescribeStreamRequest request) throws TableStoreException, ClientException
public GetShardIteratorResponse getShardIterator(GetShardIteratorRequest request) throws TableStoreException, ClientException
public GetStreamRecordResponse getStreamRecord(GetStreamRecordRequest request) throws TableStoreException, ClientExceptionStream 将数据表的增量变更按 Shard 组织,消费按照 list → describe → getIterator → 循环 getStreamRecord 的步骤进行:
listStream(ListStreamRequest)列出实例下已开启 Stream 的所有表的streamId。describeStream(DescribeStreamRequest)查询 Stream 的描述信息(创建时间、过期时间、当前状态)以及包含的Shard列表。getShardIterator(GetShardIteratorRequest)获取指定Shard的读取迭代值(shardIterator),作为后续拉取增量数据的起点。getStreamRecord(GetStreamRecordRequest)通过shardIterator拉取一批增量记录(StreamRecord列表),并返回nextShardIterator用于继续拉取。
以下示例端到端消费数据表 stream_test_demo 的 Stream,输出每条增量记录的类型和主键。
String demoTable = "stream_test_demo";
// 1. 列出实例下开启 Stream 的所有表,找到目标表的 streamId
ListStreamRequest listRequest = new ListStreamRequest(demoTable);
ListStreamResponse listResponse = client.listStream(listRequest);
String targetStreamId = null;
for (Stream stream : listResponse.getStreams()) {
if (demoTable.equals(stream.getTableName())) {
targetStreamId = stream.getStreamId();
break;
}
}
System.out.println("Stream ID: " + targetStreamId);
// 2. 查询 Stream 的所有 Shard
DescribeStreamRequest describeRequest = new DescribeStreamRequest(targetStreamId);
DescribeStreamResponse describeResponse = client.describeStream(describeRequest);
List<StreamShard> shards = describeResponse.getShards();
System.out.println("Shard count: " + shards.size());
if (!shards.isEmpty()) {
String shardId = shards.get(0).getShardId();
// 3. 拿 Shard 的初始读取迭代值
GetShardIteratorRequest iterRequest =
new GetShardIteratorRequest(targetStreamId, shardId);
GetShardIteratorResponse iterResponse = client.getShardIterator(iterRequest);
String shardIterator = iterResponse.getShardIterator();
// 4. 用迭代值拉取 Shard 的增量记录
GetStreamRecordRequest recordRequest = new GetStreamRecordRequest(shardIterator);
recordRequest.setLimit(100);
GetStreamRecordResponse recordResponse = client.getStreamRecord(recordRequest);
List<StreamRecord> records = recordResponse.getRecords();
System.out.println("Records fetched: " + records.size());
for (StreamRecord record : records) {
System.out.println("RecordType: " + record.getRecordType()
+ ", PK: " + record.getPrimaryKey());
}
// nextShardIterator 用于继续拉取后续增量
System.out.println("Next iterator: "
+ (recordResponse.getNextShardIterator() != null ? "yes" : "no"));
}参数说明
ListStreamRequest
名称 | 类型 | 说明 |
tableName(可选) | String | 数据表名称。不指定时返回当前实例下所有开启 Stream 的表的 Stream 信息;指定时仅返回该表的 Stream 信息。 |
DescribeStreamRequest
名称 | 类型 | 说明 |
streamId(必选) | String | Stream 的唯一标识,由 |
inclusiveStartShardId(可选) | String | 返回的 Shard 列表的起始 |
shardLimit(可选) | int | 本次返回的 Shard 数量上限。 |
GetShardIteratorRequest
名称 | 类型 | 说明 |
streamId(必选) | String | Stream 的唯一标识,由 |
shardId(必选) | String | Shard 的唯一标识,由 |
timestamp(可选) | long | 指定迭代起点的时间戳(微秒),用于从指定时间开始读取。不指定时从 Shard 起始位置读取。 |
GetStreamRecordRequest
名称 | 类型 | 说明 |
shardIterator(必选) | String | 读取迭代值,由 |
limit(可选) | int | 本次返回的 |
tableName(可选) | String | 目标 Shard 所属的数据表名称。 |
场景示例
分页获取 Shard 列表
Stream 包含的 Shard 数量较多时,通过 inclusiveStartShardId 和 shardLimit 分批获取。nextShardId 为 null 表示已遍历完所有 Shard。
String currentStreamId = "<your-stream-id>";
String startShardId = null;
int totalShards = 0;
while (true) {
DescribeStreamRequest request = new DescribeStreamRequest(currentStreamId);
if (startShardId != null) {
request.setInclusiveStartShardId(startShardId);
}
request.setShardLimit(50);
DescribeStreamResponse response = client.describeStream(request);
totalShards += response.getShards().size();
// nextShardId 为 null 表示已遍历完所有 Shard
if (response.getNextShardId() == null) {
break;
}
startShardId = response.getNextShardId();
}
System.out.println("Total shards: " + totalShards);持续 polling 拉取增量数据
按 nextShardIterator 循环调用 getStreamRecord 持续拉取一个 Shard 的增量数据。nextShardIterator 为 null 表示当前 Shard 已读完。
String currentStreamId = "<your-stream-id>";
String shardId = "<your-shard-id>";
GetShardIteratorRequest iterRequest =
new GetShardIteratorRequest(currentStreamId, shardId);
String shardIterator = client.getShardIterator(iterRequest).getShardIterator();
int totalRecords = 0;
while (shardIterator != null) {
GetStreamRecordRequest recordRequest = new GetStreamRecordRequest(shardIterator);
recordRequest.setLimit(100);
GetStreamRecordResponse response = client.getStreamRecord(recordRequest);
totalRecords += response.getRecords().size();
shardIterator = response.getNextShardIterator();
}
System.out.println("Polling total records: " + totalRecords);