ApsaraMQ for RocketMQ supports two types of consumers: Push consumers and Simple consumers. This topic describes their usage, implementation principles, reliability, retry mechanisms, and applicable scenarios.
Background information
ApsaraMQ for RocketMQ provides different consumer types for different business scenarios. Each consumer type has different integration and control methods. Answering the following questions can help you choose the consumer type that best suits your business scenario.
Concurrent consumption: How can a consumer use a multi-threaded mechanism to process messages and improve processing efficiency?
Synchronous and asynchronous message processing: For different integration scenarios, a consumer might need to asynchronously distribute received messages to the business logic for processing. How is asynchronous message processing implemented?
Reliable message processing: How does a consumer return a response after processing a message? How are retries handled in case of an exception to ensure reliable message processing?
For answers to these questions, see the Push consumer and Simple consumer sections.
Feature overview

As shown in the preceding figure, when a consumer in ApsaraMQ for RocketMQ processes a message, it goes through the following stages: receiving the message, processing the message, and committing the consumption status.
For these stages, ApsaraMQ for RocketMQ provides two consumer types: Push and Simple. These two consumer types use different implementation methods and interfaces to meet your consumption needs in various business scenarios. The following table describes their differences.
If your business scenario changes, or if the consumer type you are using is no longer suitable for your business, you can change the consumer type. Changing the consumer type does not affect your existing ApsaraMQ for RocketMQ resources or business processing.
Feature | Push consumer | Simple consumer |
Interface method | Returns the consumption result through a listener callback. Consumption logic must be handled within the listener. | Your application implements message processing and calls an interface to return the consumption result. |
Concurrency management | The SDK manages concurrency. | Your application logic manages consumption threads. |
Flexibility | Highly encapsulated and less flexible. | Atomic and highly customizable. |
Scenarios | Ideal for development scenarios that do not require custom workflows. | Ideal for development scenarios that require highly customized business workflows. |
Corresponding SDK class | PushConsumer, LitePushConsumer | SimpleConsumer |
Push-type consumers
A Push consumer is a highly encapsulated consumer. You only need to process messages and return consumption results through a message listener. The ApsaraMQ for RocketMQ client software development kit (SDK) handles message acquisition, consumption status submission, and consumption retries.
Usage
Using Push consumers is straightforward. You register a message listener during consumer initialization and implement the message processing logic within the listener. The ApsaraMQ for RocketMQ SDK handles message acquisition, listener invocation, and retry processing in the background. For sample code, see the corresponding SDK classes.
The message listener for a Push consumer returns one of the following three results:
Consumption success: For example, when you use the SDK for Java,
ConsumeResult.SUCCESSis returned. This indicates that the message was processed successfully. The server then updates the consumption progress based on this result.Consumption failure: For example, when you use the SDK for Java,
ConsumeResult.FAILUREis returned. This indicates that the message failed to be processed. The system then determines whether to retry consumption based on the retry policy.Unexpected failure: If an exception is thrown, the result is treated as a consumption failure. The system then determines whether to retry consumption based on the retry policy.
When a Push consumer consumes a message, if an unexpected block in the processing logic prevents the message from being processed, the SDK times out. It then forcibly submits a consumption failure result and handles the message based on the retry policy. For more information about message timeouts, see PushConsumer retry policy.
When a consumption timeout occurs, the SDK submits a consumption failure result. However, the current consumption thread may not respond to the interruption and may continue to process the message.
Internal mechanism
For Push consumers, real-time message processing is based on the typical Reactor thread model within the SDK. As shown in the following figure, the SDK has a built-in long-polling thread. This thread asynchronously pulls messages into the SDK's internal cache queue. The messages are then submitted to consumer threads, which triggers the listener to execute the local consumption logic.

Reliability and retries
For Push consumers, the only boundary between the client SDK and the consumption logic is the message listener interface. The client SDK strictly determines whether a message is consumed successfully based on the result returned by the listener and performs retries to ensure reliability. All messages must be processed synchronously. The call result must be returned when the listener interface finishes. Asynchronous distribution is not allowed. For more information about message retries, see PushConsumer retry policy.
When you use a PushConsumer to consume messages, do not process messages in the following ways. Otherwise, ApsaraMQ for RocketMQ cannot guarantee message reliability.
Incorrect method 1: Returning a success result before message processing is complete. If the message processing later fails, the ApsaraMQ for RocketMQ server is not aware of the failure and will not retry the consumption.
Incorrect method 2: Redistributing the message to other custom threads within the message listener and returning the consumption result early. If the message processing later fails, the ApsaraMQ for RocketMQ server is also unaware of the failure and will not retry the consumption.
Order guarantee
Based on the definition of ordered messages in ApsaraMQ for RocketMQ, if a consumer group is set to the ordered consumption mode, a Push consumer strictly follows the message order when it invokes the message listener. The consumption order is guaranteed without any changes to the business logic.
Ordered message processing requires synchronous submission. If your business logic implements custom asynchronous distribution, ApsaraMQ for RocketMQ cannot guarantee the message order.
Scenarios
Push consumers strictly enforce synchronous message processing and a processing timeout for each message. They are suitable for the following scenarios:
Predictable message processing time: If the processing time is uncertain and messages often take longer than expected to process, the reliability guarantee of a Push consumer frequently triggers retries. This can cause many duplicate messages.
No asynchronous processing or advanced customization: Push consumers restrict the thread model for consumption logic. The client SDK internally triggers message processing at maximum throughput. This model simplifies development but does not allow for asynchronous processing or custom workflows.
Corresponding SDK classes
ApsaraMQ for RocketMQ provides two SDK classes for Push consumers: PushConsumer and LitePushConsumer.
PushConsumer: Suitable for consuming messages from topics that are not the Lite type.
LitePushConsumer: Dedicated to consuming messages from Lite-type topics. It allows consumption control at the Lite topic granularity.
PushConsumer
Sample code:
// Sample consumption: Use a PushConsumer to consume normal messages.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// Set the consumer group.
.setConsumerGroup("Your ConsumerGroup")
// Set the endpoint.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Set the pre-bound subscription relationship.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Set the message listener.
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// Consume the message and return the processing result.
return ConsumeResult.SUCCESS;
}
})
.build();LitePushConsumer
Sample code:
// Sample consumption: Use a LitePushConsumer to consume normal messages.
ClientServiceProvider provider = ClientServiceProvider.loadService();
LitePushConsumer litePushConsumer = provider.newLitePushConsumerBuilder()
// Set the endpoint.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Set the topic.
.bindTopic("Your Topic")
// Set the consumer group.
.setConsumerGroup("Your ConsumerGroup")
// Set the message listener.
.setMessageListener(messageView -> {
// Consume the message and return the processing result.
return ConsumeResult.SUCCESS;
})
.build();
// Subscribe to the target Lite topics.
litePushConsumer.subscribeLite("Your Lite Topic 1");
litePushConsumer.subscribeLite("Your Lite Topic 2");Simple consumers
Simple consumers are a type of consumer that supports atomic operations for message processing. Your business logic calls operations to retrieve messages, commit the consumption status, and retry consumption.
Usage
Using a Simple consumer involves calling multiple API operations from your business logic. You call operations to retrieve messages and distribute them to business threads for processing. After processing, you call the commit operation to return the result to the server. For sample code, see the corresponding SDK class.
Retry for reliability
For Simple consumers, the client software development kit (SDK) and the server communicate using the ReceiveMessage and AckMessage operations. If the client SDK successfully processes a message, it calls the AckMessage operation. If processing fails, do not send an ACK response. This triggers a retry after the specified message invisibility duration ends. For more information, see SimpleConsumer retry policy.
Ensured message order
A Simple consumer processes ordered messages from ApsaraMQ for RocketMQ in the order they are stored. For a group of messages that must remain in order, if a preceding message is not processed, the subsequent messages cannot be retrieved.
Scenarios
SimpleConsumer provides atomic API operations to retrieve messages and commit consumption results. This method is more flexible than using PushConsumer. SimpleConsumer is suitable for the following scenarios:
Unpredictable message processing duration: Use SimpleConsumer if the message processing duration cannot be estimated or if messages often take a long time to process. You can specify an estimated processing duration during consumption. If the estimate is not suitable for your business needs, you can change it by calling an API operation.
Advanced custom scenarios: The SimpleConsumer SDK does not have complex thread encapsulation. Your business logic has full control. This lets you implement advanced scenarios such as asynchronous distribution and batch consumption.
Custom consumption rate: With SimpleConsumer, your business logic actively calls operations to retrieve messages. This lets you adjust the frequency of retrieving messages to control the consumption rate.
Corresponding SDK class
ApsaraMQ for RocketMQ provides one SDK class for Simple consumers: SimpleConsumer. This class offers extensive custom operations. SimpleConsumer cannot consume messages from Lite topics.
SimpleConsumer
The following code provides an example:
// Consumption example: Use a SimpleConsumer to consume normal messages. Actively get messages, process them, and commit the results.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// Set the consumer group.
.setConsumerGroup("Your ConsumerGroup")
// Set the endpoint.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Set the pre-bound subscription relationships.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List<MessageView> messageViewList = null;
try {
// A SimpleConsumer must actively get and process messages.
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After processing is complete, actively call ack() to commit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to reasons such as system throttling, you must re-initiate the request to get messages.
e.printStackTrace();
}SimpleConsumer mainly involves the following API operations:
Interface Name | Key features | Modifiable parameters |
| The consumer actively calls this operation to get messages from the server. Note Because the server uses distributed storage, it may return an empty result even if messages exist on the server. To resolve this, call the ReceiveMessage operation again or increase the concurrency of ReceiveMessage calls. |
|
| After a consumer successfully consumes a message, it actively calls this operation to return a success response to the server. | None |
| In a consumption retry scenario, a consumer can call this operation to change the message processing duration, which controls the retry interval. | Message invisibility duration: Call this operation to change the value of the message invisibility duration preset in the |
Recommendations
Control consumption time for push consumers
Strictly control the message consumption time for Push consumers to avoid reprocessing due to timeouts. If your application frequently processes messages that take a long time, use Simple consumers and set a message invisibility duration.