All Products
Search
Document Center

ApsaraMQ for RabbitMQ:Usage notes for open-source RabbitMQ clients

Last Updated:Mar 11, 2026

Open-source RabbitMQ client SDKs are compatible with ApsaraMQ for RabbitMQ, but certain platform-specific behaviors require attention. This page covers connection management, message production, and message consumption best practices.

Connection management

Enable automatic connection recovery

Enable automatic connection recovery on com.rabbitmq.client.ConnectionFactory. Without it, broker upgrades interrupt message reads and writes, and the client cannot reconnect on its own.

// Enable automatic connection recovery
factory.setAutomaticRecoveryEnabled(true);
// Set the interval between recovery attempts (milliseconds)
factory.setNetworkRecoveryInterval(5000);

Use persistent connections

Do not open and close connections for each publish or consume operation. Frequent reconnection wastes network and broker resources and can trigger SYN flood protection.

For more information, see Connections and channels.

Avoid metadata operations in application code

Metadata operations such as queueDeclare and exchangeDeclare are subject to throttling. If triggered, the broker may close the connection.

Create queues and exchanges through the ApsaraMQ for RabbitMQ console instead of declaring them at runtime during message sending. For more information about throttling limits, see Limits.

Message production

Set up publisher confirms

Publisher confirms let the broker acknowledge each message after writing it to disk, so the client can detect and retry failed deliveries. Enabling publisher confirms changes the sending mode from synchronous to asynchronous, which may increase latency because the broker returns a success response only after confirming the message is persisted.

To enable publisher confirms on a channel:

  1. Call confirmSelect() when creating the channel.

    // Create a connection and a channel
    Connection connection = createConnection(hostName, userName, passWord, virtualHost);
    Channel channel = connection.createChannel();
    // Enable publisher confirms
    channel.confirmSelect();
  2. After calling basicPublish, wait for the broker response and handle the result.

    // Publish a message (reuse the existing channel)
    channel.basicPublish(exchangeName, bindingKey, true, props,
                    ("example body").getBytes(StandardCharsets.UTF_8));
    
    // Wait up to 3 seconds for the confirm
    if (channel.waitForConfirms(3000)) {
        // Message persisted to disk successfully
    } else {
        // Confirm failed -- resend the message
    }
Note

A message is confirmed only when the client receives publishAck from the broker. If the client fails to send a message or does not receive publishAck, retry the send to prevent message loss.

Handle unroutable messages with the mandatory flag

Setting the mandatory parameter to true causes the broker to invoke the ReturnListener callback when a message cannot be routed to any queue.

// Register a callback for unroutable messages
channel.addReturnListener(returnMessage ->
    System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
// Publish with mandatory=true (third parameter)
channel.basicPublish(exchangeName, routingKey, true, props,
    content.getBytes(StandardCharsets.UTF_8));

Assign a message ID to every message

Assign a custom ID to each message so you can query its trace and troubleshoot delivery issues. For more information, see How do I specify a message ID?

Handle publishing errors

Determine the error handling strategy based on the error type returned by basicPublish:

  • Business exceptions (such as ExchangeNotExist): Throw the exception immediately.

  • Throttling or channel closure: Close the existing connection, create and initialize a new channel, and retry the send.

Example:

private void doSend(String content) throws Exception {
    try {
        // Assign a unique message ID
        String msgId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();

        channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
        // Wait up to 3 seconds for the confirm
        if (channel.waitForConfirms(3000)) {
            // Message persisted to disk successfully
        } else {
            // Confirm failed -- resend the message
        }

    } catch (Exception e) {
        String message = e.getMessage();
        System.out.println(message);

        if (channelClosedByServer(message)) {
            // Recover: close the old channel, create and reinitialize a new one, then retry
            factory.closeCon(channel);
            channel = factory.createChannel();
            this.initChannel();
            doSend(content);
        } else {
            // Non-broker error -- propagate the exception
            throw e;
        }
    }
}

private boolean channelClosedByServer(String errorMsg) {
    if (errorMsg != null && errorMsg.contains("channel.close")) {
        return true;
    } else {
        return false;
    }
}

For the complete sample code, see Produce messages.

Message consumption

Prevent consumption skew

Make sure messages are distributed evenly across consumers. Consumption skew causes some consumers to be overloaded while others sit idle. For configuration details, see the "Usage notes" section of Connections and channels.

Use broker-generated consumer tags

Let the broker generate a globally unique consumer tag rather than specifying one manually. If you specify a custom consumer tag, make sure it is unique across all consumers.

Configure QoS (prefetch)

Call basicQos to limit the number of unacknowledged messages the broker pushes to a consumer. When this limit is reached, the broker pauses delivery until the consumer acknowledges outstanding messages.

Scope options:

CallScope
channel.basicQos(100, true)All consumers on the channel share a combined limit of 100 unacknowledged messages
channel.basicQos(100, false) or channel.basicQos(100)Each consumer on the channel has its own limit of 100 unacknowledged messages

Platform constraints:

  • The default QoS value is 100 per consumer, equivalent to channel.basicQos(100, false).

  • Custom QoS values cannot exceed 100. Values above 100 are ignored and the default is used instead.

  • basicQos has no effect in autoACK mode.

Choosing a QoS value:

ScenarioRecommended QoSRationale
Few consumers, fast processingHigher value (up to 100)Keeps consumers busy with a steady flow of messages
Many consumers, slow processingLower value (for example, 1-10)Distributes messages evenly so no single consumer is overwhelmed
Accumulated messages cause intermittent deliveryDo not increase QoSImprove consumer throughput instead -- the root cause is slow message processing
Note

If accumulated unacknowledged messages on the broker reach the QoS limit, the broker stops pushing messages, which may appear as intermittent delivery. Restarting the consumer temporarily restores delivery, but the root fix is to improve consumer processing speed.

ACK timeouts and retry behavior

If a consumer does not acknowledge a message within the configured timeout, the broker redelivers it.

Each message can be retried up to the configured maximum. After all retries are exhausted, the message is discarded or routed to the dead-letter exchange.

The timeout and retry limits depend on instance type and edition:

ParameterServerless (Shared)Serverless (Dedicated)Subscription (Enterprise Edition)Subscription (Platinum Edition)
Consumption timeoutMax: 3 hours, Default: 5 minMax: 12 hours, Default: 30 minMax: 3 hours, Default: 5 minMax: 12 hours, Default: 30 min
Max delivery attemptsMax: 16, Default: 16Max: 16, Default: 16Max: 16, Default: 16Max: 64, Default: 16

Use basicConsume instead of basicGet

basicGet polls for individual messages and has lower throughput and transactions per second (TPS) limits compared to basicConsume. In production environments with high message volumes, use basicConsume with a long-lived consumer for push-based delivery.

What's next