Message Queue for Apache RocketMQ SDK for Java is used to send and subscribe to messages. In Message Queue for Apache RocketMQ, subscribers can obtain messages in push or pull mode. This topic describes the methods and parameters for sending and subscribing to messages.

Background information

Message Queue for Apache RocketMQ supports the following two modes for obtaining messages:

  • Push: Message Queue for Apache RocketMQ pushes messages to consumers. In push mode, you can configure batch consumption. Then, Message Queue for Apache RocketMQ pushes messages to consumers in batches. For more information, see Batch consumption.
  • Pull: Consumers proactively pull messages from Message Queue for Apache RocketMQ.

The pull mode provides more options to subscribe to messages and gives you more control over message pulling than the push mode.

Notice To use the pull mode, make sure that the type of your Message Queue for Apache RocketMQ instance is Enterprise Platinum Edition.

Common parameters

Parameter Description
NAMESRV_ADDR The TCP endpoint. You can obtain it on the Instance Details page in the Message Queue for Apache RocketMQ console.
AccessKey The AccessKey ID that you created in the Alibaba Cloud Management Console for identity verification.
SecretKey The AccessKey secret that you created in the Alibaba Cloud Management Console for identity verification.
OnsChannel The source of the user. The value is ALIYUN by default and CLOUD for CloudTmall users.

Methods for sending messages

messagesendinterface

Parameters for sending messages

Parameter Description
SendMsgTimeoutMillis The timeout period of sending messages, in milliseconds. Default value: 3000.
CheckImmunityTimeInSeconds (for transactional messages) The shortest time interval before the first recheck of a transactional message, in seconds.
shardingKey (for ordered messages) The shard key that is used to determine the partitions to which ordered messages are distributed.

Methods for subscribing to messages

Figure 1. Methods in push mode
consumeinterface
Figure 2. Methods in pull mode
pull_consumer

The following content describes the methods in pull mode:

public interface PullConsumer extends Admin {

    /**
     * Query partition information of a topic. All the partitions of the topic are returned. You can call this method only after you run the Pull Consumer Start command. 
     */
    Set<TopicPartition> topicPartitions(String topic);

    /**
     * Specify a partition from which messages will be pulled. This method does not implement rebalance. Therefore, you must make sure that messages in all partitions can be consumed. If this method is called multiple times, the system replaces the partitions that the consumers previously subscribed instead of incrementally increasing the number of subscribed partitions. 
     */
    void assign(Collection<TopicPartition> topicPartitions);

    /**
     * Pull messages. The maxBatchMessageCount parameter specifies the maximum number of messages that can be pulled at a time. You can specify a timeout period in milliseconds. 
     */
    List<Message> poll(long timeout);

    /**
     * Reset the consumer offset of a specified partition to a specified position. The position must be between the minimum offset and the maximum offset of the partition. You can call this method only after you run the Pull Consumer Start command. The specified partition must be subscribed to. 
     */
    void seek(TopicPartition topicPartition, long offset);

    /**
     * Reset the consumer offset of a specified partition to the minimum offset of the partition. You can call this method only after you run the Pull Consumer Start command. The specified partition must be subscribed to. 
     */
    void seekToBeginning(TopicPartition topicPartition);

    /**
     * Reset the consumer offset of a specified partition to the maximum offset of the partition. You can call this method only after you run the Pull Consumer Start command. The specified partition must be subscribed to. 
     */
    void seekToEnd(TopicPartition topicPartition);

    /**
     * Suspend the consumption of messages in a specified partition.
     */
    void pause(Collection<TopicPartition> topicPartitions);

    /**
     * Resume the consumption of messages in a specified partition.
     */
    void resume(Collection<TopicPartition> topicPartitions);

    /**
     * Query the offset corresponding to a timestamp in a specified partition. The timestamp indicates when a message is stored to the broker. The offset indicates when the first message whose timestamp is no less than the timestamp when the message is stored. 
     */
    Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);

    /**
     * Query the latest consumer offset of a specified partition.
     */
    Long committed(TopicPartition topicPartition);

    /**
     * Manually commit a consumer offset. The consumer offset is synchronized to the local client, and then committed to the broker through an asynchronous thread. 
     */
    void commitSync();
    
    interface TopicPartitionChangeListener {
        /**
         * This method is called when the partitions of a topic change, for example, when the number of partitions of a topic changes due to broker scaling.
         */
        void onChanged(Set<TopicPartition> topicPartitions);
    }
    
    /**
     * Register a partition change listener that listens to changes in partitions of a topic. For example, you can call back the onChanged method of the registered listener when the number of partitions of a topic changes due to broker scaling. A delay of up to about 5 seconds is allowed by default. 
     */
    void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}

Parameters for subscribing to messages

Table 1. Common parameters
Parameter Description
GROUP_ID The group ID that you created in the Message Queue for Apache RocketMQ console. For more information, see Terms.
MessageModel The consumption mode of a consumer instance. Valid values:
  • CLUSTERING (default): clustering consumption
  • BROADCASTING: broadcasting consumption
ConsumeThreadNums The number of consumption threads for a consumer instance. Default value: 20.
MaxReconsumeTimes The maximum number of retries upon a consumption failure. Default value: 16.
ConsumeTimeout The maximum consumption timeout period for each message. If a message fails to be processed within this period, the consumption fails, and the message needs to be re-sent for consumption. A proper value must be set for each type of message, in minutes. Default value: 15.
suspendTimeMillis (for ordered messages) The retry interval for ordered messages that fail to be consumed.
maxCachedMessageAmount The maximum number of messages cached on the local client. Default value: 1000.
maxCachedMessageSizeInMiB The maximum size of messages cached on the local client. Value range: 16 MB to 2 GB. Default value: 512 MB.
Table 2. Parameters specific to batch consumption in push mode
Parameter Description
ConsumeMessageBatchMaxSize The maximum number of messages to be consumed in a batch. If the number of cached messages reaches the value of the parameter, Message Queue for Apache RocketMQ pushes the messages to consumers at a time. Default value: 32. Valid values: 1 to 1024.
BatchConsumeMaxAwaitDurationInSeconds The wait time for batch consumption. Message Queue for Apache RocketMQ pushes the messages to consumers in a batch after the time specified by this parameter. Default value: 0. Valid values: 0 to 450. Unit: seconds.
Table 3. Parameters specific to the pull mode
Parameter Description
maxCachedMessageSizeInMiB The maximum size of messages that a consumer can cache on the client for a single partition. Default value: 100 MB. Value range: 16 MB to 2048 MB.
Note Set a proper value. An excessively large value may cause the client to run out of memory (OOM).
autoCommit Automatic commit of consumer offset. Default value: true.
autoCommitIntervalMillis The interval for automatic commit of consumer offsets, in seconds. Default value: 5.
pollTimeoutMillis The timeout period of pulling messages, in seconds. Default value: 5.

For more information about partitions and consumer offsets, see Terms.

References

For more information about sample code of sending and subscribing to messages, see the following topics: