All Products
Search
Document Center

ApsaraMQ for RocketMQ:Methods and parameters

Last Updated:Oct 23, 2023

ApsaraMQ for RocketMQ SDK for Java is used to send and subscribe to messages. In ApsaraMQ for RocketMQ, subscribers can obtain messages in push or pull mode. This topic describes the methods and parameters for sending and subscribing to messages.

Background information

ApsaraMQ for RocketMQ supports the following modes to obtain messages:

  • Push: ApsaraMQ for RocketMQ pushes messages to consumers. In push mode, ApsaraMQ for RocketMQ can push multiple messages to consumers at a time. For more information, see Batch consumption.

  • Pull: Consumers pull messages from ApsaraMQ for RocketMQ.

Pull consumers provide more options to receive messages and provide you more control over message pulls than push consumers.

Important

To use pull consumers, make sure that your ApsaraMQ for RocketMQ instance is an Enterprise Platinum Edition instance.

Common parameters

Parameter

Description

NAMESRV_ADDR

The TCP endpoint. You can click Instance Details in the ApsaraMQ for RocketMQ console to obtain the TCP endpoint.

AccessKey

An AccessKey ID is used as the unique identifier for authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair.

SecretKey

An AccessKey secret is used as the password for authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair.

OnsChannel

The user channel. Default value: ALIYUN. If you are a CloudTmall user, set the value to CLOUD.

Methods for sending messages

messagesendinterface

Parameters for sending messages

Parameter

Description

SendMsgTimeoutMillis

The timeout period of sending messages.

Unit: milliseconds.

CheckImmunityTimeInSeconds

The shortest time for which the system waits before checking the status of a transactional message for the first time. Unit: seconds.

shardingKey

The partition key that is used to determine the partition to which an ordered message is distributed.

Methods for subscribing to messages

Figure 1. Methods in push modeconsumeinterface

Figure 2. Methods in pull modepull_consumer

The following content describes the methods that can be called in pull mode:

public interface PullConsumer extends Admin {

    /**
     * Query the partition information of a topic. This method returns all partitions of the topic. You can call this method only after your pull consumers start to run. 
     */
    Set<TopicPartition> topicPartitions(String topic);

    /**
     * Specify a partition from which you want to pull messages. This method does not implement rebalancing. You must make sure that messages in all partitions can be consumed. If this method is called multiple times, the system replaces the partitions to which subscribers have subscribed instead of increasing the number of partitions to which subscribers subscribe. 
     */
    void assign(Collection<TopicPartition> topicPartitions);

    /**
     * Pull messages. The maxBatchMessageCount parameter specifies the maximum number of messages that can be pulled at a time. You can specify a timeout period in milliseconds. 
     */
    List<Message> poll(long timeout);

    /**
     * Reset the consumer offset of a specified partition to a specified position. The specified position must be between the minimum offset and the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seek(TopicPartition topicPartition, long offset);

    /**
     * Reset the consumer offset of a specified partition to the minimum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seekToBeginning(TopicPartition topicPartition);

    /**
     * Reset the consumer offset of a specified partition to the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seekToEnd(TopicPartition topicPartition);

    /**
     * Suspend message consumption in a specified partition.
     */
    void pause(Collection<TopicPartition> topicPartitions);

    /**
     * Resume message consumption in a specified partition.
     */
    void resume(Collection<TopicPartition> topicPartitions);

    /**
     * Query an offset based on a timestamp in a specified partition. The timestamp indicates when a message is stored to a broker. The offset corresponds to the first timestamp that is greater than or equal to the specified timestamp. 
     */
    Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);

    /**
     * Query the latest consumer offset of a specified partition.
     */
    Long committed(TopicPartition topicPartition);

    /**
     * Manually commit a consumer offset. The consumer offset is synchronized to your on-premises client and then committed to your broker by using a thread in an asynchronous environment. 
     */
    void commitSync();
    
    interface TopicPartitionChangeListener {
        /**
         * This method is called when the partitions of a topic change, for example, when the number of partitions of a topic changes due to broker scaling. 
         */
        void onChanged(Set<TopicPartition> topicPartitions);
    }
    
    /**
     * Register TopicPartitionChangeListener that listens for changes in the partitions of a topic. The onChanged method can be called back for the registered listener when the number of partitions of a topic changes due to broker scaling. By default, the maximum delay for a callback is five seconds. 
     */
    void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}

Parameters

Table 1. Common parameters

Parameter

Description

GROUP_ID

The group ID that you have created in the ApsaraMQ for RocketMQ console. For more information, see Terms.

MessageModel

The consumption mode of a consumer instance. Valid values:

  • CLUSTERING: clustering consumption. This value is the default value.

  • BROADCASTING: broadcasting consumption.

ConsumeThreadNums

The number of consumption threads for a consumer instance. Default value: 20.

MaxReconsumeTimes

The maximum number of retries upon a consumption failure. Default value: 16.

ConsumeTimeout

The timeout period for consuming each message. If the time for consuming a message exceeds the specified timeout period, the message fails to be consumed and is redelivered after a retry interval. Configure an appropriate value for each business. Default value: 15. Unit: minutes.

suspendTimeMillis

The retry interval for an ordered message that fails to be consumed.

maxCachedMessageAmount

The maximum number of messages that can be cached at your on-premises consumer client. Valid values: 100 to 50000. Default value: 5000.

This parameter takes effect on a client. The quota is evenly allocated to topics to which subscribers have subscribed. For example, if you set the parameter value to 1000 for a consumer client that subscribes to 2 topics, a maximum of 500 messages can be cached for each topic.

If a consumer client pulls multiple messages at a time, the actual number of messages that need to be cached can be greater than the value that you set for maxCachedMessageAmount.

We recommend that you set the parameter value to twice the number of messages that your client consumer can consume per second.

Important

Specify a proper value. An excessively large value may cause an Out-of-Memory (OOM) error on your client.

maxCachedMessageSizeInMiB

The maximum size of messages that can be cached at your on-premises client. Valid values: 16 MB to 2048 MB. Default value: 512 MB.

Table 2. Parameters that are required when Message Queue for Apache RocketMQ pushes multiple messages to consumers at a time

Parameter

Description

ConsumeMessageBatchMaxSize

The maximum number of cached messages that can be pushed to consumers at a time. If the number of the cached messages reaches the parameter value that you specify, ApsaraMQ for RocketMQ pushes the cached messages to consumers at a time. Default value: 32. Valid values: 1 to 1024.

BatchConsumeMaxAwaitDurationInSeconds

The wait time before multiple cached messages are pushed to consumers at a time. ApsaraMQ for RocketMQ pushes multiple cached messages to consumers at a time after the time specified by this parameter. Default value: 0. Valid values: 0 to 450. Unit: seconds.

Table 3. Parameters specific to the pull mode

Parameter

Description

maxCachedMessageSizeInMiB

The maximum size of messages that a consumer can cache on the client for a single partition. Default value: 100 MiB. Valid values: 16 MiB to 2048 MiB.

Important

Specify a proper value. An excessively large value may cause an OOM error on your client.

autoCommit

Specifies whether to automatically commit a consumer offset. Default value: true.

autoCommitIntervalMillis

The interval between the operations of automatically committing a consumer offset. Default value: 5. Unit: seconds.

pollTimeoutMillis

The timeout period for pulling messages each time. Default value: 5. Unit: seconds.

For more information about partitions and offsets, see Terms.

References

For more information about the sample code for sending and subscribing to messages, see the following topics: