云消息队列 RabbitMQ 版支持Spring框架的SDK,本文介绍如何集成Spring框架的SDK客户端收发消息。
前提条件
您已在云消息队列 RabbitMQ 版控制台创建实例、Vhost、Exchange、Queue等相关资源,具体操作,请参见步骤二:创建资源。
Demo工程示例
步骤一:引入Java依赖
创建Java工程,在pom.xml
文件中添加如下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步骤二:参数配置
修改application.properties
配置文件,将参数值修改为您在前提条件中创建的资源信息。
# 实例的接入点,在云消息队列 RabbitMQ 版控制台的实例详情⻚面获取。
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# 云消息队列RabbitMQ的连接端口,固定取值为5672。
spring.rabbitmq.port=5672
# 指定实例的静态用户名,在云消息队列 RabbitMQ 版控制台静态用户名密码管理⻚面查看。
spring.rabbitmq.username=******
# 指定实例的静态用户密码,在云消息队列 RabbitMQ 版控制台静态用户名密码管理⻚面查看。
spring.rabbitmq.password=******
# 虚拟主机,提供逻辑隔离,在云消息队列 RabbitMQ 版控制台Vhost列表⻚面查看。
spring.rabbitmq.virtual-host=test_vhost
# 消费消息时,消息手动确认。
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 不创建AmqpAdmin bean。
spring.rabbitmq.dynamic=false
以下其他配置您可以根据需要选择添加:
步骤三:调用SDK收发消息
创建连接
推荐缓存模式:
客户端和服务端建立连接时,推荐将缓存模式设置为CONNECTION模式。
该模式下支持创建多个Connection,程序会缓存一定数量的Connection,每个Connection中缓存一定的Channel。
云消息队列 RabbitMQ 版是集群分布式架构,在CONNECTION模式下,创建多个connection可以帮助客户端更好地和集群的多个服务节点连接,更高效地发送和消费消息。
参数设置如下:
// 缓存模式设置为CONNECTION。
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// CONNECTION模式下,最大可缓存的connection数量。
connectionFactory.setConnectionCacheSize(10);
// CONNECTION模式下,最大可缓存的Channel数量。
connectionFactory.setChannelCacheSize(64);
完整的创建连接的代码示例如下:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Bean
public ConnectionFactory connectionFactory()
{
// 初始化RabbitMQ连接配置connectionFactory。
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// VirtualHost可以在RabbitMQ控制台手动创建,也可以在这里自动创建。
connectionFactory.setVirtualHost(virtualHost);
// 请务必开启Connection自动重连功能,保证服务端发布时客户端可自动重新连接上服务端。
connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
// 缓存模式推荐设置为CONNECTION。
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// CONNECTION模式下,最大可缓存的connection数量。
connectionFactory.setConnectionCacheSize(10);
// CONNECTION模式下,最大可缓存的Channel数量。
connectionFactory.setChannelCacheSize(64);
return connectionFactory;
}
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory)
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
{
// RabbitMQ消息模板,该模板封装了多种常用的消息操作。
return new RabbitTemplate(connectionFactory);
}
}
消息生产
在RabbitMQService中通过依赖注入的方式获取RabbitTemplate,并调用其提供的send接口发送消息。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String content) {
// 设置MessageId。
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// 创建Message。
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
/*
* 发送消息send()接口;
* exchange:交换机名称;
* routingKey:路由键;
* message:消息内容;
* correlationData用于关联发布者确认;
*/
rabbitTemplate.send(exchange, routingKey, message, null);
}
}
消息消费
使用@RabbitListener
注解消费消息:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class MessageListener {
/**
* 消息接收;
* @param message消息;
*queues需要替换为您提前创建好的队列的名称;
*/
@RabbitListener(queues = "myQueue")
public void receiveFromMyQueue(Message message, Channel channel) {
// 进入消息消费业务逻辑。
...
// 在1分钟内,返回ack,否则为无效确认,消息还会被重复投递。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}