Message Queue for Apache RocketMQ SDK for Java is used to send and subscribe to messages. In Message Queue for Apache RocketMQ, subscribers can obtain messages in push or pull mode. This topic describes the methods and parameters for sending and subscribing to messages.
Background information
Message Queue for Apache RocketMQ supports the following modes to obtain messages:
- Push: Message Queue for Apache RocketMQ pushes messages to consumers. In push mode, Message Queue for Apache RocketMQ can push multiple messages to consumers at a time. For more information, see Batch consumption.
- Pull: Consumers pull messages from Message Queue for Apache RocketMQ.
Compared with push consumers, pull consumers offer more options for receiving messages and give you finer control over when and how messages are pulled.
Common parameters
Parameter | Description |
---|---|
NAMESRV_ADDR | The TCP endpoint. You can click Instance Details in the Message Queue for Apache RocketMQ console to obtain the TCP endpoint. |
AccessKey | The AccessKey ID that you have created in the Resource Access Management (RAM) console. The AccessKey ID is used to verify your identity. |
SecretKey | The AccessKey secret that you have created in the RAM console. The AccessKey secret is used to verify your identity. |
OnsChannel | The user channel. Default value: ALIYUN. If you are a CloudTmall user, set the value to CLOUD. |
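The common parameters above are typically passed to the client as key-value properties. The following is a minimal sketch that only assembles them with `java.util.Properties`. The class name `CommonClientProperties` is hypothetical, the keys are written as plain strings copied from the table, and the endpoint and credential values are placeholders.

```java
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of the SDK.
final class CommonClientProperties {

    // Collects the common parameters from the table above.
    // Keys are plain strings copied from the table (an assumption about how they are passed).
    static Properties build(String nameServerAddr, String accessKeyId, String accessKeySecret) {
        Properties properties = new Properties();
        properties.setProperty("NAMESRV_ADDR", nameServerAddr); // TCP endpoint from Instance Details
        properties.setProperty("AccessKey", accessKeyId);       // AccessKey ID created in the RAM console
        properties.setProperty("SecretKey", accessKeySecret);   // AccessKey secret created in the RAM console
        properties.setProperty("OnsChannel", "ALIYUN");         // default user channel
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with the values of your own instance.
        Properties properties = build("<yourTcpEndpoint>", "<yourAccessKeyId>", "<yourAccessKeySecret>");
        System.out.println(properties.stringPropertyNames());
    }
}
```

Keeping credentials out of source code, for example by reading them from environment variables before calling `build`, is usually a safer choice than hard-coding them.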
Methods for sending messages

Parameters for sending messages
Parameter | Description |
---|---|
SendMsgTimeoutMillis | The timeout period of sending messages. Default value: 3000. Unit: milliseconds. |
CheckImmunityTimeInSeconds | The shortest time for which the system waits before checking the status of a transactional message for the first time. Unit: seconds. |
shardingKey | The partition key that is used to determine the partition to which an ordered message is distributed. |
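To make the role of shardingKey concrete, the sketch below maps a key to a partition index by hashing. This is an illustration only: the class `ShardingKeySketch` and the hash-and-modulo rule are assumptions, not the algorithm that Message Queue for Apache RocketMQ actually uses. The point it shows is that messages with the same key always land in the same partition, which is what preserves their relative order.

```java
// Hypothetical illustration; the real partition selection is done by the service.
final class ShardingKeySketch {

    // Maps a sharding key to a partition index in [0, partitionCount).
    static int partitionFor(String shardingKey, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(shardingKey.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // Two messages of the same order share a key, so they map to the same partition.
        int first = partitionFor("order-1001", 8);
        int second = partitionFor("order-1001", 8);
        System.out.println(first == second);
    }
}
```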
Methods for subscribing to messages


The following content describes the methods that can be called in pull mode:
```java
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Admin, Message, and TopicPartition are types provided by the SDK.
public interface PullConsumer extends Admin {

    /**
     * Query the partition information of a topic. This method returns all partitions of the topic.
     * You can call this method only after your pull consumers start to run.
     */
    Set<TopicPartition> topicPartitions(String topic);

    /**
     * Specify a partition from which you want to pull messages. This method does not implement
     * rebalancing. You must make sure that messages in all partitions can be consumed.
     * If this method is called multiple times, the system replaces the partitions to which
     * subscribers have subscribed instead of increasing the number of partitions to which
     * subscribers subscribe.
     */
    void assign(Collection<TopicPartition> topicPartitions);

    /**
     * Pull messages. The maxBatchMessageCount parameter specifies the maximum number of messages
     * that can be pulled at a time. You can specify a timeout period in milliseconds.
     */
    List<Message> poll(long timeout);

    /**
     * Reset the consumer offset of a specified partition to a specified position. The specified
     * position must be between the minimum offset and the maximum offset of the partition.
     * You can call this method only after your pull consumers start to run.
     * A subscriber must subscribe to the specified partition.
     */
    void seek(TopicPartition topicPartition, long offset);

    /**
     * Reset the consumer offset of a specified partition to the minimum offset of the partition.
     * You can call this method only after your pull consumers start to run.
     * A subscriber must subscribe to the specified partition.
     */
    void seekToBeginning(TopicPartition topicPartition);

    /**
     * Reset the consumer offset of a specified partition to the maximum offset of the partition.
     * You can call this method only after your pull consumers start to run.
     * A subscriber must subscribe to the specified partition.
     */
    void seekToEnd(TopicPartition topicPartition);

    /**
     * Suspend message consumption in a specified partition.
     */
    void pause(Collection<TopicPartition> topicPartitions);

    /**
     * Resume message consumption in a specified partition.
     */
    void resume(Collection<TopicPartition> topicPartitions);

    /**
     * Query an offset based on a timestamp in a specified partition. The timestamp indicates when
     * a message is stored to a broker. The offset corresponds to the first timestamp that is
     * greater than or equal to the specified timestamp.
     */
    Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);

    /**
     * Query the latest consumer offset of a specified partition.
     */
    Long committed(TopicPartition topicPartition);

    /**
     * Manually commit a consumer offset. The consumer offset is synchronized to your on-premises
     * client and then committed to your broker by using a thread in an asynchronous environment.
     */
    void commitSync();

    interface TopicPartitionChangeListener {
        /**
         * This method is called when the partitions of a topic change, for example, when the
         * number of partitions of a topic changes due to broker scaling.
         */
        void onChanged(Set<TopicPartition> topicPartitions);
    }

    /**
     * Register TopicPartitionChangeListener that listens for changes in the partitions of a topic.
     * The onChanged method can be called back for the registered listener when the number of
     * partitions of a topic changes due to broker scaling. By default, the maximum delay for a
     * callback is five seconds.
     */
    void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}
```
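The offset rules described for offsetForTimestamp and seek can be illustrated without a broker. The sketch below is an in-memory model, not SDK code: the class `OffsetLookupSketch`, the array of store timestamps, and the choice to return the offset just past the last message when no timestamp qualifies are all assumptions made for illustration.

```java
// Hypothetical in-memory model of one partition; it is not part of the SDK.
final class OffsetLookupSketch {

    // storeTimestamps[i] is the store time of the message at offset (minOffset + i),
    // in ascending order. Returns the offset of the first message whose store time is
    // greater than or equal to the given timestamp. If no such message exists, this
    // sketch returns minOffset + storeTimestamps.length.
    static long offsetForTimestamp(long[] storeTimestamps, long minOffset, long timestamp) {
        int low = 0;
        int high = storeTimestamps.length;
        while (low < high) {                 // binary search for the lower bound
            int mid = (low + high) >>> 1;
            if (storeTimestamps[mid] < timestamp) {
                low = mid + 1;
            } else {
                high = mid;
            }
        }
        return minOffset + low;
    }

    // A seek target is accepted only if it lies between the minimum and maximum offsets.
    static boolean isValidSeekTarget(long offset, long minOffset, long maxOffset) {
        return offset >= minOffset && offset <= maxOffset;
    }

    public static void main(String[] args) {
        long[] storeTimestamps = {100L, 200L, 200L, 300L};
        System.out.println(offsetForTimestamp(storeTimestamps, 10L, 150L));
    }
}
```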
Parameters
Parameter | Description |
---|---|
GROUP_ID | The group ID that you have created in the Message Queue for Apache RocketMQ console. For more information, see Terms. |
MessageModel | The consumption mode of a consumer instance. Valid values: CLUSTERING (clustering consumption) and BROADCASTING (broadcasting consumption). |
ConsumeThreadNums | The number of consumption threads for a consumer instance. Default value: 20. |
MaxReconsumeTimes | The maximum number of retries upon a consumption failure. Default value: 16. |
ConsumeTimeout | The timeout period for consuming each message. If consuming a message takes longer than this period, the consumption is treated as failed and the message is redelivered after a retry interval. Set a value that suits your business scenario. Default value: 15. Unit: minutes. |
suspendTimeMillis | The retry interval for an ordered message that fails to be consumed. |
maxCachedMessageAmount | The maximum number of messages that can be cached on your on-premises consumer client. Valid values: 100 to 50000. Default value: 5000. This parameter takes effect per client, and the quota is evenly allocated to the topics to which the client subscribes. For example, if you set the value to 1000 for a consumer client that subscribes to 2 topics, a maximum of 500 messages can be cached for each topic. If a consumer client pulls multiple messages at a time, the actual number of cached messages can exceed the value of maxCachedMessageAmount. We recommend that you set the value to twice the number of messages that your consumer client can consume per second. Notice: Specify a proper value. An excessively large value may cause an out-of-memory (OOM) error on your client. |
maxCachedMessageSizeInMiB | The maximum size of messages that can be cached on your on-premises client. Valid values: 16 MiB to 2048 MiB. Default value: 512 MiB. |
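The arithmetic behind maxCachedMessageAmount can be sketched as follows. The class `CacheQuotaSketch` and its method names are hypothetical. Integer division is used for the even split, and the recommended value is clamped to the documented range of 100 to 50000; both choices are assumptions of this sketch.

```java
// Hypothetical helper for illustration; it is not part of the SDK.
final class CacheQuotaSketch {

    static final int MIN_AMOUNT = 100;    // lower bound from the table above
    static final int MAX_AMOUNT = 50000;  // upper bound from the table above

    // The client-wide quota is split evenly across the subscribed topics.
    static int perTopicQuota(int maxCachedMessageAmount, int topicCount) {
        if (topicCount <= 0) {
            throw new IllegalArgumentException("topicCount must be positive");
        }
        return maxCachedMessageAmount / topicCount;
    }

    // Twice the per-second consumption rate, clamped to the valid range.
    static int recommendedAmount(int messagesConsumedPerSecond) {
        long doubled = 2L * messagesConsumedPerSecond;
        return (int) Math.max(MIN_AMOUNT, Math.min(MAX_AMOUNT, doubled));
    }

    public static void main(String[] args) {
        System.out.println(perTopicQuota(1000, 2));   // the example from the table: 500 per topic
        System.out.println(recommendedAmount(3000));
    }
}
```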

Parameters specific to push mode
Parameter | Description |
---|---|
ConsumeMessageBatchMaxSize | The maximum number of cached messages that can be pushed to consumers in one batch. When the number of cached messages reaches this value, Message Queue for Apache RocketMQ pushes them to consumers in one batch. Default value: 32. Valid values: 1 to 1024. |
BatchConsumeMaxAwaitDurationInSeconds | The maximum time to wait before cached messages are pushed to consumers in one batch. When this time elapses, Message Queue for Apache RocketMQ pushes the cached messages to consumers in one batch. Default value: 0. Valid values: 0 to 450. Unit: seconds. |
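The two batch parameters describe two triggers for a batch push. The sketch below models the decision under the assumption that a batch is pushed as soon as either trigger is met. The class `BatchPushSketch` is hypothetical and only mirrors the documented rules and value ranges; it does not reproduce the SDK's internal logic.

```java
// Hypothetical model of the batch push decision; it is not part of the SDK.
final class BatchPushSketch {

    // Returns true when the cached messages should be pushed as one batch.
    static boolean shouldPush(int cachedCount, long waitedSeconds,
                              int consumeMessageBatchMaxSize,
                              int batchConsumeMaxAwaitDurationInSeconds) {
        if (consumeMessageBatchMaxSize < 1 || consumeMessageBatchMaxSize > 1024) {
            throw new IllegalArgumentException("ConsumeMessageBatchMaxSize must be 1 to 1024");
        }
        if (batchConsumeMaxAwaitDurationInSeconds < 0 || batchConsumeMaxAwaitDurationInSeconds > 450) {
            throw new IllegalArgumentException("BatchConsumeMaxAwaitDurationInSeconds must be 0 to 450");
        }
        if (cachedCount <= 0) {
            return false; // nothing to push
        }
        // Assumption: whichever trigger is met first causes the push.
        return cachedCount >= consumeMessageBatchMaxSize
                || waitedSeconds >= batchConsumeMaxAwaitDurationInSeconds;
    }

    public static void main(String[] args) {
        // 5 cached messages, 3 seconds waited, batch size 32, wait limit 10 seconds.
        System.out.println(shouldPush(5, 3, 32, 10));
    }
}
```

Under this model, the default wait time of 0 means that cached messages are pushed without waiting for the batch to fill.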

Parameters specific to pull mode
Parameter | Description |
---|---|
maxCachedMessageSizeInMiB | The maximum size of messages that a consumer can cache on the client for a single partition. Default value: 100 MiB. Valid values: 16 MiB to 2048 MiB. Notice: Specify a proper value. An excessively large value may cause an OOM error on your client. |
autoCommit | Specifies whether to automatically commit a consumer offset. Default value: true. |
autoCommitIntervalMillis | The interval between the operations of automatically committing a consumer offset. Default value: 5. Unit: seconds. |
pollTimeoutMillis | The timeout period for pulling messages each time. Default value: 5. Unit: seconds. |
For more information about partitions and offsets, see Terms.
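A pull consumer that commits offsets manually could be configured along the following lines. This is a minimal sketch: the class `PullConsumerProperties` is hypothetical, the keys are plain strings copied from the tables in this topic, and only the documented range of maxCachedMessageSizeInMiB is validated.

```java
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of the SDK.
final class PullConsumerProperties {

    // Builds pull-mode settings. Keys are plain strings copied from the tables
    // (an assumption about how they are passed to the client).
    static Properties build(String groupId, boolean autoCommit, int maxCachedMessageSizeInMiB) {
        if (maxCachedMessageSizeInMiB < 16 || maxCachedMessageSizeInMiB > 2048) {
            throw new IllegalArgumentException("maxCachedMessageSizeInMiB must be 16 to 2048");
        }
        Properties properties = new Properties();
        properties.setProperty("GROUP_ID", groupId);
        // With autoCommit set to false, the application is expected to commit offsets itself.
        properties.setProperty("autoCommit", String.valueOf(autoCommit));
        properties.setProperty("maxCachedMessageSizeInMiB", String.valueOf(maxCachedMessageSizeInMiB));
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder group ID; 100 is the documented default cache size per partition.
        Properties properties = build("<yourGroupId>", false, 100);
        System.out.println(properties.getProperty("autoCommit"));
    }
}
```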
References
For more information about the sample code for sending and subscribing to messages, see the following topics: