全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:Spring整合

更新時間:Dec 03, 2025

雲訊息佇列 RabbitMQ 版支援Spring架構的SDK,本文介紹如何整合Spring架構的SDK用戶端收發訊息。

前提條件

Demo工程樣本

單擊SpringBootDemo.zip下載Demo工程樣本。

步驟一:參數配置

application.propertiesapplication.yml檔案中填寫配置參數,以application.properties為例。

# 設定存取點,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情⻚面擷取。 
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# 雲訊息佇列RabbitMQ串連連接埠。
spring.rabbitmq.port=5672
# 指定執行個體的靜態使用者名稱,在雲訊息佇列 RabbitMQ 版控制台靜態使用者名稱密碼管理⻚面查看。 
spring.rabbitmq.username=******
# 指定執行個體的靜態使用者密碼,在雲訊息佇列 RabbitMQ 版控制台靜態使用者名稱密碼管理⻚面查看。 
spring.rabbitmq.password=******
# 虛擬機器主機,提供邏輯隔離,在雲訊息佇列 RabbitMQ 版控制台Vhost列表⻚面查看。 
spring.rabbitmq.virtual-host=test_vhost
# 訊息確認(Ack)模式。
# 1. none:消費者接收訊息後,無論消費成功與否,服務端均認為訊息已成功處理,即RabbitMQ中的autoAck模式。
# 2. auto:用戶端在消費成功後主動回複ack(處理失敗則回複nack或拋出異常),不需要用戶端顯式調用Channel.basicAck()。
# 3. manual: 手動回複Ack,需要用戶端在消費成功後顯式調用Channel.basicAck()主動回複。
spring.rabbitmq.listener.simple.acknowledge-mode=manual

#推薦設定為CONNECTION模式,雲訊息佇列 RabbitMQ 版是叢集分布式架構,在CONNECTION模式下,用戶端能夠更均衡地串連到叢集中的多個服務節點。這種方法可以有效避免出現負載熱點問題,從而提高訊息的發送和消費效率。
spring.rabbitmq.cache.connection.mode=connection
#根據情況增減
spring.rabbitmq.cache.connection.size=50
#根據情況增減
spring.rabbitmq.cache.channel.size=1

# 一個消費者最多可處理的未被確認(Ack)訊息數量(QoS),RabbitMQ服務端取min{prefetch, 100}作為QoS。如果消費者處理能力弱,減小設定
spring.rabbitmq.listener.simple.prefetch=100
# RabbitMQ監聽器(Listener)的最小並發消費者數量,根據實際增減
spring.rabbitmq.listener.simple.concurrency=10
# RabbitMQ監聽器(Listener)的最大並發消費者數量,當消費速率足夠快時,用戶端會啟動max-concurrency個消費者進行消費。
spring.rabbitmq.listener.simple.max-concurrency=20

以下其他配置您可以根據需要選擇添加:

可選配置

屬性

說明

spring.rabbitmq.addresses

指定用戶端串連到的服務端地址,多個地址之間使用英文逗號(,)分隔。

spring.rabbitmq.hostspring.rabbitmq.addresses同時設定,則優先以spring.rabbitmq.addresses為準

spring.rabbitmq.dynamic

是否建立AmqpAdmin bean,預設值為true。

spring.rabbitmq.connection-timeout

連線逾時時間,單位毫秒,0表示不逾時。

spring.rabbitmq.requested-heartbeat

指定心跳逾時時間,單位:秒,預設值為60。

spring.rabbitmq.publisher-confirms

是否啟用發布確認機制。

spring.rabbitmq.publisher-returns

是否啟用發布返回機制。

spring.rabbitmq.ssl.enabled

是否啟用SSL認證認證。

spring.rabbitmq.ssl.key-store

持有SSL認證的key store路徑。

spring.rabbitmq.ssl.key-store-password

訪問key store的密碼。

spring.rabbitmq.ssl.trust-store

持有SSL認證的可信地址。

spring.rabbitmq.ssl.trust-store-password

訪問SSL的可信地址的密碼。

spring.rabbitmq.ssl.algorithm

SSL使用的演算法,例如:TLSv1.2。

spring.rabbitmq.ssl.validate-server-certificate

是否啟用服務端認證驗證。

spring.rabbitmq.ssl.verify-hostname

是否啟用主機校正。

spring.rabbitmq.cache.channel.size

緩衝中保持的channel數量。

spring.rabbitmq.cache.channel.checkout-timeout

當緩衝數量被設定時,從緩衝中擷取一個channel的逾時時間。

單位:毫秒,取值為0表示總是建立一個新channel。

spring.rabbitmq.cache.connection.size

緩衝的串連數,只在CONNECTION模式下生效。

spring.rabbitmq.cache.connection.mode

串連緩衝模式。取值如下:

  • CHANNEL

  • CONNECTION

推薦使用CONNECTION模式。

spring.rabbitmq.listener.type

Listener容器類型。取值如下:

  • simple

  • direct

預設值為simple。

spring.rabbitmq.listener.simple.auto-startup

是否應用啟動時自動啟動容器,預設為true。

spring.rabbitmq.listener.simple.acknowledge-mode

訊息確認方式。取值如下:

  • none:消費者接收訊息後,無論消費成功與否,服務端均認為訊息已成功處理,即RabbitMQ中的autoAck模式。

  • manual:手動回複Ack,需要用戶端在消費成功後顯式調用Basic.ack主動回複。

  • auto:用戶端在消費成功後主動回複ack(處理失敗則回複nack或拋出異常),不需要用戶端顯式調用Basic.ack。

預設值為auto。

spring.rabbitmq.listener.simple.concurrency

最小消費者的數量。

spring.rabbitmq.listener.simple.max-concurrency

最大消費者的數量。

spring.rabbitmq.listener.simple.prefetch

一個消費者最多可處理的未被確認(Ack)的訊息數量(等同於用戶端調用Basic.qos設定QoS值),如果有事務的話,必須大於等於transaction數量。

spring.rabbitmq.listener.simple.transaction-size

交易處理的訊息數量。

spring.rabbitmq.listener.simple.default-requeue-rejected

被拒絕的訊息是否重新進入隊列,預設值為true。

spring.rabbitmq.listener.simple.missing-queues-fatal

如果容器聲明的隊列在代理上不可用,是否失敗;或者如果在運行時刪除一個或多個隊列,是否停止容器。預設值為true。

spring.rabbitmq.listener.simple.idle-event-interval

發布空閑容器事件的時間間隔,單位:毫秒。

spring.rabbitmq.template.mandatory

是否啟用強制資訊,預設值為false。

spring.rabbitmq.template.receive-timeout

receive()操作的逾時時間。

spring.rabbitmq.template.reply-timeout

sendAndReceive()操作的逾時時間。

步驟二:調用SDK收發訊息

訊息生產

在RabbitMQService中通過依賴注入的方式擷取RabbitTemplate,並調用其提供的send介面發送訊息。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class RabbitMQService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String content) {
        // 設定MessageId。
        String msgId = UUID.randomUUID().toString();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(msgId);
        // 建立Message。
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        /*
         * 發送訊息send()介面;
         * exchange:交換器名稱;
         * routingKey:路由鍵;
         * message:訊息內容;
         * correlationData用於關聯發行者確認;
         */
        rabbitTemplate.send(exchange, routingKey, message, null);
    }
}

訊息消費

使用@RabbitListener註解消費訊息:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class MessageListener {

     /**
     * 訊息接收;
     * @param message訊息;
     * @param channel通道;
     * @throws IOException
     *queues需要替換為您提前建立好的隊列的名稱;
     */
    @RabbitListener(queues = "myQueue")
    public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
        // 進入訊息消費商務邏輯。
        ...
        // 須在Ack有效期間(消費逾時時間)內返回Ack,否則為無效確認,訊息還會被重複投遞。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

RabbitListener常見可選配置如下:

可選配置

屬性

說明

ackMode

自訂訊息確認模式,覆蓋spring.rabbitmq.listener.simple.acknowledge-mode設定。

admin

引用AmqpAdmin用於AMQP資源管理。

autoStartup

是否在應用啟動時自動啟動容器,覆蓋spring.rabbitmq.listener.simple.auto-startup設定。

bindings

配置隊列與交換器綁定的數組,包含綁定資訊。

concurrency

設定監聽器容器(Listener)的並發線程數。

errorHandler

為監聽方法拋出的異常配置錯誤處理器。

exclusive

啟用隊列的獨佔模式,指消費者獨佔隊列,其他消費者無法從該隊列接收訊息,暫不支援。

queues

聲明此監聽器所監聽的隊列。

queuesToDeclare

指定要顯式聲明的隊列。