雲訊息佇列 RabbitMQ 版支援Spring架構的SDK,本文介紹如何整合Spring架構的SDK用戶端收發訊息。
前提條件
您已在雲訊息佇列 RabbitMQ 版控制台建立執行個體、Vhost、Exchange、Queue等相關資源,具體操作,請參見步驟二:建立資源。
Demo工程樣本
單擊SpringBootDemo.zip下載Demo工程樣本。
步驟一:參數配置
在application.properties或application.yml檔案中填寫配置參數,以application.properties為例。
# 設定存取點,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情⻚面擷取。
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# 雲訊息佇列RabbitMQ串連連接埠。
spring.rabbitmq.port=5672
# 指定執行個體的靜態使用者名稱,在雲訊息佇列 RabbitMQ 版控制台靜態使用者名稱密碼管理⻚面查看。
spring.rabbitmq.username=******
# 指定執行個體的靜態使用者密碼,在雲訊息佇列 RabbitMQ 版控制台靜態使用者名稱密碼管理⻚面查看。
spring.rabbitmq.password=******
# 虛擬機器主機,提供邏輯隔離,在雲訊息佇列 RabbitMQ 版控制台Vhost列表⻚面查看。
spring.rabbitmq.virtual-host=test_vhost
# 訊息確認(Ack)模式。
# 1. none:消費者接收訊息後,無論消費成功與否,服務端均認為訊息已成功處理,即RabbitMQ中的autoAck模式。
# 2. auto:用戶端在消費成功後主動回複ack(處理失敗則回複nack或拋出異常),不需要用戶端顯式調用Channel.basicAck()。
# 3. manual: 手動回複Ack,需要用戶端在消費成功後顯式調用Channel.basicAck()主動回複。
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#推薦設定為CONNECTION模式,雲訊息佇列 RabbitMQ 版是叢集分布式架構,在CONNECTION模式下,用戶端能夠更均衡地串連到叢集中的多個服務節點。這種方法可以有效避免出現負載熱點問題,從而提高訊息的發送和消費效率。
spring.rabbitmq.cache.connection.mode=connection
#根據情況增減
spring.rabbitmq.cache.connection.size=50
#根據情況增減
spring.rabbitmq.cache.channel.size=1
# 一個消費者最多可處理的未被確認(Ack)訊息數量(QoS),RabbitMQ服務端取min{prefetch, 100}作為QoS。如果消費者處理能力弱,減小設定
spring.rabbitmq.listener.simple.prefetch=100
# RabbitMQ監聽器(Listener)的最小並發消費者數量,根據實際增減
spring.rabbitmq.listener.simple.concurrency=10
# RabbitMQ監聽器(Listener)的最大並發消費者數量,當消費速率足夠快時,用戶端會啟動max-concurrency個消費者進行消費。
spring.rabbitmq.listener.simple.max-concurrency=20以下其他配置您可以根據需要選擇添加:
步驟二:調用SDK收發訊息
訊息生產
在RabbitMQService中通過依賴注入的方式擷取RabbitTemplate,並調用其提供的send介面發送訊息。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String content) {
// 設定MessageId。
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// 建立Message。
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
/*
* 發送訊息send()介面;
* exchange:交換器名稱;
* routingKey:路由鍵;
* message:訊息內容;
* correlationData用於關聯發行者確認;
*/
rabbitTemplate.send(exchange, routingKey, message, null);
}
}訊息消費
使用@RabbitListener註解消費訊息:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class MessageListener {
/**
* 訊息接收;
* @param message訊息;
* @param channel通道;
* @throws IOException
*queues需要替換為您提前建立好的隊列的名稱;
*/
@RabbitListener(queues = "myQueue")
public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
// 進入訊息消費商務邏輯。
...
// 須在Ack有效期間(消費逾時時間)內返回Ack,否則為無效確認,訊息還會被重複投遞。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}RabbitListener常見可選配置如下: