全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:SDK使用注意事項

更新時間:Jun 12, 2025

本文介紹使用開源的用戶端SDK接入雲訊息佇列 RabbitMQ 版服務端時的注意事項。

使用用戶端時需要設定自動重連功能嗎?

設定com.rabbitmq.client.ConnectionFactory介面時,必須開啟串連自動回復功能,保證服務端升級時,用戶端中斷連線可自動重新串連,否則會導致訊息讀寫中斷。

//設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
factory.setAutomaticRecoveryEnabled(true);
//設定自動回復間隔時間,單位:毫秒。
factory.setNetworkRecoveryInterval(5000);

生產訊息時需要注意什嗎?

  • 在生產或消費過程中,請勿頻繁開啟或關閉串連。請儘可能使用長期存活的Connection,以免每次收發訊息時都需要建立新的Connection,消耗大量的網路資源和服務端資源,甚至引起服務端SYN Flood防護。更多資訊,請參見Connection

  • 生產訊息前根據實際情況選擇是否開啟發送確認介面。開啟發送確認,服務端收到訊息後,會調用本地方法確認訊息收到。請注意,若且唯若用戶端收到服務端的publishAck,才能夠確認訊息發送成功。若訊息發送失敗或沒有收到publishAck,請在用戶端實現訊息發送重試機制,避免訊息丟失。

    說明

    如果開啟發送確認,可能會導致訊息發送延遲的增加,因為發送模式由非同步轉為同步。服務端需要在確保訊息成功落盤後,才會返回傳送成功的響應。

    開啟發布確認的過程由兩個步驟實現,以Java語言為例:

    1. 建立channel時開啟發布確認。

      // 建立connection以及channel
      Connection connection = createConnection(hostName, userName, passWord, virtualHost);
      Channel channel = connection.createChannel();
      // 在channel維度啟用發布確認模式
      channel.confirmSelect();
    2. basicPublish時等待服務端的返回,並執行對應邏輯:

      // 發送訊息,注意:該過程中建議複用已有channel
      channel.basicPublish(exchangeName, bindingKey, true, props,
                      ("example body").getBytes(StandardCharsets.UTF_8));
      
      // 等待confirm返回成功資訊,3秒內確認完成
      if (channel.waitForConfirms(3000)) {
          // 訊息落盤時的邏輯
      } else {
          // 訊息重發邏輯
      }
  • mandatory設定為true時,如果訊息因為路由原因,未到達Queue,用戶端添加的ReturnListener介面將會被調用。

    // channel添加路由失敗回調
    channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
    // 發送訊息,第三個參數為true
    channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
  • 發送訊息時,強烈建議自訂msgId,即訊息的唯一標識。可用於訊息查詢、軌跡查詢、以及故障排查時後台資訊定位。具體設定方法請參見如何設定Message ID

  • 訊息發送時,需要根據basicPublish介面返回的錯誤類型決定是否拋出異常。

    • 如果是業務自身問題,例如ExchangeNotExist(Exchange不存在)則需要拋出異常。

    • 如果是發送訊息被限流,建議關閉舊的串連,重新建立並初始化Channel,這樣可以保證業務的連續性。

    部分邏輯程式碼範例如下所示。完整的發送程式碼範例,請參見生產訊息

    private void doSend(String content) throws Exception {
        try {
            // 設定msg id
            String msgId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
    
            channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
            // 等待confirm返回成功資訊,3秒內確認完成
            if (channel.waitForConfirms(3000)) {
                // 訊息落盤時的邏輯
            } else {
                // 訊息重發邏輯
            }
    
        } catch (Exception e) {
            // 擷取報錯資訊
            String message = e.getMessage();
            System.out.println(message);
    
            // 判斷該通道是否已經關閉
            if (channelClosedByServer(message)) {
                // 關閉已有channel,並重開channel
                factory.closeCon(channel);
                channel = factory.createChannel();
                // 初始化,例如啟用confirm等
                this.initChannel();
                // 重發訊息
                doSend(content);
            } else {
                // 如果不是服務端有問題,則拋出異常,或者定製後續處理邏輯。
                throw e;
            }
        }
    }
    
    private boolean channelClosedByServer(String errorMsg) {
        if (errorMsg != null
            && errorMsg.contains("channel.close")) {
            return true;
        } else {
            return false;
        }
    }

消費訊息時需要注意什嗎?

  • 消費資料時,需要防止消費傾斜。具體做法,請參見Connection和Channel的使用建議

  • 用戶端不建議設定Consumer Tag,使用服務端自動產生的全域唯一Consumer Tag。如果要設定,需要保證Consumer Tag唯一性。

  • 消費資料時,使用basic.basicQos設定服務端允許緩衝未ack訊息的數量,當到達設定值時,服務端將不再推送訊息到用戶端。當用戶端提交ack後,服務端將再次推送等同ack數量的訊息,保持服務端緩衝最大未ack訊息數量小於等於QoS設定值。QoS可以設定在Channel上,也可以設定在單個Consumer上。channel.basicQos(100, true)表示同一Channel上建立的所有Consumer共用100的額度限制。channel.basicQos(100, false)或者channel.basicQos(100)表示不同Consumer之間額度不共用,不同Consumer額度都是100。如果用戶端不設定,預設使用服務端配置,服務端預設針對每個消費者限制100,也即等同於用戶端channel.basicQos(100, false)。自訂設定值不能超過100,否則設定不生效,仍然使用預設值。如果消費能力較弱,建議將QoS值降低。如果服務端堆積訊息量達到設定的QoS,則不會再推送訊息給用戶端。這種情況下用戶端看到的現象是服務端間歇性推送訊息,並且重啟消費者後訊息恢複,建議通過增強消費者的消費能力解決。如果使用autoACK消費模式,basicQos將不生效。

  • 消費者提交的ack如果不在指定時間內,則觸發消費重試。訊息將會被重新投遞,最多重試16次。若重試16次還未成功,則訊息將被丟棄或發送至死信Exchange。消費逾時時間如下:

    執行個體重試策略參數說明

    執行個體類型

    Serverless系列執行個體

    預付費系列執行個體

    共用

    獨享

    企業版

    鉑金版

    預留+彈性 /按累積量

    預留+彈性

    消費逾時時間

    最大值:3小時

    預設值:5分鐘

    最大值:12小時

    預設值:30分鐘

    最大值:3小時

    預設值:5分鐘

    最大值:12小時

    預設值:30分鐘

    最大投遞次數

    最大值:16

    預設值:16

    最大值:16

    預設值:16

    最大值:16

    預設值:16

    最大值:64

    預設值:16

  • basicGet拉取訊息效率較低,能達到的上限TPS沒有basicConsume高。生產環境大規模消費訊息推薦使用basicConsume,而不是basicGet

  • queueDeclareexchangeDeclare等中繼資料介面有限流設定,建議在控制台上建立,不建議在發送資料時調用,否則可能觸發限流導致串連關閉。更多資訊,請參見使用限制