Platform For AI: Subscription and pushing of queue service

Last Updated:Apr 10, 2025

Subscription and pushing delivers data to the client as soon as the data is ready. This reduces both the latency of polling through the Get API and the load that repeated queries place on the queue service. However, the feature involves several additional concepts, which makes it more complex to use. This topic describes how to use the subscription and push feature of the queue service.

Consumer

A consumer is a client program that subscribes to data from the queue service. When the client uses the Watch API to subscribe to data, a consumer object is created in the queue service. Parameters in the API call (such as window size and tags) become attributes of the consumer. You can view the consumer status through the Attribute API. Example:

[OK] Attributes: 
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2

consumers.stats.total indicates the total number of consumers. consumers.list is the list of consumers. The following table describes the columns:

| Parameter | Description |
| --- | --- |
| Id | The ID of the consumer, in the format of <consumer group Id>.<consumer Id>. |
| Index | The index of the data that the consumer is consuming. |
| Pending | The amount of data that is being processed but has not been committed by the consumer. |
| Status | The status of the consumer. Valid values: Running (the consumer is running), Exit (the consumer has exited for a long period and no data is consumed), Complete (the consumer has exited and data is consumed), Leaving (the consumer has exited for a short period). |
| Window | The size of the consumer window, which is the maximum amount of pushed data. |
| Slots | The number of idle slots in the window. A value of 0 indicates that the window is fully occupied. |
| AutoCommit | Indicates whether data is automatically committed after it is sent. |
| Tags | The filter conditions of the consumer. |

Note

When you use the Watch API with tags, make sure that the tags used by consumers in the same consumer group are all the same.

If you specify data tags when you call the Watch API, the consumer entry contains an additional Tags column, for example:

consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]

The column indicates the data tags to which the consumer subscribes. Data is delivered to the consumer only when the data meets the conditions.
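
The consumer entries shown in the example output above can be split into fields for monitoring purposes. The following is a minimal sketch that assumes the attributes are available as text lines in exactly the layout of the example; the function name parse_attribute_line is hypothetical and is not part of any SDK.

```python
def parse_attribute_line(line):
    """Split 'key : Field: value, Field: value' into (key, fields).

    Assumes the textual layout of the example output in this topic.
    """
    key, _, rest = line.partition(" : ")
    fields = {}
    for part in rest.split(", "):
        name, _, value = part.partition(": ")
        fields[name.strip()] = value.strip()
    return key.strip(), fields


line = ("consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, "
        "Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true")
key, consumer = parse_attribute_line(line)

# The Id field combines the consumer group ID and the consumer ID.
group_id, _, consumer_id = consumer["Id"].partition(".")
```

All values are kept as strings; converting Pending or Window to integers is left to the caller.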

Consumer group

A consumer group is a collection of consumers that subscribe to the queue service with the same filter conditions. Consumers in the same group cannot have the same name, but consumers in different groups can have the same name.

Within the same consumer group, data is evenly distributed among the consumers. Across different groups, data is pushed in parallel to the consumers in each group. In other words:

  • Consumers in the same group receive different data.

  • Consumers in different groups receive the same data.
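
The two rules above can be illustrated with a small in-memory simulation. This is only a sketch: it assumes a simple round-robin order inside each group, whereas the real service decides the order itself (for example, based on consumer windows). The function distribute and the group names are hypothetical.

```python
from collections import defaultdict
from itertools import cycle


def distribute(items, groups):
    """Simulate delivery. groups maps a group ID to its consumer IDs.

    Every group sees every item; inside a group, items are handed out
    in turn (round-robin is an assumption made for this sketch).
    """
    received = defaultdict(list)
    for group_id, consumers in groups.items():
        turn = cycle(consumers)
        for item in items:
            received[(group_id, next(turn))].append(item)
    return dict(received)


groups = {"group_a": ["u1", "u2"], "group_b": ["u1"]}
result = distribute(["d0", "d1", "d2", "d3"], groups)
```

In this simulation, u1 and u2 in group_a each receive half of the items, while u1 in group_b receives all of them. The consumer name u1 can be reused because the two consumers belong to different groups.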

Important

If a consumer deletes data through the API, the data is immediately deleted, and consumers in other groups will not be able to receive it.

You can view the status of consumer groups in the queue service through the Attribute API. The following is an example:

groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0

groups.list is the list of consumer groups. The following table describes the columns:

| Parameter | Description |
| --- | --- |
| Id | The ID of the consumer group. |
| Index | The index that the consumer group is currently consuming, which is the largest index among the consumers in the group. |
| Pending | The amount of data that is being processed but has not been committed by the consumer group. |
| Delivered | The number of pushed messages. |
| Consumers | The number of consumers in the consumer group. |

The number of consumer groups is not limited. However, consumer groups are not cleaned up automatically: after a group is created, its status is retained.

Using consumers and consumer groups

You can declare the consumer and consumer group through HTTP headers in the Watch API, or when you initialize your client in the SDK. You can obtain the names of these HTTP headers by using the Attributes API:

meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid

Specify the consumer ID in X-EAS-QueueService-Uid and the ID of the consumer group to join in X-EAS-QueueService-Gid.
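
As an illustration, the two headers can be assembled as follows before a Watch request is sent. This sketch only builds the header dictionary; the helper name consumer_headers and the IDs used are hypothetical, and how the request itself is sent depends on your HTTP client or SDK.

```python
# Header names as reported by meta.header.group and meta.header.user.
GROUP_HEADER = "X-EAS-QueueService-Gid"
USER_HEADER = "X-EAS-QueueService-Uid"


def consumer_headers(consumer_id, group_id):
    """Return the HTTP headers that declare a consumer and its group."""
    return {USER_HEADER: consumer_id, GROUP_HEADER: group_id}


headers = consumer_headers("u1", "default_group")
```

In practice, reading the header names from the Attributes API instead of hard-coding them keeps the client aligned with the service.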

Commit and negative

The queue service supports two consumption methods: Commit and Negative. Both operate on the data Index, but have different semantics.

  • Commit indicates that the consumer has received the data and completed processing, and the next batch can be pushed.

  • Negative indicates that the consumer has received the data but cannot process it. The consumer can state the cause as text together with an error code, so that the data is pushed to other consumers. The queue service decides whether to push the next batch to this consumer based on the error code. The following table lists the special error codes that the queue service can handle:

    | Code | Description |
    | --- | --- |
    | Shutdown | Indicates that the consumer is exiting and the queue service does not continue to push data. |
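
The difference between the two operations can be sketched with a small in-memory model of one consumer as seen by the queue service. This is an illustration only, not the service implementation: the class ConsumerSession, its attribute names, and the representation of the error code as the string "Shutdown" are assumptions. The sketch also assumes that any code other than Shutdown leaves the consumer eligible for further pushes.

```python
class ConsumerSession:
    """Toy model of the state the queue keeps for one consumer."""

    def __init__(self):
        self.pending = {}      # index -> data pushed but not yet committed
        self.redeliver = []    # (index, data, reason) handed back for other consumers
        self.accepting = True  # False once the consumer has reported Shutdown

    def push(self, index, data):
        if not self.accepting:
            raise RuntimeError("consumer is shutting down")
        self.pending[index] = data

    def commit(self, index):
        # Processing finished: the data is no longer pending.
        self.pending.pop(index)

    def negative(self, index, code, reason=""):
        # The data could not be processed: hand it back for redelivery.
        data = self.pending.pop(index)
        self.redeliver.append((index, data, reason))
        if code == "Shutdown":
            self.accepting = False


session = ConsumerSession()
session.push(0, "a")
session.push(1, "b")
session.commit(0)
session.negative(1, "Shutdown", "rolling update")
```

After these calls, nothing is pending, index 1 is waiting to be pushed to another consumer, and the session no longer accepts pushes.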

Data rebalance

You may be unable to commit data in the following scenarios:

  • During a rolling update of the prediction service, some consumers are terminated, so the data that they are processing cannot be committed.

  • An internal error occurs and the consumer crashes.

  • The consumer cannot process the received data and executes a Negative commit.

The queue service redistributes such uncommitted data to other consumers. This mechanism is called data rebalance. Data rebalance is performed in the following cases:

  • Any consumer enters the Exit state.

  • A consumer has idle space in its window but does not receive new pushed data.

The queue service maintains a delivery counter for each piece of data. Each time the data is rebalanced and distributed, the counter is incremented. During a rebalance, if the delivery counter of a piece of data exceeds the maximum number of deliveries, the data is treated as a dead letter and the queue service executes the configured dead-letter policy. By default, dead letters are delivered to the tail queue.
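
The delivery counter and the default dead-letter policy can be sketched as follows. This is one possible reading of the description above, not the actual implementation: the function rebalance, its parameters, and the choice to increment the counter before comparing it with the maximum are assumptions.

```python
from collections import deque


def rebalance(uncommitted, delivery_counts, max_deliveries, queue, tail_queue):
    """Redistribute uncommitted (index, data) pairs.

    Data whose delivery counter exceeds max_deliveries is treated as a
    dead letter and moved to the tail queue (the default policy).
    """
    for index, data in uncommitted:
        delivery_counts[index] = delivery_counts.get(index, 0) + 1
        if delivery_counts[index] > max_deliveries:
            tail_queue.append((index, data))
        else:
            queue.append((index, data))


queue, tail_queue = deque(), deque()
delivery_counts = {7: 2}  # index 7 has already been redistributed twice
rebalance([(7, "x"), (8, "y")], delivery_counts, 2, queue, tail_queue)
```

With a maximum of 2 deliveries, index 7 becomes a dead letter and lands in the tail queue, while index 8 returns to the standard queue for another consumer.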

Tail queue

The tail queue is an auxiliary queue used to store data that is not pushed to consumers (such as dead letters or custom control data). It is a queue instance within the queue service and has the same API. Each input and output queue has a tail queue.

Important

The tail queue and the standard queue share the maximum queue length. If the maximum length is 10 and the standard queue occupies 6, then the tail queue can be at most 4. If the tail queue is full, writing to it will return a queue length error. Therefore, you need to regularly observe and clean up the tail queue.
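
The shared length limit amounts to simple arithmetic, sketched below with the numbers from the note. The function names tail_capacity and write_to_tail are hypothetical, and OverflowError merely stands in for the queue length error that the service returns.

```python
def tail_capacity(max_length, standard_length):
    """Space left for the tail queue under the shared maximum length."""
    return max(max_length - standard_length, 0)


def write_to_tail(tail, item, max_length, standard_length):
    """Append to the tail queue, or fail when the shared limit is reached."""
    if len(tail) >= tail_capacity(max_length, standard_length):
        raise OverflowError("queue length exceeded")
    tail.append(item)


capacity = tail_capacity(10, 6)  # maximum length 10, standard queue holds 6

tail = []
for i in range(capacity):
    write_to_tail(tail, f"dead-{i}", 10, 6)

try:
    write_to_tail(tail, "one-too-many", 10, 6)
    overflowed = False
except OverflowError:
    overflowed = True
```

Here the tail queue accepts four items and rejects the fifth, which is why a tail queue that is never cleaned up eventually blocks further dead-letter writes.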

When you call the API, you can declare access to the tail queue by adding the following HTTP header:

X-EAS-QueueService-Access-Rear: true
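
For illustration, the header can be merged into the headers of an existing request as shown below. The helper with_tail_access is hypothetical; only the header name and value come from this topic.

```python
REAR_HEADER = "X-EAS-QueueService-Access-Rear"


def with_tail_access(headers):
    """Return a copy of headers that targets the tail queue."""
    merged = dict(headers)
    merged[REAR_HEADER] = "true"
    return merged


base = {"X-EAS-QueueService-Gid": "default_group"}
tail_headers = with_tail_access(base)
```

Returning a copy leaves the original headers untouched, so the same base headers can still be used for requests to the standard queue.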