Collaborative consumption

Last Updated: Aug 09, 2021

Terms

Offset-based data consumption

The offset-based data consumption feature allows you to save consumption offsets on the server. A consumption offset consists of the sequence number of a record and the timestamp when the record is written to DataHub.

After you create a subscription for a topic and your application consumes data, the consumption offset is submitted to the server. The next time your application starts, it obtains the saved consumption offset from the server and resumes consumption from the next record. Consumption offsets must be saved on the server so that a consumer can continue from the saved offset after shards are reallocated. This is a prerequisite for collaborative consumption.

When you use a consumer group, you do not need to handle consumption offsets yourself. In the consumer config, specify the interval at which consumption offsets are submitted. When your application reads a record, the client considers all records before it to be processed. Once the specified interval has elapsed since the last submission, the client submits the current consumption offset. If a submission fails, or your application is interrupted before the latest offset is submitted, the offset saved on the server lags behind the records that were actually processed. In this case, your application may consume some records again after it restarts.
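The effect of interval-based offset submission can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not DataHub SDK code: the `Offset`, `OffsetStore`, and `run` names are hypothetical, the store stands in for the server, and a record count stands in for the time-based submission interval.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: these classes are hypothetical and are not part of the DataHub SDK.
class OffsetCommitSketch {

    /** A consumption offset: the record's sequence number plus its write timestamp. */
    static final class Offset {
        final long sequence;
        final long timestampMs;

        Offset(long sequence, long timestampMs) {
            this.sequence = sequence;
            this.timestampMs = timestampMs;
        }
    }

    /** Stands in for the server-side store that keeps the last submitted offset. */
    static final class OffsetStore {
        private Offset saved; // null until the first submission

        void submit(Offset offset) {
            saved = offset;
        }

        /** Sequence number of the first record that has not been checkpointed. */
        long nextSequence() {
            return saved == null ? 0 : saved.sequence + 1;
        }
    }

    /**
     * Processes records from the saved offset up to (but excluding) stopBefore and
     * submits the offset after every commitEvery records. Returns the sequence
     * numbers processed in this run.
     */
    static List<Long> run(OffsetStore store, long stopBefore, int commitEvery) {
        List<Long> processed = new ArrayList<>();
        int sinceCommit = 0;
        for (long seq = store.nextSequence(); seq < stopBefore; seq++) {
            processed.add(seq);
            sinceCommit++;
            if (sinceCommit == commitEvery) {
                store.submit(new Offset(seq, System.currentTimeMillis()));
                sinceCommit = 0;
            }
        }
        // Records processed after the last submit are not checkpointed.
        return processed;
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        // First run: process records 0..6, then stop as if interrupted.
        List<Long> first = run(store, 7, 5);
        // Second run: resume from the saved offset and continue through record 9.
        List<Long> second = run(store, 10, 5);
        System.out.println("first run:  " + first);
        System.out.println("second run: " + second);
    }
}
```

In this model the first run processes records 0 to 6 but only checkpoints record 4, so the second run starts at record 5 and processes records 5 and 6 a second time. Processing logic therefore needs to tolerate duplicates.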

Collaborative consumption

The collaborative consumption feature automatically allocates shards when multiple consumers consume a topic at the same time. Consumers often run on different machines, which makes it difficult for them to coordinate shard allocation among themselves. This feature takes that work off the consumer clients and simplifies data processing. If consumers that subscribe to the same topic are in the same consumer group, each shard is allocated to only one consumer in the consumer group.

Scenario

A, B, and C are three consumer instances and a topic has 10 shards.

  1. When consumer instance A starts first, all 10 shards are allocated to it.

  2. When the other two consumer instances start, the shards are reallocated as follows: four to A, three to B, and three to C.

  3. When one of the shards consumed by consumer instance A is split into two shards and the split shard is released after it is consumed, the resulting 11 shards are reallocated as follows: four to A, four to B, and three to C.

  4. When consumer instance C stops, the shards are reallocated as follows: six to A and five to B.
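The shard counts in the scenario above are consistent with a simple even split. The following sketch reproduces them. It is illustrative only: allocation is performed by the DataHub server, its actual algorithm is not described in this document, and the `allocate` helper and its "extra shards go to the first consumers in name order" rule are assumptions.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Illustrative model only: the even-split rule below is an assumption,
// not the documented server-side allocation algorithm.
class ShardAllocationSketch {

    /** Splits shardCount shards as evenly as possible across consumers, in name order. */
    static Map<String, Integer> allocate(List<String> consumers, int shardCount) {
        TreeSet<String> sorted = new TreeSet<>(consumers);
        Map<String, Integer> plan = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return plan;
        }
        int base = shardCount / sorted.size();  // every consumer gets at least this many
        int extra = shardCount % sorted.size(); // the first 'extra' consumers get one more
        int i = 0;
        for (String consumer : sorted) {
            plan.put(consumer, base + (i < extra ? 1 : 0));
            i++;
        }
        return plan;
    }

    public static void main(String[] args) {
        System.out.println(allocate(Arrays.asList("A"), 10));           // step 1
        System.out.println(allocate(Arrays.asList("A", "B", "C"), 10)); // step 2
        System.out.println(allocate(Arrays.asList("A", "B", "C"), 11)); // step 3: one shard split into two
        System.out.println(allocate(Arrays.asList("A", "B"), 11));      // step 4: C stopped
    }
}
```

Running `main` prints `{A=10}`, `{A=4, B=3, C=3}`, `{A=4, B=4, C=3}`, and `{A=6, B=5}`, which matches the four steps of the scenario.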

Heartbeat

Collaborative consumption relies on a heartbeat mechanism through which each consumer instance reports its state to the server. The reported information includes the status of the consumer instance, the shards allocated to it, and the shards that need to be released. If the server receives no heartbeat from a consumer instance within the specified interval, it considers the instance to have stopped. When the status of a consumer instance changes, the server reallocates shards and returns the new allocation plan in its responses to heartbeat requests. A client therefore detects a reallocation only at its next heartbeat, not immediately.
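The timeout and delayed-detection behavior described above can be modeled in a few lines. This is a minimal sketch with assumed names and values, not the DataHub protocol: the `Coordinator` class, the plan version counter, and the 10-second timeout are all hypothetical, and timestamps are passed in explicitly so that the example is deterministic.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrative model only: Coordinator and its plan version are hypothetical
// stand-ins for the server side of the heartbeat mechanism.
class HeartbeatSketch {

    static final class Coordinator {
        private final long timeoutMs;
        private final Map<String, Long> lastSeen = new TreeMap<>();
        private long planVersion = 0; // incremented whenever group membership changes

        Coordinator(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        /** Records a heartbeat and returns the current plan version in the response. */
        long heartbeat(String consumerId, long nowMs) {
            expire(nowMs);
            if (lastSeen.put(consumerId, nowMs) == null) {
                planVersion++; // a new consumer joined, so shards must be reallocated
            }
            return planVersion;
        }

        /** Treats consumers whose last heartbeat is older than the timeout as stopped. */
        void expire(long nowMs) {
            boolean removed = lastSeen.values().removeIf(seen -> nowMs - seen > timeoutMs);
            if (removed) {
                planVersion++; // a consumer stopped, so shards must be reallocated
            }
        }

        Set<String> members() {
            return lastSeen.keySet();
        }
    }

    public static void main(String[] args) {
        Coordinator server = new Coordinator(10_000);
        System.out.println("A sees plan v" + server.heartbeat("A", 0));      // A joins
        System.out.println("B sees plan v" + server.heartbeat("B", 0));      // B joins
        System.out.println("A sees plan v" + server.heartbeat("A", 5_000));  // A learns of B only now
        System.out.println("A sees plan v" + server.heartbeat("A", 12_000)); // B has timed out
        System.out.println("members: " + server.members());
    }
}
```

In this run, B joins at time 0, but A does not see the resulting plan (version 2) until its heartbeat at 5,000 ms. B sends no further heartbeats, so at 12,000 ms the model treats it as stopped and A receives version 3.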

Version

Maven dependencies and JDK:

Maven POM

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.18.0-public</version>
</dependency>
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>datahub-client-library</artifactId>
    <version>1.1.12-public</version>
</dependency>

JDK

JDK 1.8 or later

Sample code

For more information about how to read and write data, see Data read and write.