All Products
Search
Document Center

Platform For AI:Subscription and pushing of queue service

Last Updated:Feb 05, 2024

This topic describes how to subscribe to and push data of the queue service.

You can subscribe to the queue service to have the data pushed to your client in time. This method avoids the latency of using Get API and does not increase the load of the queue service. The subscription and pushing of queue service involve many additional concepts and may be complicated to use. This topic describes these additional concepts to facilitate use of the queue service.

Consumer

A consumer refers to a client program that subscribes to data from the queue service. When the client uses the Watch API to call data, a consumer object is created in the queue service. The parameters that you add in the API, such as the size of the window or the tags, are used as attributes of the consumer. You can use the Attribute API to obtain the consumer status in the queue service. Example:

[OK] Attributes: 
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2

In the preceding example, consumers.stats.total is the total number of consumers, and consumers.list is a list of consumers. The following table describes the columns.

Parameter

Description

Id

The ID of the consumer in the <Consumer group ID. Consumer Id> format.

Index

The index of the data that the consumer is consuming.

Pending

The amount of data that is being processed but is not committed by the current consumer.

Status

The status of the consumer. Valid values:

  • Running: The consumer is running.

  • Exit: The consumer exits for a long period and no data is consumed.

  • Complete: The consumer exits and data is consumed.

  • Leaving: The consumer exits for a short period.

Window

The size of the consumer window, which is the maximum amount of pushed data.

Slots

The number of idle windows. A value of 0 indicates that the window is occupied.

AutoCommit

Indicates whether to automatically commit data after data is sent.

Tags

The filter conditions of the consumer.

Note

When you use the Watch API with tags, make sure that the tags used by consumers in the same consumer group are all the same.

If you tag data when using the Watch API, an additional Tags column is included in the response. Example:

consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]

The column indicates the data tags to which the consumer subscribes. Data is delivered to the consumer only when the data meets the conditions.

Consumer group

A consumer group is a collection of consumers who subscribe to a queue service with the same filter conditions. The names of consumers in a consumer group must be unique. Consumers in different groups can have the same name.

Data is evenly distributed to consumers within a consumer group, and is distributed to all consumers among consumer groups in parallel. For example:

  • If you have multiple consumers in the same group, then the data is distributed evenly among these consumers, and consumers receive different data.

  • If you have multiple consumers in multiple groups, the consumers in different groups receive the same data.

Important

If a data entry is deleted by a consumer through API, consumers in other groups will not receive the data.

You can use the Attribute API to obtain the consumer status in the queue service. Example:

groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0

In the preceding example, groups.list is a list of consumers. The following table describes the columns.

Parameter

Description

Id

The ID of the consumer group.

Index

The index that is being consumed by the current consumer group, which is the largest index of consumers in the consumer group.

Pending

The amount of data that is being processed but not committed by the current consumer group.

Delivered

The number of pushed messages.

Consumers

The amount of consumed data in the consumer group.

The number of consumer groups that can be created is not limited. Consumer groups are retained after they are created and will not be automatically deleted.

Use consumer and consumer group

You can declare the consumer and consumer group through HTTP headers in the Watch API, or when you initialize your client in the SDK. You can also obtain the key of the HTTP header by using the Attributes API.

meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid

In the preceding example, X-EAS-QueueService-Uid and X-EAS-QueueService-Gid are used to declare the consumer ID and the consumer group ID.

Commit and Negative

The queue service supports two consumption modes: Commit and Negative. Both modes operate on the index of data, but have different meanings.

  • In the Commit mode, the consumer receives and processes the data. The queue service can push another batch of data to the consumer.

  • In the Negative mode, the consumer receives the data, but cannot process the data. Whether the queue service pushes another batch of data depends on the error code. You can declare the cause and error code in text in the Negative mode, so that the data is pushed to other consumers. The following table describes the error codes that the queue service can process.

    Code

    Description

    Shutdown

    Indicates that the consumer is exiting and the queue service does not continue to push data.

Data rebalance

You may be unable to commit data in the following scenarios:

  • A rolling update of the prediction service is in progress. The data processing is paused on some consumers. The data being processed cannot be committed.

  • Some internal error occurs and the consumer crashes.

  • The consumer cannot process the received data and executes a Negative commit.

The data that cannot be processed is pushed to other consumers by the queue service. This process is called data rebalance. Data rebalance is performed in the following cases:

  • Any consumer enters the Exit state.

  • The consumer does not receive newly pushed data when the Window is idle.

The queue service counts the delivery of each data entry. One delivery is counted when the data is rebalanced and distributed. If the data entry is delivered more times than the maximum delivery number, the data is processed as a dead letter message. The queue service executes the configured dead-letter policy. By default, data is delivered to the tail queue.

Tail queue

A tail queue is an auxiliary queue and is used to store data that is not pushed to consumers, such as dead letters or custom control data. A tail queue is also a queue in the queue service and has the same API as other queues. Both input and output queues have a tail queue.

Important

The tail queue and normal queues share the maximum queue length. For example, if the maximum length of a queue is 10 and the normal queue length is 6, then the maximum length of the tail queue is 4. In this case, if you continue to write data after the tail queue length reaches 4, an error indicating that the queue is too long is returned. We recommend that you clean up the tail queue on a regular basis.

You can add an additional HTTP header to access the tail queue when you call an API. Example:

X-EAS-QueueService-Access-Rear: true