Tablestore提供了 stream 的 list 和 describe 操作,以及 shard 的 getsharditerator 和 getshardrecord 操作。

列出所有的Stream(ListStream)

ListStream介面用於列出當前執行個體和表下的所有stream。

樣本

列出某個表的所有 stream 資訊。

private static void listStream(SyncClient client, String tableName) {
    ListStreamRequest listStreamRequest = new ListStreamRequest(tableName);
    ListStreamResponse result = client.listStream(listStreamRequest);
}
            

查詢表Stream描述資訊(DescribeStream)

DescribeStream介面可以查詢 stream 的建立時間(creationTime)、到期時間(expirationTime)、當前的狀態(status) 、包含 shard 的列表(shards)和下一個起始 shard 的 id(如果還有尚未返回的 shard)。

樣本 1

擷取當前 stream 的所有 shard 資訊。

    private static void describeStream(SyncClient client, String streamId) {
        DescribeStreamRequest desRequest = new DescribeStreamRequest(streamId);
        DescribeStreamResponse desStream = client.describeStream(desRequest);
    }
            

樣本 2

設定開始 shardID(InclusiveStartShardId)和每次返回的最大 shard 數目。

    private static void describeStream(SyncClient client, String streamId) {
    DescribeStreamRequest dsRequest = new DescribeStreamRequest(streamId);
    dsRequest.setInclusiveStartShardId(startShardId);
    dsRequest.setShardLimit(10);
    DescribeStreamResponse dscStream = client.describeStream(dsRequest);
    }
            

擷取Shard的讀取迭代值(GetShardIterator)

GetShardIterator 介面用於擷取 shard 的讀取起始迭代值。

樣本

擷取 shard 的讀取起始迭代值。

    private static void getShardIterator(SyncClient client, String streamId, String shardId) {
    GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(streamId, shardId);
    GetShardIteratorResponse shardIterator = client.getShardIterator(getShardIteratorRequest);
    }
            

擷取Shard的更新記錄(GetStreamRecord)

GetStreamRecord 介面用於擷取 shard 的每條更新記錄。

樣本

擷取 shard 的最初 100 條更新。

    private static void getShardIterator(SyncClient client, String shardIterator) {
        GetStreamRecordRequest streamRecordRequest = new GetStreamRecordRequest(shardIterator);
        streamRecordRequest.setLimit(100);
        GetStreamRecordResponse streamRecordResponse = client.getStreamRecord(streamRecordRequest);
        List<StreamRecord> records = streamRecordResponse.getRecords();
        for(int k=0;k<records.size();k++){
            System.out.println("record info:" +  records.get(k).toString());
        }
        System.out.println("next iterator:" + streamRecordResponse.getNextShardIterator());
    }