全部产品
Search
文档中心

云消息队列 RabbitMQ 版:Spring集成

更新时间:Mar 15, 2024

云消息队列 RabbitMQ 版支持Spring框架的SDK,本文介绍如何集成Spring框架的SDK客户端收发消息。

前提条件

Demo工程示例

步骤一:引入Java依赖

创建Java工程,在pom.xml文件中添加如下依赖:

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤二:参数配置

修改application.properties配置文件,将参数值修改为您在前提条件中创建的资源信息。

# 实例的接入点,在云消息队列 RabbitMQ 版控制台的实例详情⻚面获取。 
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# 云消息队列RabbitMQ的连接端口,固定取值为5672。
spring.rabbitmq.port=5672
# 指定实例的静态用户名,在云消息队列 RabbitMQ 版控制台静态用户名密码管理⻚面查看。 
spring.rabbitmq.username=******
# 指定实例的静态用户密码,在云消息队列 RabbitMQ 版控制台静态用户名密码管理⻚面查看。 
spring.rabbitmq.password=******
# 虚拟主机,提供逻辑隔离,在云消息队列 RabbitMQ 版控制台Vhost列表⻚面查看。 
spring.rabbitmq.virtual-host=test_vhost
# 消费消息时,消息手动确认。
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 不创建AmqpAdmin bean。
spring.rabbitmq.dynamic=false    

以下其他配置您可以根据需要选择添加:

可选配置

属性

说明

spring.rabbitmq.addresses

指定客户端连接到的服务端地址,多个地址之间使用英文逗号(,)分隔。

spring.rabbitmq.hostspring.rabbitmq.addresses同时设置,则优先以spring.rabbitmq.addresses为准

spring.rabbitmq.dynamic

是否创建AmqpAdmin bean,默认值为true。

spring.rabbitmq.connection-timeout

连接超时时间,单位毫秒,0表示不超时。

spring.rabbitmq.requested-heartbeat

指定心跳超时时间,单位:秒,默认值为60。

spring.rabbitmq.publisher-confirms

是否启用发布确认机制。

spring.rabbitmq.publisher-returns

是否启用发布返回机制。

spring.rabbitmq.ssl.enabled

是否启用SSL证书认证。

spring.rabbitmq.ssl.key-store

持有SSL证书的key store路径。

spring.rabbitmq.ssl.key-store-password

访问key store的密码。

spring.rabbitmq.ssl.trust-store

持有SSL证书的可信地址。

spring.rabbitmq.ssl.trust-store-password

访问SSL的可信地址的密码。

spring.rabbitmq.ssl.algorithm

SSL使用的算法,例如:TLSv1.2。

spring.rabbitmq.ssl.validate-server-certificate

是否启用服务端证书验证。

spring.rabbitmq.ssl.verify-hostname

是否启用主机校验。

spring.rabbitmq.cache.channel.size

缓存中保持的channel数量。

spring.rabbitmq.cache.channel.checkout-timeout

当缓存数量被设置时,从缓存中获取一个channel的超时时间。

单位:毫秒,取值为0表示总是创建一个新channel。

spring.rabbitmq.cache.connection.size

缓存的连接数,只在CONNECTION模式下生效。

spring.rabbitmq.cache.connection.mode

连接缓存模式。取值如下:

  • CHANNEL

  • CONNECTION

推荐使用CONNECTION模式。

spring.rabbitmq.listener.type

Listener容器类型。取值如下:

  • simple

  • direct

默认值为simple。

spring.rabbitmq.listener.simple.auto-startup

是否应用启动时自动启动容器,默认为true。

spring.rabbitmq.listener.simple.acknowledge-mode

消息确认方式。取值如下:

  • none

  • manual

  • auto

默认值为auto。

spring.rabbitmq.listener.simple.concurrency

最小消费者的数量。

spring.rabbitmq.listener.simple.max-concurrency

最大消费者的数量。

spring.rabbitmq.listener.simple.prefetch

一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量。

spring.rabbitmq.listener.simple.transaction-size

事务处理的消息数量。

spring.rabbitmq.listener.simple.default-requeue-rejected

被拒绝的消息是否重新进入队列,默认值为true。

spring.rabbitmq.listener.simple.missing-queues-fatal

如果容器声明的队列在代理上不可用,是否失败;或者如果在运行时删除一个或多个队列,是否停止容器。默认值为true。

spring.rabbitmq.listener.simple.idle-event-interval

发布空闲容器事件的时间间隔,单位:毫秒。

spring.rabbitmq.listener.simple.retry.enabled

监听重试是否可用。

spring.rabbitmq.listener.simple.retry.max-attempts

消息发送的最大重试次数,默认值为3。

spring.rabbitmq.listener.simple.retry.max-interval

最大重试时间间隔,默认值为10000 ms。

spring.rabbitmq.listener.simple.retry.initial-interval

第一次和第二次尝试发送消息的时间间隔,默认值为1000 ms。

spring.rabbitmq.listener.simple.retry.multiplier

应用于上一重试间隔的乘数。

spring.rabbitmq.listener.simple.retry.stateless

重试时是否有状态,默认值为true。

spring.rabbitmq.template.mandatory

是否启用强制信息,默认值为false。

spring.rabbitmq.template.receive-timeout

receive()操作的超时时间。

spring.rabbitmq.template.reply-timeout

sendAndReceive()操作的超时时间。

spring.rabbitmq.template.retry.enabled

是否启用发送重试。

spring.rabbitmq.template.retry.max-attempts

最大重试次数,默认值为3。

spring.rabbitmq.template.retry.initial-interval

第一次和第二次尝试发送消息的时间间隔,默认值为1000 ms。

spring.rabbitmq.template.retry.multiplier

应用于上一重试间隔的乘数。

spring.rabbitmq.template.retry.max-interval

最大重试时间间隔,默认值为10000 ms。

步骤三:调用SDK收发消息

创建连接

推荐缓存模式:

客户端和服务端建立连接时,推荐将缓存模式设置为CONNECTION模式。

该模式下支持创建多个Connection,程序会缓存一定数量的Connection,每个Connection中缓存一定的Channel。

云消息队列 RabbitMQ 版是集群分布式架构,在CONNECTION模式下,创建多个connection可以帮助客户端更好地和集群的多个服务节点连接,更高效地发送和消费消息。

参数设置如下:

// 缓存模式设置为CONNECTION。
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// CONNECTION模式下,最大可缓存的connection数量。
connectionFactory.setConnectionCacheSize(10);
// CONNECTION模式下,最大可缓存的Channel数量。
connectionFactory.setChannelCacheSize(64);

完整的创建连接的代码示例如下:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean
    public ConnectionFactory connectionFactory() 
    {
        // 初始化RabbitMQ连接配置connectionFactory。
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        // VirtualHost可以在RabbitMQ控制台手动创建,也可以在这里自动创建。
        connectionFactory.setVirtualHost(virtualHost);
        // 请务必开启Connection自动重连功能,保证服务端发布时客户端可自动重新连接上服务端。
        connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
		    // 缓存模式推荐设置为CONNECTION。
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        // CONNECTION模式下,最大可缓存的connection数量。
        connectionFactory.setConnectionCacheSize(10);
        // CONNECTION模式下,最大可缓存的Channel数量。
        connectionFactory.setChannelCacheSize(64);
        
        return connectionFactory;
    }
    
    @Bean
    @ConditionalOnClass
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory)
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
    {
        // RabbitMQ消息模板,该模板封装了多种常用的消息操作。
        return new RabbitTemplate(connectionFactory);     
    }

}

消息生产

在RabbitMQService中通过依赖注入的方式获取RabbitTemplate,并调用其提供的send接口发送消息。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class RabbitMQService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String content) {
        // 设置MessageId。
        String msgId = UUID.randomUUID().toString();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(msgId);
        // 创建Message。
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        /*
         * 发送消息send()接口;
         * exchange:交换机名称;
         * routingKey:路由键;
         * message:消息内容;
         * correlationData用于关联发布者确认;
         */
        rabbitTemplate.send(exchange, routingKey, message, null);
    }
}

消息消费

使用@RabbitListener注解消费消息:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class MessageListener {

  	/**
     * 消息接收;
     * @param message消息;
     *queues需要替换为您提前创建好的队列的名称;
     */
    @RabbitListener(queues = "myQueue")
    public void receiveFromMyQueue(Message message, Channel channel) {
        // 进入消息消费业务逻辑。
        ...
        // 在1分钟内,返回ack,否则为无效确认,消息还会被重复投递。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}