全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:執行個體限流最佳實務

更新時間:Jul 29, 2025

雲訊息佇列 RabbitMQ 版會對單一實例的TPS流量峰值進行限流,本文介紹雲訊息佇列 RabbitMQ 版執行個體的限流規則、限流後的行為以及限流最佳實務等。

限流閾值

執行個體總TPS限流閾值

執行個體系列

Serverless系列執行個體

預付費系列執行個體

規格

共用

獨享

未開啟彈性TPS

開啟彈性TPS

預留+彈性/按累積量

預留+彈性

企業版

鉑金版

專業版

企業版

鉑金版

專業版

限流閾值

最大5萬次/秒

基礎TPS流量峰值規格的2倍

基礎TPS流量峰值規格

基礎TPS流量峰值規格的2倍,最大5萬次/秒

基礎TPS流量峰值規格的2倍,最大5萬次/秒

基礎TPS流量峰值規格的1.5倍

單節點SendMessage TPS限流閾值

服務端會在執行個體維度限制每個後台服務節點上SendMessage的TPS值,限流閾值如下所示:

限制

Serverless系列執行個體

預付費系列執行個體

共用

獨享

企業版

鉑金版

專業版

按累積量

預留+彈性

預留+彈性

限流閾值

2.5萬次/秒

2.5萬次/秒

2.5萬次/秒

2.5萬次/秒

單介面的限流閾值

限制項

限制項介面

Serverless系列執行個體

預付費系列執行個體

共用

獨享

企業版

鉑金版

專業版

預留+彈性/按累積量

預留+彈性

單一實例同步擷取訊息

basicGet

500 TPS

500 TPS

單一實例清Queue

purgeQueue

500 TPS

500 TPS

單一實例建立Exchange

exchangeDeclare

500 TPS

500 TPS

單一實例刪除Exchange

exchangeDelete

500 TPS

500 TPS

單一實例建立Queue

queueDeclare

500 TPS

500 TPS

單一實例刪除Queue

queueDelete

500 TPS

500 TPS

單一實例建立Binding

queueBind

500 TPS

500 TPS

單一實例刪除Binding

queueUnbind

500 TPS

500 TPS

單一實例恢複訊息

basicRecover

500 TPS

500 TPS

單一實例重入Queue訊息

  • basicReject(requeue=true)

  • basicNack(requeue=true)

20 TPS

20 TPS

限流規則

雲訊息佇列 RabbitMQ 版執行個體的TPS流量峰值超過您所購買執行個體的TPS規格上限時,雲訊息佇列 RabbitMQ 版執行個體會被限流。

限流後的行為如下:

  • 雲訊息佇列 RabbitMQ 版服務端會返回錯誤碼資訊。具體請參見錯誤碼資訊

  • 雲訊息佇列 RabbitMQ 版服務端關閉當前請求的Channel,代碼中可以捕獲異常重新開啟Channel,具體請參見錯誤碼處理範例程式碼

錯誤碼資訊

  • 錯誤碼:reply-code=530

  • 錯誤資訊:reply-text=denied for too many requests

Java用戶端錯誤堆棧樣本:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>
(reply-code=530, reply-text=denied for too many requests, ReqId:5FB4C999314635F952FCBFF6, ErrorHelp[dstQueue=XXX_test_queue,
srcExchange=Producer.ExchangeName,bindingKey=XXX_test_bk, http://mrw.so/6rNqO8], class-id=50, method-id=20)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
    at java.lang.Thread.run(Thread.java:748)

錯誤碼處理範例程式碼

以Java語言為例,代碼如下所示:

private static final int MAX_RETRIES = 5; // 最大重試次數
private static final long WAIT_TIME_MS = 2000; // 每次重試的等待時間(以毫秒為單位)

private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
    try {
        // ......
        // 在當前通道channel下執行的任何操作
        // 例如訊息發送、消費等
        // ......

    } catch (AlreadyClosedException e) {
        String message = e.getMessage();
        if (isChannelClosed(message)) {
            // 如果通道已經關閉,關閉並重新建立通道
            channel = createChannelWithRetry(connection); 
            // 在重連後可以繼續執行其它操作
            // ......
        } else {
            throw e;
        }
    }
}

private Channel createChannelWithRetry(Connection connection) {
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
            return connection.createChannel();
        } catch (Exception e) {
            System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
            // 檢查錯誤, 若仍是被限流導致的關閉錯誤,則可以等待後繼續重試
            // 也可移除本部分重試邏輯
            if (attempt < MAX_RETRIES) {
                try {
                    Thread.sleep(WAIT_TIME_MS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // 還原中斷狀態
                }
            } else {
                throw new RuntimeException("Exceeded maximum retries to create channel", e);
            }
        }
    }
    throw new RuntimeException("This line should never be reached"); // 理論上不會到達這裡
}

private boolean isChannelClosed(String errorMsg) {
    // 判斷是否包含channel.close報錯,該報錯代表通道已關閉。
    // 可能涵蓋530,541等錯誤資訊。
    if (errorMsg != null && errorMsg.contains("channel.close")) {
        System.out.println("[ChannelClosed] Error details: " + errorMsg);
        return true;
    }
    return false;
}

執行個體秒級TPS峰值查詢

通過查詢執行個體實際使用的秒級TPS峰值,您可以瞭解業務的流量波動情況和流量峰值,判斷執行個體規格是否滿足業務需求。

雲訊息佇列 RabbitMQ 版提供以下三種方式查詢執行個體的秒級TPS峰值:

查詢方式

說明

查詢時間層級

查詢資源層級

(推薦)通過CloudMonitor查詢執行個體TPS峰值並設定警示

優勢:

  • 查詢結果最大可顯示14天內的TPS峰值變化,可快速定位異常範圍。

  • 支援將執行個體TPS峰值作為監控指標設定警示。

  • 支援免費使用。

分鐘級TPS峰值

取值為1分鐘周期內,每秒鐘執行個體TPS的最大值。

執行個體層級TPS峰值

(推薦)通過執行個體詳情查詢執行個體TPS峰值

  • 優勢:

    • 支援查詢秒級TPS峰值,可精確異常範圍。

    • 支援查看具體API介面的TPS峰值。

    • 支援免費使用。

  • 不足:為避免顯示結果過多,只顯示10分鐘內的查詢結果。

秒級TPS峰值

  • 執行個體層級TPS峰值

  • 執行個體內某個API介面的TPS峰值

通過日誌查詢執行個體TPS峰值

  • 優勢:支援通過SLS分析語句查詢,適合複雜問題定位情境。

  • 不足:

    • 相較於前兩種查詢方式,操作較複雜,查詢結果不夠直觀。

    • 需要額外支付Log Service相關費用,具體計費資訊,請參見Log Service計費項目

秒級TPS峰值

執行個體層級TPS峰值

TPS被限流後怎麼處理?

如果出現因TPS峰值設定不合理導致執行個體、Connection被限流,從而影響到您的業務,建議您按照以下解決辦法處理。

單一實例總TPS被限流的解決辦法

  • 如果在測試或者流量峰值不確定、流量較少的短期情境,建議您為預付費系列執行個體開啟彈性TPS能力或者使用Serverless系列執行個體。更多資訊,請參見為執行個體開啟彈性TPS功能

  • 如果在長期穩定、流量較高的業務運行情境,建議您升級TPS流量峰值規格。更多資訊,請參見升級執行個體配置

單節點的TPS被限流的解決辦法

  • 雲訊息佇列 RabbitMQ 版採用分布式叢集架構。建議為每個隊列建立多個串連(至少10個),以便用戶端能夠更均衡地串連到叢集中的多個服務節點。這種方法可以有效避免出現負載熱點問題,從而提高訊息的發送和消費效率。

  • Spring使用者推薦使用CachingConnectionFactory的CONNECTION模式,詳情請參見Spring整合