All Products
Search
Document Center

ApsaraMQ for RocketMQ:Subscribe to messages

Last Updated:Mar 11, 2026

Subscribe to messages by using the ApsaraMQ for RocketMQ TCP client SDK for Java. This topic covers subscription modes, delivery modes, and complete code examples.

Subscription modes

ApsaraMQ for RocketMQ supports two subscription modes: clustering and broadcasting.

Clustering (default)

In clustering mode, all consumers in the same consumer group share the message load equally. Each message is delivered to only one consumer.

For example, if a topic contains 9 messages and the consumer group has 3 consumers, each consumer receives 3 messages.

// Clustering is the default mode. To set it explicitly:
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

Broadcasting

In broadcasting mode, every consumer in the group receives a full copy of each message.

For example, if a topic contains 9 messages and the consumer group has 3 consumers, each consumer receives all 9 messages.

// Set the subscription mode to broadcasting.
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Note

Delivery modes

ApsaraMQ for RocketMQ supports two delivery modes: push and pull.

ModeDescriptionBest for
PushMessages are pushed from ApsaraMQ for RocketMQ to consumers. Also supports batch consumption.Most use cases
PullConsumers pull messages from ApsaraMQ for RocketMQ, providing more options in message receiving and more freedom in message pulling.Use cases that need fine-grained control over message fetching
Important
  • Pull consumers require an Enterprise Platinum Edition instance.

  • Pull consumers can connect only through a Virtual Private Cloud (VPC).

For the full list of pull consumer methods and parameters, see Methods and parameters.

Sample code

The following examples demonstrate push, batch push, and pull consumption. For more examples, see the ApsaraMQ for RocketMQ code repository on GitHub.

Replace the following placeholders with your actual values:

PlaceholderDescriptionWhere to find it
<your-group-id>Consumer group IDApsaraMQ for RocketMQ console
<your-tcp-endpoint>TCP endpointTCP Endpoint section on the Instance Details page in the console

All examples read AccessKey credentials from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.

Push consumer

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
   public static void main(String[] args) {
       Properties properties = new Properties();
       // The consumer group ID created in the ApsaraMQ for RocketMQ console.
       properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
       // Read AccessKey credentials from environment variables.
       properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
       properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
       // The TCP endpoint from the Instance Details page in the console.
       properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");
       // Clustering mode (default). Uncomment the line below to switch to broadcasting.
       // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
       // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

       Consumer consumer = ONSFactory.createConsumer(properties);
       // Subscribe to specific tags. Use "||" as a separator.
       consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() {
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

       // Subscribe to another topic with all tags.
       // To unsubscribe, remove the subscription code and restart the consumer.
       consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() {
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

       consumer.start();
       System.out.println("Consumer Started");
   }
}

Push consumer (batch consumption)

Important

Batch consumption requires TCP client SDK for Java version 1.8.7.3 or later. For version details, see Release notes.

Two properties control batch behavior:

PropertyDescriptionValid valuesDefault
ConsumeMessageBatchMaxSizeMaximum messages per batch. The SDK invokes the callback when the cached count reaches this value.1 to 102432
BatchConsumeMaxAwaitDurationInSecondsMaximum wait time in seconds before delivering a partial batch. The SDK invokes the callback when this duration elapses, regardless of batch size.0 to 4500
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.tcp.example.MqConfig;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        // The consumer group ID created in the ApsaraMQ for RocketMQ console.
        consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
        // Read AccessKey credentials from environment variables.
        consumerProperties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        consumerProperties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The TCP endpoint from the Instance Details page in the console.
        consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);

        // Set the maximum batch size to 128 messages.
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // Set the maximum wait time between batches to 10 seconds.
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {

             @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Batch-size: %d\n", messages.size());
                // Process messages in batches.
                return Action.CommitMessage;
            }
        });
        // Start the batch consumer.
        batchConsumer.start();
        System.out.println("Consumer start success.");

        // Keep the process alive to continue consuming.
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Pull consumer

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class PullConsumerClient {
    public static void main(String[] args){
        Properties properties = new Properties();
        // The consumer group ID created in the ApsaraMQ for RocketMQ console.
        properties.setProperty(PropertyKeyConst.GROUP_ID, "<your-group-id>");
        // Read AccessKey credentials from environment variables.
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The TCP endpoint from the Instance Details page in the console.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");
        PullConsumer consumer = ONSFactory.createPullConsumer(properties);
        // Start the pull consumer.
        consumer.start();
        // Get all partitions for the topic.
        Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
        // Assign partitions to this consumer.
        consumer.assign(topicPartitions);

        while (true) {
            // Poll for messages with a 3-second timeout.
            List<Message> messages = consumer.poll(3000);
            System.out.printf("Received message: %s %n", messages);
        }
    }
}

For more information about partitions and offsets, see Terms.

Message retry

If message consumption fails or times out, ApsaraMQ for RocketMQ automatically redelivers the message. For more information, see Message retry.