All Products
Search
Document Center

ApsaraMQ for RocketMQ:Subscribe to messages

Last Updated:Nov 24, 2023

This topic describes how to subscribe to messages by using the TCP client SDK for Java provided by ApsaraMQ for RocketMQ.

Subscription modes

ApsaraMQ for RocketMQ supports the following subscription modes:

  • Clustering subscription

    All consumers identified by the same group ID consume an equal number of messages. For example, a topic contains nine messages and a consumer group contains three consumers. In clustering consumption mode, each consumer consumes three messages. The following code shows how to configure the clustering subscription mode:

    // Configure clustering subscription, which is the default mode. 
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • Broadcasting subscription

    Each of the consumers identified by the same group ID consumes all messages once. For example, a topic contains nine messages and a consumer group contains three consumers. In broadcasting consumption mode, each consumer consumes nine messages. The following code shows how to configure the broadcasting subscription mode:

    // Configure broadcasting subscription. 
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);               
Note
  • You must maintain consistent subscriptions for all consumer instances identified by the same group ID. For more information, see Subscription consistency.

  • Different limits are imposed on the preceding subscription modes. For example, in broadcasting subscription mode, you cannot send or receive ordered messages, maintain consumption progress, or reset consumer offsets. For more information, see Clustering consumption and broadcasting consumption.

Modes for obtaining messages

ApsaraMQ for RocketMQ allows you to obtain messages by using one of the following modes:

  • Push: Messages are pushed from ApsaraMQ for RocketMQ to consumers. In push mode, ApsaraMQ for RocketMQ supports the batch consumption feature. This feature allows you to send messages to consumers in a batch. For more information, see Batch consumption.

  • Pull: Messages are pulled from ApsaraMQ for RocketMQ by consumers.

Compared with the push mode, the pull mode provides more options in message receiving and allows you more freedom in message pulling. For more information, see Methods and parameters.

Important
  • To use pull consumers, make sure that your ApsaraMQ for RocketMQ instance is of the Enterprise Platinum Edition.

  • Pull consumers can access ApsaraMQ for RocketMQ instances only in virtual private clouds (VPCs).

Sample code

For information about the detailed sample code, see the ApsaraMQ for RocketMQ code repository. The following items provide the sample code on how to subscribe to messages by using the push and pull modes.

  • Push mode

    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
    import java.util.Properties;
    
    public class ConsumerTest {
       public static void main(String[] args) {
           Properties properties = new Properties();
           // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
           properties.put(PropertyKeyConst.GROUP_ID, "XXX");
           // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
           // The AccessKey ID that is used for authentication. 
           properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
           // The AccessKey secret that is used for authentication. 
           properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
           // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
           properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
              // The clustering subscription mode, which is the default mode. 
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // The broadcasting consumption mode. 
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    
           Consumer consumer = ONSFactory.createConsumer(properties);
            // Subscribe to multiple tags. 
           consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { 
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
            // Subscribe to another topic. To unsubscribe from a topic, delete the code for subscription and restart the consumer. 
            // Subscribe to all tags. 
            consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { 
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
           consumer.start();
           System.out.println("Consumer Started");
       }
    }            
  • Push mode (batch consumption)

    Important

    To use the batch consumption feature provided by ApsaraMQ for RocketMQ, upgrade your TCP client SDK for Java to version 1.8.7.3 or later. For more information, see Release notes.

    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.batch.BatchConsumer;
    import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
    import java.util.List;
    import java.util.Properties;
    
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.tcp.example.MqConfig;
    
    public class SimpleBatchConsumer {
    
        public static void main(String[] args) {
            Properties consumerProperties = new Properties();
            // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
            consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
            // The AccessKey ID that is used for authentication. 
            properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // The AccessKey secret that is used for authentication. 
            properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
            consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);
    
            // The maximum number of messages to be consumed at a time. In this example, the value is specified as 128. If the number of messages cached in the specified topic reaches this value, the SDK immediately calls the callback method for the consumer to consume the messages. Valid values: 1 to 1024. Default value: 32. 
            consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
            // The maximum wait time between two consecutive batches. In this example, the value is specified as 10 seconds. If the specified wait time is reached, the SDK immediately calls the callback method for the consumer to consume messages. Valid values: 0 to 450. Default value: 0. Unit: seconds. 
            consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
    
            BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
            batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {
    
                 @Override
                public Action consume(final List<Message> messages, ConsumeContext context) {
                    System.out.printf("Batch-size: %d\n", messages.size());
                    // Process messages in batches. 
                    return Action.CommitMessage;
                }
            });
            // Start the consumer for batch consumption. 
            batchConsumer.start();
            System.out.println("Consumer start success.");
    
            // Wait for a specific period of time to prevent the process from exiting. 
            try {
                Thread.sleep(200000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }         
  • Pull mode

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.PullConsumer;
    import com.aliyun.openservices.ons.api.TopicPartition;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    
    public class PullConsumerClient {
        public static void main(String[] args){
            Properties properties = new Properties();
            // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
            // The AccessKey ID that is used for authentication. 
            properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // The AccessKey secret that is used for authentication. 
            properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx");
            PullConsumer consumer = ONSFactory.createPullConsumer(properties);
            // Start the consumer. 
            consumer.start();
            // Query all partitions in topic-xxx. 
            Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
            // Specify the partition from which you want to pull messages. 
            consumer.assign(topicPartitions);
    
            while (true) {
                // Pull messages. Specify the timeout period as 3,000 milliseconds. 
                List<Message> messages = consumer.poll(3000);
                System.out.printf("Received message: %s %n", messages);
            }
        }
    }

    For information about partitions and offsets, see Terms.

Additional information

For information about the best practices of consumer throttling in ApsaraMQ for RocketMQ, see RocketMQ client traffic control design.