This topic describes how to use Message Queue for Apache RocketMQ SDK for Java to subscribe to messages.

Subscription modes

Message Queue for Apache RocketMQ supports the following two subscription modes:

  • Clustering subscription

    Consumers identified by the same group ID share the messages of a topic evenly, and each message is consumed by only one of them. For example, a topic contains nine messages and a group contains three consumer instances. In clustering subscription mode, each instance consumes three messages. To use this mode, set the MessageModel property to CLUSTERING.

    // Configure clustering subscription, which is the default mode.
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • Broadcasting subscription

    Each consumer identified by the same group ID consumes every message once. For example, a topic contains nine messages and a group contains three consumer instances. In broadcasting subscription mode, each instance consumes all nine messages. To use this mode, set the MessageModel property to BROADCASTING.

    // Configure broadcasting subscription.
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Note
  • You must maintain consistent subscriptions for all consumer instances identified by the same group ID. For more information, see Subscription consistency.
  • The two subscription modes have different functional limits. For example, the broadcasting subscription mode does not support ordered messages, consumption progress maintenance, or consumer offset resetting. For more information, see Clustering consumption and broadcasting consumption.
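
The nine-message example above can be pictured with a small plain-Java simulation. This is only a sketch: it does not use the SDK, the class and method names are hypothetical, and the round-robin assignment in clustering mode is an assumption made for illustration. The actual distribution is decided by the service and may not be exactly even.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation only; it does not use the RocketMQ SDK.
class SubscriptionModeDemo {

    // Clustering: each message goes to exactly one consumer
    // (round-robin is an assumption made for this illustration).
    static List<List<String>> clustering(List<String> messages, int consumerCount) {
        List<List<String>> received = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    // Broadcasting: every consumer receives every message once.
    static List<List<String>> broadcasting(List<String> messages, int consumerCount) {
        List<List<String>> received = emptyInboxes(consumerCount);
        for (List<String> inbox : received) {
            inbox.addAll(messages);
        }
        return received;
    }

    // Creates one empty inbox per simulated consumer instance.
    private static List<List<String>> emptyInboxes(int consumerCount) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 1; i <= 9; i++) {
            messages.add("msg-" + i);
        }
        for (List<String> inbox : clustering(messages, 3)) {
            System.out.println("clustering: " + inbox.size() + " messages " + inbox);
        }
        for (List<String> inbox : broadcasting(messages, 3)) {
            System.out.println("broadcasting: " + inbox.size() + " messages");
        }
    }
}
```

With nine messages and three simulated consumers, each inbox holds three messages in the clustering case and nine in the broadcasting case, which matches the counts in the examples above.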

Modes for obtaining messages

Message Queue for Apache RocketMQ supports the following two modes for obtaining messages:

  • Push: Message Queue for Apache RocketMQ pushes messages to consumers. In push mode, you can configure batch consumption, and then Message Queue for Apache RocketMQ pushes messages to consumers in batches. For more information, see Batch consumption.
  • Pull: Consumers proactively pull messages from Message Queue for Apache RocketMQ.

The pull mode provides more options for subscribing to messages and gives you more control over message pulling than the push mode. For more information about the methods in the two modes, see Methods and parameters.

Notice To use the pull mode, make sure that the type of your Message Queue for Apache RocketMQ instance is Enterprise Platinum Edition.
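
The difference in control flow between the two modes can be sketched in plain Java. The InMemoryBroker class below is a hypothetical stand-in written for this illustration and is not part of the SDK: in push mode the consumer registers a callback and the broker drives delivery, whereas in pull mode the consumer decides when to fetch messages and how many to take.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Plain-Java illustration only; nothing here is an SDK class.
class PushVsPullDemo {

    // Hypothetical in-memory stand-in for a message broker.
    static class InMemoryBroker {
        private final Queue<String> queue = new ArrayDeque<>();
        private Consumer<String> pushListener; // set only in push mode

        // Push mode: the consumer registers a callback once.
        void registerPushListener(Consumer<String> listener) {
            this.pushListener = listener;
        }

        void send(String message) {
            if (pushListener != null) {
                pushListener.accept(message); // the broker delivers immediately
            } else {
                queue.add(message);           // the message waits until it is pulled
            }
        }

        // Pull mode: the consumer chooses when to fetch and how many messages to take.
        List<String> poll(int maxMessages) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxMessages && !queue.isEmpty()) {
                batch.add(queue.remove());
            }
            return batch;
        }
    }

    public static void main(String[] args) {
        InMemoryBroker pushBroker = new InMemoryBroker();
        pushBroker.registerPushListener(m -> System.out.println("pushed: " + m));
        pushBroker.send("msg-1");
        pushBroker.send("msg-2");

        InMemoryBroker pullBroker = new InMemoryBroker();
        pullBroker.send("msg-1");
        pullBroker.send("msg-2");
        pullBroker.send("msg-3");
        System.out.println("pulled: " + pullBroker.poll(2));
        System.out.println("pulled: " + pullBroker.poll(2));
    }
}
```

The sketch leaves out everything that the service handles in practice, such as networking, retries, and consumer offsets. It only shows which side initiates the transfer of a message.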

Sample code

For more information about the sample code, see Message Queue for Apache RocketMQ code library. This section provides sample code for the push and pull modes:

  • Push mode
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    // Required only if you uncomment one of the MessageModel settings below.
    import com.aliyun.openservices.ons.api.PropertyValueConst;
    
    import java.util.Properties;
    
    public class ConsumerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The group ID that you created in the Message Queue for Apache RocketMQ console.
            properties.put(PropertyKeyConst.GROUP_ID, "XXX");
            // The AccessKey ID that you created in the Alibaba Cloud Resource Access Management (RAM) console for identity verification.
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKey secret that you created in the Alibaba Cloud RAM console for identity verification.
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // The TCP endpoint. You can view the endpoint in the TCP Client Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
            // The clustering subscription mode (default).
            // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
            // The broadcasting subscription mode.
            // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    
            Consumer consumer = ONSFactory.createConsumer(properties);
            // Subscribe to multiple tags of a topic.
            consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    System.out.println("Receive: " + message);
                    return Action.CommitMessage;
                }
            });
    
            // Subscribe to all tags of another topic. To unsubscribe from this topic, delete the subscription code and restart the consumer.
            consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    System.out.println("Receive: " + message);
                    return Action.CommitMessage;
                }
            });
    
            consumer.start();
            System.out.println("Consumer Started");
        }
    }
  • Push mode (batch consumption)
    Notice To configure batch consumption in Message Queue for Apache RocketMQ, you must upgrade your TCP-based SDK for Java to version 1.8.7.3 or later. For more information about versions and access methods of SDK for Java, see Release notes of SDK for Java.
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.batch.BatchConsumer;
    import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
    import java.util.List;
    import java.util.Properties;
    
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.tcp.example.MqConfig;
    
    public class SimpleBatchConsumer {
    
        public static void main(String[] args) {
            Properties consumerProperties = new Properties();
            // MqConfig is the configuration class imported above. In this example, it is expected to hold the group ID, AccessKey pair, TCP endpoint, topic, and tag.
            consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
            consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
            consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
            consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);
    
            // Set the maximum number of messages to be consumed in a batch. When the number of messages accumulated under a specified topic reaches this value, the SDK immediately calls the callback method to consume these messages. Default value: 32. Valid values: 1 to 1024. In this example, the value is set to 128.
            consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
            // Set the maximum wait time between batches. The SDK immediately calls the callback method to consume messages after the specified wait time. Default value: 0. Valid values: 0 to 450. Unit: seconds. In this example, the value is set to 10 seconds.
            consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
    
            BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
            batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {
    
                @Override
                public Action consume(final List<Message> messages, ConsumeContext context) {
                    System.out.printf("Batch-size: %d\n", messages.size());
                    // Process messages in batches.
                    return Action.CommitMessage;
                }
            });
            // Start BatchConsumer.
            batchConsumer.start();
            System.out.println("Consumer start success.");
    
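            // Wait for a specified period of time to prevent the process from exiting.
            try {
                Thread.sleep(200000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers can detect the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }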
        }
    }         
  • Pull mode
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.PullConsumer;
    import com.aliyun.openservices.ons.api.TopicPartition;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    
    public class PullConsumerClient {
        public static void main(String[] args){
            Properties properties = new Properties();
            // The group ID that you created in the Message Queue for Apache RocketMQ console.
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
            // The AccessKey ID that you created in the Alibaba Cloud RAM console for identity verification.
            properties.put(PropertyKeyConst.AccessKey, "xxxxxxx");
            // The AccessKey secret that you created in the Alibaba Cloud RAM console for identity verification.
            properties.put(PropertyKeyConst.SecretKey, "xxxxxxx");
            // The TCP endpoint. You can view the endpoint in the TCP Client Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx");
            PullConsumer consumer = ONSFactory.createPullConsumer(properties);
            // Start the consumer.
            consumer.start();
            // Query all partitions of topic-xxx.
            Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
            // Specify the partitions from which you want to pull messages. In this example, all partitions of the topic are assigned.
            consumer.assign(topicPartitions);
    
            while (true) {
                // Pull messages. Set the timeout period to 3,000 milliseconds.
                List<Message> messages = consumer.poll(3000);
                System.out.printf("Received message: %s %n", messages);
            }
        }
    }

    For more information about partitions and consumer offsets, see Terms.
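
The comments in the batch consumption sample above describe two triggers for the callback: the number of accumulated messages reaches ConsumeMessageBatchMaxSize, or the wait time reaches BatchConsumeMaxAwaitDurationInSeconds. The following plain-Java sketch shows one way to read that rule. BatchBuffer is a hypothetical helper written for this illustration. It is not part of the SDK and makes no claim about how the SDK implements batching internally. Time is passed in explicitly so that the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch only; BatchBuffer is a hypothetical helper, not an SDK class.
class BatchBufferDemo {

    static class BatchBuffer {
        private final int maxBatchSize;
        private final long maxAwaitMillis;
        private final List<String> buffer = new ArrayList<>();
        private long firstArrivalMillis;

        BatchBuffer(int maxBatchSize, long maxAwaitMillis) {
            this.maxBatchSize = maxBatchSize;
            this.maxAwaitMillis = maxAwaitMillis;
        }

        // Buffers a message that arrived at nowMillis.
        void add(String message, long nowMillis) {
            if (buffer.isEmpty()) {
                firstArrivalMillis = nowMillis; // the wait starts with the first buffered message
            }
            buffer.add(message);
        }

        // Returns the next batch if either trigger has fired at nowMillis, else an empty list.
        List<String> drainIfReady(long nowMillis) {
            boolean full = buffer.size() >= maxBatchSize;
            boolean waitedLongEnough = !buffer.isEmpty()
                    && nowMillis - firstArrivalMillis >= maxAwaitMillis;
            if (!full && !waitedLongEnough) {
                return new ArrayList<>();
            }
            int end = Math.min(buffer.size(), maxBatchSize);
            List<String> batch = new ArrayList<>(buffer.subList(0, end));
            buffer.subList(0, end).clear();
            firstArrivalMillis = nowMillis; // any remaining messages start a new wait
            return batch;
        }
    }

    public static void main(String[] args) {
        BatchBuffer buffer = new BatchBuffer(3, 10_000);
        buffer.add("msg-1", 0);
        buffer.add("msg-2", 1_000);
        System.out.println(buffer.drainIfReady(2_000));  // neither trigger has fired yet
        buffer.add("msg-3", 3_000);
        System.out.println(buffer.drainIfReady(3_000));  // size trigger
        buffer.add("msg-4", 4_000);
        System.out.println(buffer.drainIfReady(14_000)); // wait-time trigger
    }
}
```

Under this reading, a wait time of 0 means that any buffered message is handed over as soon as it is checked, while a larger wait time trades latency for fuller batches.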

References

For more information about best practices of consumer-side throttling in Message Queue for Apache RocketMQ, see Throttling design on the Message Queue for Apache RocketMQ client.