Open-source RabbitMQ client SDKs are compatible with ApsaraMQ for RabbitMQ, but certain platform-specific behaviors require attention. This page covers connection management, message production, and message consumption best practices.
Connection management
Enable automatic connection recovery
Enable automatic connection recovery on com.rabbitmq.client.ConnectionFactory. Without it, broker upgrades interrupt message reads and writes, and the client cannot reconnect on its own.
// Enable automatic connection recovery
factory.setAutomaticRecoveryEnabled(true);
// Set the interval between recovery attempts (milliseconds)
factory.setNetworkRecoveryInterval(5000);Use persistent connections
Do not open and close connections for each publish or consume operation. Frequent reconnection wastes network and broker resources and can trigger SYN flood protection.
For more information, see Connections and channels.
Avoid metadata operations in application code
Metadata operations such as queueDeclare and exchangeDeclare are subject to throttling. If triggered, the broker may close the connection.
Create queues and exchanges through the ApsaraMQ for RabbitMQ console instead of declaring them at runtime during message sending. For more information about throttling limits, see Limits.
Message production
Set up publisher confirms
Publisher confirms let the broker acknowledge each message after writing it to disk, so the client can detect and retry failed deliveries. Enabling publisher confirms changes the sending mode from synchronous to asynchronous, which may increase latency because the broker returns a success response only after confirming the message is persisted.
To enable publisher confirms on a channel:
Call
confirmSelect()when creating the channel.// Create a connection and a channel Connection connection = createConnection(hostName, userName, passWord, virtualHost); Channel channel = connection.createChannel(); // Enable publisher confirms channel.confirmSelect();After calling
basicPublish, wait for the broker response and handle the result.// Publish a message (reuse the existing channel) channel.basicPublish(exchangeName, bindingKey, true, props, ("example body").getBytes(StandardCharsets.UTF_8)); // Wait up to 3 seconds for the confirm if (channel.waitForConfirms(3000)) { // Message persisted to disk successfully } else { // Confirm failed -- resend the message }
A message is confirmed only when the client receives publishAck from the broker. If the client fails to send a message or does not receive publishAck, retry the send to prevent message loss.
Handle unroutable messages with the mandatory flag
Setting the mandatory parameter to true causes the broker to invoke the ReturnListener callback when a message cannot be routed to any queue.
// Register a callback for unroutable messages
channel.addReturnListener(returnMessage ->
System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
// Publish with mandatory=true (third parameter)
channel.basicPublish(exchangeName, routingKey, true, props,
content.getBytes(StandardCharsets.UTF_8));Assign a message ID to every message
Assign a custom ID to each message so you can query its trace and troubleshoot delivery issues. For more information, see How do I specify a message ID?
Handle publishing errors
Determine the error handling strategy based on the error type returned by basicPublish:
Business exceptions (such as
ExchangeNotExist): Throw the exception immediately.Throttling or channel closure: Close the existing connection, create and initialize a new channel, and retry the send.
Example:
private void doSend(String content) throws Exception {
try {
// Assign a unique message ID
String msgId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
// Wait up to 3 seconds for the confirm
if (channel.waitForConfirms(3000)) {
// Message persisted to disk successfully
} else {
// Confirm failed -- resend the message
}
} catch (Exception e) {
String message = e.getMessage();
System.out.println(message);
if (channelClosedByServer(message)) {
// Recover: close the old channel, create and reinitialize a new one, then retry
factory.closeCon(channel);
channel = factory.createChannel();
this.initChannel();
doSend(content);
} else {
// Non-broker error -- propagate the exception
throw e;
}
}
}
private boolean channelClosedByServer(String errorMsg) {
if (errorMsg != null && errorMsg.contains("channel.close")) {
return true;
} else {
return false;
}
}For the complete sample code, see Produce messages.
Message consumption
Prevent consumption skew
Make sure messages are distributed evenly across consumers. Consumption skew causes some consumers to be overloaded while others sit idle. For configuration details, see the "Usage notes" section of Connections and channels.
Use broker-generated consumer tags
Let the broker generate a globally unique consumer tag rather than specifying one manually. If you specify a custom consumer tag, make sure it is unique across all consumers.
Configure QoS (prefetch)
Call basicQos to limit the number of unacknowledged messages the broker pushes to a consumer. When this limit is reached, the broker pauses delivery until the consumer acknowledges outstanding messages.
Scope options:
| Call | Scope |
|---|---|
channel.basicQos(100, true) | All consumers on the channel share a combined limit of 100 unacknowledged messages |
channel.basicQos(100, false) or channel.basicQos(100) | Each consumer on the channel has its own limit of 100 unacknowledged messages |
Platform constraints:
The default QoS value is 100 per consumer, equivalent to
channel.basicQos(100, false).Custom QoS values cannot exceed 100. Values above 100 are ignored and the default is used instead.
basicQoshas no effect in autoACK mode.
Choosing a QoS value:
| Scenario | Recommended QoS | Rationale |
|---|---|---|
| Few consumers, fast processing | Higher value (up to 100) | Keeps consumers busy with a steady flow of messages |
| Many consumers, slow processing | Lower value (for example, 1-10) | Distributes messages evenly so no single consumer is overwhelmed |
| Accumulated messages cause intermittent delivery | Do not increase QoS | Improve consumer throughput instead -- the root cause is slow message processing |
If accumulated unacknowledged messages on the broker reach the QoS limit, the broker stops pushing messages, which may appear as intermittent delivery. Restarting the consumer temporarily restores delivery, but the root fix is to improve consumer processing speed.
ACK timeouts and retry behavior
If a consumer does not acknowledge a message within the configured timeout, the broker redelivers it.
Each message can be retried up to the configured maximum. After all retries are exhausted, the message is discarded or routed to the dead-letter exchange.
The timeout and retry limits depend on instance type and edition:
| Parameter | Serverless (Shared) | Serverless (Dedicated) | Subscription (Enterprise Edition) | Subscription (Platinum Edition) |
|---|---|---|---|---|
| Consumption timeout | Max: 3 hours, Default: 5 min | Max: 12 hours, Default: 30 min | Max: 3 hours, Default: 5 min | Max: 12 hours, Default: 30 min |
| Max delivery attempts | Max: 16, Default: 16 | Max: 16, Default: 16 | Max: 16, Default: 16 | Max: 64, Default: 16 |
Use basicConsume instead of basicGet
basicGet polls for individual messages and has lower throughput and transactions per second (TPS) limits compared to basicConsume. In production environments with high message volumes, use basicConsume with a long-lived consumer for push-based delivery.