本文介紹使用開源的用戶端SDK接入雲訊息佇列 RabbitMQ 版服務端時的注意事項。
使用用戶端時需要設定自動重連功能嗎?
設定com.rabbitmq.client.ConnectionFactory介面時,必須開啟串連自動回復功能,保證服務端升級時,用戶端中斷連線可自動重新串連,否則會導致訊息讀寫中斷。
//設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
factory.setAutomaticRecoveryEnabled(true);
//設定自動回復間隔時間,單位:毫秒。
factory.setNetworkRecoveryInterval(5000);生產訊息時需要注意什嗎?
在生產或消費過程中,請勿頻繁開啟或關閉串連。請儘可能使用長期存活的Connection,以免每次收發訊息時都需要建立新的Connection,消耗大量的網路資源和服務端資源,甚至引起服務端SYN Flood防護。更多資訊,請參見Connection。
生產訊息前根據實際情況選擇是否開啟發送確認介面。開啟發送確認,服務端收到訊息後,會調用本地方法確認訊息收到。請注意,若且唯若用戶端收到服務端的publishAck,才能夠確認訊息發送成功。若訊息發送失敗或沒有收到publishAck,請在用戶端實現訊息發送重試機制,避免訊息丟失。
說明如果開啟發送確認,可能會導致訊息發送延遲的增加,因為發送模式由非同步轉為同步。服務端需要在確保訊息成功落盤後,才會返回傳送成功的響應。
開啟發布確認的過程由兩個步驟實現,以Java語言為例:
建立channel時開啟發布確認。
// 建立connection以及channel Connection connection = createConnection(hostName, userName, passWord, virtualHost); Channel channel = connection.createChannel(); // 在channel維度啟用發布確認模式 channel.confirmSelect();在
basicPublish時等待服務端的返回,並執行對應邏輯:// 發送訊息,注意:該過程中建議複用已有channel channel.basicPublish(exchangeName, bindingKey, true, props, ("example body").getBytes(StandardCharsets.UTF_8)); // 等待confirm返回成功資訊,3秒內確認完成 if (channel.waitForConfirms(3000)) { // 訊息落盤時的邏輯 } else { // 訊息重發邏輯 }
mandatory設定為true時,如果訊息因為路由原因,未到達Queue,用戶端添加的ReturnListener介面將會被調用。// channel添加路由失敗回調 channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId())); // 發送訊息,第三個參數為true channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));發送訊息時,強烈建議自訂msgId,即訊息的唯一標識。可用於訊息查詢、軌跡查詢、以及故障排查時後台資訊定位。具體設定方法請參見如何設定Message ID。
訊息發送時,需要根據
basicPublish介面返回的錯誤類型決定是否拋出異常。如果是業務自身問題,例如
ExchangeNotExist(Exchange不存在)則需要拋出異常。如果是發送訊息被限流,建議關閉舊的串連,重新建立並初始化Channel,這樣可以保證業務的連續性。
部分邏輯程式碼範例如下所示。完整的發送程式碼範例,請參見生產訊息。
private void doSend(String content) throws Exception { try { // 設定msg id String msgId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build(); channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8)); // 等待confirm返回成功資訊,3秒內確認完成 if (channel.waitForConfirms(3000)) { // 訊息落盤時的邏輯 } else { // 訊息重發邏輯 } } catch (Exception e) { // 擷取報錯資訊 String message = e.getMessage(); System.out.println(message); // 判斷該通道是否已經關閉 if (channelClosedByServer(message)) { // 關閉已有channel,並重開channel factory.closeCon(channel); channel = factory.createChannel(); // 初始化,例如啟用confirm等 this.initChannel(); // 重發訊息 doSend(content); } else { // 如果不是服務端有問題,則拋出異常,或者定製後續處理邏輯。 throw e; } } } private boolean channelClosedByServer(String errorMsg) { if (errorMsg != null && errorMsg.contains("channel.close")) { return true; } else { return false; } }
消費訊息時需要注意什嗎?
消費資料時,需要防止消費傾斜。具體做法,請參見Connection和Channel的使用建議。
用戶端不建議設定Consumer Tag,使用服務端自動產生的全域唯一Consumer Tag。如果要設定,需要保證Consumer Tag唯一性。
消費資料時,使用
basic.basicQos設定服務端允許緩衝未ack訊息的數量,當到達設定值時,服務端將不再推送訊息到用戶端。當用戶端提交ack後,服務端將再次推送等同ack數量的訊息,保持服務端緩衝最大未ack訊息數量小於等於QoS設定值。QoS可以設定在Channel上,也可以設定在單個Consumer上。channel.basicQos(100, true)表示同一Channel上建立的所有Consumer共用100的額度限制。channel.basicQos(100, false)或者channel.basicQos(100)表示不同Consumer之間額度不共用,不同Consumer額度都是100。如果用戶端不設定,預設使用服務端配置,服務端預設針對每個消費者限制100,也即等同於用戶端channel.basicQos(100, false)。自訂設定值不能超過100,否則設定不生效,仍然使用預設值。如果消費能力較弱,建議將QoS值降低。如果服務端堆積訊息量達到設定的QoS,則不會再推送訊息給用戶端。這種情況下用戶端看到的現象是服務端間歇性推送訊息,並且重啟消費者後訊息恢複,建議通過增強消費者的消費能力解決。如果使用autoACK消費模式,basicQos將不生效。消費者提交的ack如果不在指定時間內,則觸發消費重試。訊息將會被重新投遞,最多重試16次。若重試16次還未成功,則訊息將被丟棄或發送至死信Exchange。消費逾時時間如下:
執行個體重試策略參數說明
執行個體類型
Serverless系列執行個體
預付費系列執行個體
共用
獨享
企業版
鉑金版
預留+彈性 /按累積量
預留+彈性
消費逾時時間
最大值:3小時
預設值:5分鐘
最大值:12小時
預設值:30分鐘
最大值:3小時
預設值:5分鐘
最大值:12小時
預設值:30分鐘
最大投遞次數
最大值:16
預設值:16
最大值:16
預設值:16
最大值:16
預設值:16
最大值:64
預設值:16
basicGet拉取訊息效率較低,能達到的上限TPS沒有basicConsume高。生產環境大規模消費訊息推薦使用basicConsume,而不是basicGet。queueDeclare和exchangeDeclare等中繼資料介面有限流設定,建議在控制台上建立,不建議在發送資料時調用,否則可能觸發限流導致串連關閉。更多資訊,請參見使用限制。