Configure these parameters when you use the Community Edition of SDK for Java to connect to ApsaraMQ for RocketMQ. Parameters are organized into three groups: connection, producer, and consumer.
Connection parameters
Set these parameters for every producer and consumer client.
| Parameter | Description | Default |
|---|
NAMESRV_ADDR | TCP endpoint of your ApsaraMQ for RocketMQ instance. Get this from the Instance Details page in the ApsaraMQ for RocketMQ console. | None (required) |
AccessKey | AccessKey ID for authentication. See Create an AccessKey pair. | None (required) |
SecretKey | AccessKey secret for authentication. See Create an AccessKey pair. | None (required) |
AccessChannel | Set to CLOUD to enable cloud message tracing. | None |
All clients require NAMESRV_ADDR, AccessKey, and SecretKey. For the complete setup procedure, see Prepare the environment.
Producer parameters
| Parameter | Description | Default | Valid values |
|---|
producerGroup | Producer group ID. Producers in the same application that send the same messages belong to one group. Get the group ID from the ApsaraMQ for RocketMQ console. See Terms. | None (required) | -- |
sendMsgTimeout | Timeout for sending a message, in milliseconds. | -- | -- |
compressMsgBodyOverHowmuch | Message body size threshold for automatic compression, in KB. Messages larger than this value are compressed before sending and decompressed on the consumer side. | 4 KB | -- |
retryTimesWhenSendFailed | Maximum retry count for failed synchronous sends. Does not apply to asynchronous sends. | -- | -- |
maxMessageSize | Maximum message size allowed by the client, in MB. Messages exceeding this limit trigger an error. The broker also enforces its own size limit, so configure both the client and broker parameters together. | 4 MB | -- |
Example: configure a producer
DefaultMQProducer producer = new DefaultMQProducer("<your-producer-group-id>");
producer.setNamesrvAddr("<your-tcp-endpoint>");
// Timeout and retry
producer.setSendMsgTimeout(3000); // 3 seconds
producer.setRetryTimesWhenSendFailed(3); // Retry up to 3 times (synchronous sends only)
// Compression and size limits
producer.setCompressMsgBodyOverHowmuch(4096); // Compress messages over 4 KB
producer.setMaxMessageSize(4 * 1024 * 1024); // 4 MB max message size
producer.start();
retryTimesWhenSendFailed applies only to synchronous sends.
Consumer parameters
Offset and thread pool
| Parameter | Description | Default | Valid values |
|---|
consumerGroup | Consumer group ID. Consumers in the same application that subscribe to the same messages and use the same consumption logic belong to one group. Get the group ID from the ApsaraMQ for RocketMQ console. See Terms. | None (required) | -- |
consumeFromWhere | Starting offset when a new consumer group launches. | Latest offset | -- |
consumeThreadMin | Minimum number of threads in the consumer thread pool. | 20 | -- |
consumeThreadMax | Maximum number of threads in the consumer thread pool. Must equal consumeThreadMin. | 20 | -- |
Flow control and retry
| Parameter | Description | Default | Valid values |
|---|
consumeConcurrentlyMaxSpan | Maximum offset span for concurrent consumption in a single queue. | 2000 | 1–65535 |
pullThresholdForQueue | Maximum number of messages cached locally per queue. | 1000 | 1–65535 |
pullThresholdSizeForQueue | Maximum total size of messages cached locally per queue, in MB. | 100 MB | 1–1024 |
maxReconsumeTimes | Maximum number of retry attempts for a failed message. | 16 | -- |
suspendCurrentQueueTimeMillis | Minimum interval between consecutive retries of ordered messages, in milliseconds. | 1000 ms | 10–30000 |
Example: configure a push consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<your-consumer-group-id>");
consumer.setNamesrvAddr("<your-tcp-endpoint>");
// Start consuming from the latest offset
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// Thread pool: min and max must match
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
// Flow control
consumer.setConsumeConcurrentlyMaxSpan(2000);
consumer.setPullThresholdForQueue(1000);
consumer.setPullThresholdSizeForQueue(100); // 100 MB per queue
// Retry
consumer.setMaxReconsumeTimes(16);
consumer.subscribe("<your-topic>", "*");
consumer.registerMessageListener((msgs, context) -> {
// Process messages
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
consumeThreadMin and consumeThreadMax must be set to the same value.