This topic describes how to use the Java SDK of Message Queue for Apache RocketMQ to subscribe to messages.

Subscription modes

Message Queue for Apache RocketMQ supports the following subscription modes:

  • Clustering subscription

    All consumers identified by the same group ID share the messages evenly, and each message is consumed by only one instance in the group. For example, if a topic contains nine messages and a group contains three consumer instances, each instance consumes three messages.

        // The configuration of clustering subscription (default mode).
        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

  • Broadcasting subscription

    Each consumer identified by the same group ID consumes every message once. For example, if a topic contains nine messages and a group contains three consumer instances, each instance consumes all nine messages.

        // The configuration of broadcasting subscription.
        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);                
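
The difference between the two modes can be illustrated with a small simulation of the nine-message, three-instance example. This is only a sketch: it does not use the SDK, the class and method names are hypothetical, and the round-robin assignment is an assumption made for illustration (the actual distribution is handled by the service and is not described in this topic).

```java
import java.util.Arrays;

// Illustrative model only; it does not call the Message Queue for Apache RocketMQ SDK.
class SubscriptionModeSketch {

    // Clustering: every message goes to exactly one instance in the group.
    // Round-robin assignment is an assumption made for this sketch.
    static int[] clusteringCounts(int messageCount, int instanceCount) {
        int[] counts = new int[instanceCount];
        for (int m = 0; m < messageCount; m++) {
            counts[m % instanceCount]++;
        }
        return counts;
    }

    // Broadcasting: every instance in the group receives every message once.
    static int[] broadcastingCounts(int messageCount, int instanceCount) {
        int[] counts = new int[instanceCount];
        for (int m = 0; m < messageCount; m++) {
            for (int i = 0; i < instanceCount; i++) {
                counts[i]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints "Clustering:   [3, 3, 3]"
        System.out.println("Clustering:   " + Arrays.toString(clusteringCounts(9, 3)));
        // Prints "Broadcasting: [9, 9, 9]"
        System.out.println("Broadcasting: " + Arrays.toString(broadcastingCounts(9, 3)));
    }
}
```
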
Note
  • Keep subscriptions consistent across all consumer instances that share a group ID. For more information, see Subscription consistency.
  • The two subscription modes have different functional restrictions. For example, broadcasting subscription does not support ordered messages, consumption progress maintenance, or consumer offset resetting. For more information, see Clustering consumption and broadcasting consumption.
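
One way to picture subscription consistency is to compare the topic-to-tag-expression mappings of the consumer instances in a group. The sketch below is a hypothetical pre-deployment check, not part of the SDK. It assumes that "consistent" means every instance subscribes to the same topics with exactly the same tag expressions; see Subscription consistency for the authoritative rules.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; it does not call the SDK.
class SubscriptionConsistencySketch {

    // Builds a topic -> tag expression map from alternating topic/expression arguments.
    static Map<String, String> subscription(String... topicThenExpression) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i + 1 < topicThenExpression.length; i += 2) {
            result.put(topicThenExpression[i], topicThenExpression[i + 1]);
        }
        return result;
    }

    // Assumption: instances are consistent when their subscription maps are equal.
    @SafeVarargs
    static boolean isConsistent(Map<String, String>... instances) {
        for (Map<String, String> instance : instances) {
            if (!instance.equals(instances[0])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> a = subscription("TopicTestMQ", "TagA||TagB", "TopicTestMQ-Other", "*");
        Map<String, String> b = subscription("TopicTestMQ", "TagA||TagB", "TopicTestMQ-Other", "*");
        Map<String, String> c = subscription("TopicTestMQ", "TagA");

        System.out.println(isConsistent(a, b)); // true: same topics, same tag expressions
        System.out.println(isConsistent(a, c)); // false: c differs in topics and tags
    }
}
```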

Sample code

For more information about the sample code, see Message Queue for Apache RocketMQ code library.

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
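import com.aliyun.openservices.ons.api.PropertyKeyConst;
// Required only if you uncomment one of the MessageModel settings below.
import com.aliyun.openservices.ons.api.PropertyValueConst;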

import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // The group ID that you created in the console.
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // The AccessKey ID that you created in the Alibaba Cloud console for identity authentication.
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // The AccessKey secret that you created in the Alibaba Cloud console for identity authentication.
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        // Clustering subscription (default).
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // Broadcasting subscription.
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        // Subscribe to multiple tags.
        consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        // Subscribe to another topic. To unsubscribe from this topic, delete the subscription code and restart the consumer.
        // Subscribe to all tags.
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}
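
The tag expressions used in the sample ("TagA||TagB" for multiple tags and "*" for all tags) can be read as a simple matching rule. The following sketch models that rule in plain Java. It is a simplified illustration with hypothetical names, not the SDK's or the broker's actual filtering code, and it assumes that a message without a tag matches only the "*" expression.

```java
// Simplified model of tag expression matching; it does not call the SDK.
class TagFilterSketch {

    // Returns true if a message carrying messageTag would match subExpression.
    static boolean matches(String subExpression, String messageTag) {
        // "*" subscribes to all tags.
        if ("*".equals(subExpression.trim())) {
            return true;
        }
        // "TagA||TagB" subscribes to any of the listed tags.
        for (String tag : subExpression.split("\\|\\|")) {
            if (tag.trim().equals(messageTag)) {
                return true;
            }
        }
        // Assumption for this sketch: untagged or unlisted tags do not match.
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA||TagB", "TagA")); // true
        System.out.println(matches("TagA||TagB", "TagC")); // false
        System.out.println(matches("*", "TagC"));          // true
    }
}
```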

References

For best practices on consumer throttling in Message Queue for Apache RocketMQ, see RocketMQ client traffic control design.