Message Queue for Apache RocketMQ SDK for Java is used to send and subscribe to messages. In Message Queue for Apache RocketMQ, subscribers can obtain messages in push or pull mode. This topic describes the methods and parameters for sending and subscribing to messages.

Background information

Message Queue for Apache RocketMQ supports the following modes to obtain messages:

  • Push: Message Queue for Apache RocketMQ pushes messages to consumers. In push mode, Message Queue for Apache RocketMQ can push multiple messages to consumers at a time. For more information, see Batch consumption.
  • Pull: Consumers pull messages from Message Queue for Apache RocketMQ.

Pull consumers offer more ways to receive messages and give you finer control over message pulls than push consumers do.

Notice: To use pull consumers, make sure that your Message Queue for Apache RocketMQ instance is an Enterprise Platinum Edition instance.

Common parameters

Parameter Description
NAMESRV_ADDR The TCP endpoint. You can click Instance Details in the Message Queue for Apache RocketMQ console to obtain the TCP endpoint.
AccessKey The AccessKey ID that you have created in the Resource Access Management (RAM) console. The AccessKey ID is used to verify your identity.
SecretKey The AccessKey secret that you have created in the RAM console. The AccessKey secret is used to verify your identity.
OnsChannel The user channel. Default value: ALIYUN. If you are a CloudTmall user, set the value to CLOUD.
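
The common parameters above can be collected into a standard java.util.Properties object before a client is created. The following is a minimal sketch, assuming that the SDK reads its configuration from such an object. The key strings are the parameter names exactly as listed in the table, the class name CommonClientProperties is hypothetical, and all values are placeholders. Your SDK version may expose constants for these names, so check it before relying on string literals.

```java
import java.util.Properties;

final class CommonClientProperties {

    // Collects the common parameters from the table above into a Properties object.
    // The key strings are the parameter names as listed in the table; all values
    // are caller-supplied placeholders, not real credentials.
    static Properties build(String tcpEndpoint, String accessKeyId, String accessKeySecret) {
        Properties properties = new Properties();
        properties.setProperty("NAMESRV_ADDR", tcpEndpoint);
        properties.setProperty("AccessKey", accessKeyId);
        properties.setProperty("SecretKey", accessKeySecret);
        // ALIYUN is the documented default; CloudTmall users would set CLOUD instead.
        properties.setProperty("OnsChannel", "ALIYUN");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("<your-tcp-endpoint>", "<your-access-key-id>", "<your-access-key-secret>");
        // Print only the parameter names so that secrets never reach the console.
        System.out.println(properties.stringPropertyNames());
    }
}
```

In practice, load the AccessKey ID and AccessKey secret from a secure source such as environment variables instead of hard-coding them.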

Methods for sending messages

[Figure: message sending interface]

Parameters for sending messages

Parameter Description
SendMsgTimeoutMillis The timeout period of sending messages. Default value: 3000. Unit: milliseconds.
CheckImmunityTimeInSeconds The shortest time for which the system waits before checking the status of a transactional message for the first time. Unit: seconds.
shardingKey The partition key that is used to determine the partition to which an ordered message is distributed.
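
To illustrate why a partition key keeps related ordered messages together, the following sketch maps a key to a partition index by hashing. This is an assumption for illustration only: the mapping that Message Queue for Apache RocketMQ actually uses is not described in this topic, and the class and method names are hypothetical.

```java
final class ShardingKeySketch {

    // Maps a sharding key to a partition index in the range [0, partitionCount).
    // Hash-modulo is an illustrative choice, not the service's documented algorithm.
    static int partitionFor(String shardingKey, int partitionCount) {
        if (shardingKey == null) {
            throw new IllegalArgumentException("shardingKey must not be null");
        }
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(shardingKey.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // Messages that share a key (for example, one order ID) land in the same partition.
        int first = partitionFor("order-1001", 8);
        int second = partitionFor("order-1001", 8);
        System.out.println(first == second);
    }
}
```

Because the same key always yields the same index, all messages that carry that key are distributed to one partition, which is what allows them to be consumed in order.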

Methods for subscribing to messages

Figure 1. Methods in push mode
Figure 2. Methods in pull mode

The following content describes the methods that can be called in pull mode:

public interface PullConsumer extends Admin {

    /**
     * Query the partition information of a topic. This method returns all partitions of the topic. You can call this method only after your pull consumers start to run. 
     */
    Set<TopicPartition> topicPartitions(String topic);

    /**
     * Specify the partitions from which you want to pull messages. This method does not implement rebalancing. You must make sure that messages in all partitions can be consumed. Each call replaces the previously assigned partitions; it does not add to them. 
     */
    void assign(Collection<TopicPartition> topicPartitions);

    /**
     * Pull messages. The timeout argument specifies how long to wait, in milliseconds. The maximum number of messages that can be pulled at a time is specified by the maxBatchMessageCount parameter. 
     */
    List<Message> poll(long timeout);

    /**
     * Reset the consumer offset of a specified partition to a specified position. The specified position must be between the minimum offset and the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seek(TopicPartition topicPartition, long offset);

    /**
     * Reset the consumer offset of a specified partition to the minimum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seekToBeginning(TopicPartition topicPartition);

    /**
     * Reset the consumer offset of a specified partition to the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seekToEnd(TopicPartition topicPartition);

    /**
     * Suspend message consumption in the specified partitions.
     */
    void pause(Collection<TopicPartition> topicPartitions);

    /**
     * Resume message consumption in the specified partitions.
     */
    void resume(Collection<TopicPartition> topicPartitions);

    /**
     * Query an offset in a specified partition based on a timestamp. The timestamp indicates when a message was stored on a broker. The returned offset is that of the first message whose storage timestamp is greater than or equal to the specified timestamp. 
     */
    Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);

    /**
     * Query the latest consumer offset of a specified partition.
     */
    Long committed(TopicPartition topicPartition);

    /**
     * Manually commit a consumer offset. The consumer offset is first synchronized to the local client and is then committed to the broker asynchronously by a background thread. 
     */
    void commitSync();
    
    interface TopicPartitionChangeListener {
        /**
         * This method is called when the partitions of a topic change, for example, when the number of partitions of a topic changes due to broker scaling. 
         */
        void onChanged(Set<TopicPartition> topicPartitions);
    }
    
    /**
     * Register a TopicPartitionChangeListener that listens for changes in the partitions of a topic. The onChanged method of the registered listener is called back when the number of partitions of the topic changes, for example, due to broker scaling. By default, the maximum delay for a callback is five seconds. 
     */
    void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}
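
The semantics of assign, seek, and offsetForTimestamp described in the comments above can be illustrated with a small in-memory model. This is a sketch, not the SDK implementation. The class PullSemanticsSketch is hypothetical, partitions are represented by plain strings instead of TopicPartition, the maximum offset is assumed to be the position just after the last stored message, and returning null when no message matches the timestamp is also an assumption.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

final class PullSemanticsSketch {
    // storeTimestamps[i] is the broker store time of the message at offset
    // minOffset + i; the array is assumed to be sorted in ascending order.
    private final long minOffset;
    private final long[] storeTimestamps;
    private final Set<String> assigned = new LinkedHashSet<>();

    PullSemanticsSketch(long minOffset, long[] storeTimestamps) {
        this.minOffset = minOffset;
        this.storeTimestamps = Arrays.copyOf(storeTimestamps, storeTimestamps.length);
    }

    // A later call replaces the earlier assignment instead of adding to it.
    void assign(Collection<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }

    Set<String> assignment() {
        return new LinkedHashSet<>(assigned);
    }

    // Assumption: the maximum offset is the position just after the last message.
    long maxOffset() {
        return minOffset + storeTimestamps.length;
    }

    // Accepts only positions between the minimum and maximum offsets, inclusive.
    long seek(long offset) {
        if (offset < minOffset || offset > maxOffset()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        return offset;
    }

    // Binary search for the first message stored at or after the given timestamp.
    Long offsetForTimestamp(long timestamp) {
        int lo = 0;
        int hi = storeTimestamps.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (storeTimestamps[mid] < timestamp) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        // Assumption: null signals that no stored message is new enough.
        return lo == storeTimestamps.length ? null : Long.valueOf(minOffset + lo);
    }
}
```

For example, with a minimum offset of 10 and store timestamps 100, 200, 200, and 300, a lookup for timestamp 150 or 200 resolves to offset 11, and a lookup for 301 finds no matching message.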

Parameters

Table 1. Common parameters
Parameter Description
GROUP_ID The group ID that you have created in the Message Queue for Apache RocketMQ console. For more information, see Terms.
MessageModel The consumption mode of a consumer instance. Valid values:
  • CLUSTERING: clustering consumption. This value is the default value.
  • BROADCASTING: broadcasting consumption.
ConsumeThreadNums The number of consumption threads for a consumer instance. Default value: 20.
MaxReconsumeTimes The maximum number of retries upon a consumption failure. Default value: 16.
ConsumeTimeout The timeout period for consuming each message. If consuming a message takes longer than the specified timeout period, the message fails to be consumed and is redelivered after a retry interval. Set a value that suits each of your business scenarios. Default value: 15. Unit: minutes.
suspendTimeMillis The retry interval for an ordered message that fails to be consumed.
maxCachedMessageAmount The maximum number of messages that can be cached locally on the consumer client. Valid values: 100 to 50000. Default value: 5000.
  • This parameter takes effect per client. The quota is divided evenly among the topics to which the client subscribes. For example, if you set the parameter to 1000 for a consumer client that subscribes to 2 topics, a maximum of 500 messages can be cached for each topic.
  • If a consumer client pulls multiple messages at a time, the actual number of cached messages can exceed the value of maxCachedMessageAmount.
  • We recommend that you set the parameter to twice the number of messages that your consumer client can consume per second.
  Notice: Specify a proper value. An excessively large value may cause an out-of-memory (OOM) error on your client.
maxCachedMessageSizeInMiB The maximum size of messages that can be cached locally on the client. Valid values: 16 MiB to 2048 MiB. Default value: 512 MiB.
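
The sizing guidance for maxCachedMessageAmount can be worked through in a few lines. The following sketch computes the per-topic quota and the recommended value. The class CacheQuotaSketch is hypothetical. Clamping the recommendation to the valid range of 100 to 50000 and using integer division for the per-topic share are assumptions, because this topic does not state how the client handles remainders.

```java
final class CacheQuotaSketch {
    // Valid range of maxCachedMessageAmount as listed in Table 1.
    static final int MIN_AMOUNT = 100;
    static final int MAX_AMOUNT = 50000;

    // The client-level quota is divided evenly among the subscribed topics.
    static int perTopicQuota(int maxCachedMessageAmount, int subscribedTopics) {
        if (subscribedTopics <= 0) {
            throw new IllegalArgumentException("subscribedTopics must be positive");
        }
        return maxCachedMessageAmount / subscribedTopics;
    }

    // Recommended value: twice the per-second consumption rate, kept within the valid range.
    static int recommendedAmount(int messagesConsumedPerSecond) {
        long twice = 2L * messagesConsumedPerSecond;
        return (int) Math.max(MIN_AMOUNT, Math.min(MAX_AMOUNT, twice));
    }

    public static void main(String[] args) {
        System.out.println(perTopicQuota(1000, 2));   // 500, matching the example in Table 1
        System.out.println(recommendedAmount(3000));  // 6000
    }
}
```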
Table 2. Parameters that are required when Message Queue for Apache RocketMQ pushes multiple messages to consumers at a time
Parameter Description
ConsumeMessageBatchMaxSize The maximum number of cached messages that can be pushed to consumers in one batch. When the number of cached messages reaches this value, Message Queue for Apache RocketMQ pushes them to consumers in one batch. Default value: 32. Valid values: 1 to 1024.
BatchConsumeMaxAwaitDurationInSeconds The maximum time to wait before cached messages are pushed to consumers in one batch. When this time elapses, Message Queue for Apache RocketMQ pushes the cached messages to consumers in one batch. Default value: 0. Valid values: 0 to 450. Unit: seconds.
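
The way the two batch parameters interact can be sketched as a single decision function. This sketch assumes that a batch is pushed as soon as either condition is met, whichever comes first. The class and method names are hypothetical and do not belong to the SDK.

```java
final class BatchPushSketch {

    // Returns true when the cached messages should be pushed as one batch:
    // either the batch is full or the maximum wait time has elapsed.
    static boolean shouldPush(int cachedCount, long waitedSeconds,
                              int consumeMessageBatchMaxSize,
                              long batchConsumeMaxAwaitDurationInSeconds) {
        if (cachedCount <= 0) {
            return false; // nothing to push
        }
        boolean batchFull = cachedCount >= consumeMessageBatchMaxSize;
        boolean waitElapsed = waitedSeconds >= batchConsumeMaxAwaitDurationInSeconds;
        return batchFull || waitElapsed;
    }

    public static void main(String[] args) {
        // 5 messages cached for 3 seconds with a batch size of 32 and a 10-second wait: keep waiting.
        System.out.println(shouldPush(5, 3, 32, 10));
        // With the default wait time of 0, any cached message is pushed without waiting.
        System.out.println(shouldPush(1, 0, 32, 0));
    }
}
```

Under this assumption, a larger wait time trades delivery latency for fuller batches, while the default of 0 pushes whatever is cached immediately.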
Table 3. Parameters specific to the pull mode
Parameter Description
maxCachedMessageSizeInMiB The maximum size of messages that a consumer can cache on the client for a single partition. Default value: 100 MiB. Valid values: 16 MiB to 2048 MiB.
  Notice: Specify a proper value. An excessively large value may cause an OOM error on your client.
autoCommit Specifies whether to automatically commit a consumer offset. Default value: true.
autoCommitIntervalMillis The interval between the operations of automatically committing a consumer offset. Default value: 5. Unit: seconds.
pollTimeoutMillis The timeout period for pulling messages each time. Default value: 5. Unit: seconds.

For more information about partitions and offsets, see Terms.

References

For more information about the sample code for sending and subscribing to messages, see the following topics: