全部产品
Search
文档中心

云消息队列 RabbitMQ 版:Spring集成

更新时间:Apr 26, 2024

云消息队列 RabbitMQ 版支持Spring框架的SDK,本文介绍如何集成Spring框架的SDK客户端收发消息。

前提条件

Demo工程示例

单击SpringBootDemo下载Demo工程示例。

步骤一:引入Java依赖

创建Java工程,在pom.xml文件中添加如下依赖:

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤二:参数配置

application.propertiesapplication.yml文件中填写配置参数,以application.properties为例。

# 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情⻚面获取。 
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# 云消息队列RabbitMQ连接端口。
spring.rabbitmq.port=5672
# 指定实例的静态用户名,在云消息队列 RabbitMQ 版控制台静态用户名密码管理⻚面查看。 
spring.rabbitmq.username=******
# 指定实例的静态用户密码,在云消息队列 RabbitMQ 版控制台静态用户名密码管理⻚面查看。 
spring.rabbitmq.password=******
# 虚拟主机,提供逻辑隔离,在云消息队列 RabbitMQ 版控制台Vhost列表⻚面查看。 
spring.rabbitmq.virtual-host=test_vhost
# 消息确认(Ack)模式。
# 1. none:消费者接收消息后,无论消费成功与否,服务端均认为消息已成功处理,即RabbitMQ中的autoAck模式。
# 2. auto:客户端在消费成功后主动回复ack(处理失败则回复nack或抛出异常),不需要客户端显式调用Channel.basicAck()。
# 3. manual: 手动回复Ack,需要客户端在消费成功后显式调用Channel.basicAck()主动回复。
spring.rabbitmq.listener.simple.acknowledge-mode=manual

# 一个消费者最多可处理的未被确认(Ack)消息数量(QoS),RabbitMQ服务端取min{prefetch, 100}作为QoS。
spring.rabbitmq.listener.simple.prefetch=10
# RabbitMQ监听器(Listener)的最小并发消费者数量。
spring.rabbitmq.listener.simple.concurrency=2
# RabbitMQ监听器(Listener)的最大并发消费者数量,当消费速率足够快时,客户端会启动max-concurrency个消费者进行消费。
spring.rabbitmq.listener.simple.max-concurrency=5

以下其他配置您可以根据需要选择添加:

可选配置

属性

说明

spring.rabbitmq.addresses

指定客户端连接到的服务端地址,多个地址之间使用英文逗号(,)分隔。

spring.rabbitmq.hostspring.rabbitmq.addresses同时设置,则优先以spring.rabbitmq.addresses为准

spring.rabbitmq.dynamic

是否创建AmqpAdmin bean,默认值为true。

spring.rabbitmq.connection-timeout

连接超时时间,单位毫秒,0表示不超时。

spring.rabbitmq.requested-heartbeat

指定心跳超时时间,单位:秒,默认值为60。

spring.rabbitmq.publisher-confirms

是否启用发布确认机制。

spring.rabbitmq.publisher-returns

是否启用发布返回机制。

spring.rabbitmq.ssl.enabled

是否启用SSL证书认证。

spring.rabbitmq.ssl.key-store

持有SSL证书的key store路径。

spring.rabbitmq.ssl.key-store-password

访问key store的密码。

spring.rabbitmq.ssl.trust-store

持有SSL证书的可信地址。

spring.rabbitmq.ssl.trust-store-password

访问SSL的可信地址的密码。

spring.rabbitmq.ssl.algorithm

SSL使用的算法,例如:TLSv1.2。

spring.rabbitmq.ssl.validate-server-certificate

是否启用服务端证书验证。

spring.rabbitmq.ssl.verify-hostname

是否启用主机校验。

spring.rabbitmq.cache.channel.size

缓存中保持的channel数量。

spring.rabbitmq.cache.channel.checkout-timeout

当缓存数量被设置时,从缓存中获取一个channel的超时时间。

单位:毫秒,取值为0表示总是创建一个新channel。

spring.rabbitmq.cache.connection.size

缓存的连接数,只在CONNECTION模式下生效。

spring.rabbitmq.cache.connection.mode

连接缓存模式。取值如下:

  • CHANNEL

  • CONNECTION

推荐使用CONNECTION模式。

spring.rabbitmq.listener.type

Listener容器类型。取值如下:

  • simple

  • direct

默认值为simple。

spring.rabbitmq.listener.simple.auto-startup

是否应用启动时自动启动容器,默认为true。

spring.rabbitmq.listener.simple.acknowledge-mode

消息确认方式。取值如下:

  • none:消费者接收消息后,无论消费成功与否,服务端均认为消息已成功处理,即RabbitMQ中的autoAck模式。

  • manual:手动回复Ack,需要客户端在消费成功后显式调用Basic.ack主动回复。

  • auto:客户端在消费成功后主动回复ack(处理失败则回复nack或抛出异常),不需要客户端显式调用Basic.ack。

默认值为auto。

spring.rabbitmq.listener.simple.concurrency

最小消费者的数量。

spring.rabbitmq.listener.simple.max-concurrency

最大消费者的数量。

spring.rabbitmq.listener.simple.prefetch

一个消费者最多可处理的未被确认(Ack)的消息数量(等同于客户端调用Basic.qos设置QoS值),如果有事务的话,必须大于等于transaction数量。

spring.rabbitmq.listener.simple.transaction-size

事务处理的消息数量。

spring.rabbitmq.listener.simple.default-requeue-rejected

被拒绝的消息是否重新进入队列,默认值为true。

spring.rabbitmq.listener.simple.missing-queues-fatal

如果容器声明的队列在代理上不可用,是否失败;或者如果在运行时删除一个或多个队列,是否停止容器。默认值为true。

spring.rabbitmq.listener.simple.idle-event-interval

发布空闲容器事件的时间间隔,单位:毫秒。

spring.rabbitmq.listener.simple.retry.enabled

监听重试是否可用。

spring.rabbitmq.listener.simple.retry.max-attempts

消息发送的最大重试次数,默认值为3。

spring.rabbitmq.listener.simple.retry.max-interval

最大重试时间间隔,默认值为10000 ms。

spring.rabbitmq.listener.simple.retry.initial-interval

第一次和第二次尝试发送消息的时间间隔,默认值为1000 ms。

spring.rabbitmq.listener.simple.retry.multiplier

应用于上一重试间隔的乘数。

spring.rabbitmq.listener.simple.retry.stateless

重试时是否有状态,默认值为true。

spring.rabbitmq.template.mandatory

是否启用强制信息,默认值为false。

spring.rabbitmq.template.receive-timeout

receive()操作的超时时间。

spring.rabbitmq.template.reply-timeout

sendAndReceive()操作的超时时间。

spring.rabbitmq.template.retry.enabled

是否启用发送重试。

spring.rabbitmq.template.retry.max-attempts

最大重试次数,默认值为3。

spring.rabbitmq.template.retry.initial-interval

第一次和第二次尝试发送消息的时间间隔,默认值为1000 ms。

spring.rabbitmq.template.retry.multiplier

应用于上一重试间隔的乘数。

spring.rabbitmq.template.retry.max-interval

最大重试时间间隔,默认值为10000 ms。

步骤三:调用SDK收发消息

创建连接

推荐缓存模式:

客户端和服务端建立连接时,推荐将缓存模式设置为CONNECTION模式。

该模式下支持创建多个Connection,程序会缓存一定数量的Connection,每个Connection中缓存一定的Channel。

云消息队列 RabbitMQ 版是集群分布式架构,在CONNECTION模式下,创建多个connection可以帮助客户端更好地和集群的多个服务节点连接,更高效地发送和消费消息。

参数设置如下:

// 缓存模式设置为CONNECTION。
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// CONNECTION模式下,最大可缓存的connection数量。
connectionFactory.setConnectionCacheSize(10);
// CONNECTION模式下,最大可缓存的Channel数量。
connectionFactory.setChannelCacheSize(64);

完整的创建连接的代码示例如下:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean
    public ConnectionFactory connectionFactory() 
    {
        // 初始化RabbitMQ连接配置connectionFactory。
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        // VirtualHost可以在RabbitMQ控制台手动创建,也可以在这里自动创建。
        connectionFactory.setVirtualHost(virtualHost);
        // 请务必开启Connection自动重连功能,保证服务端发布时客户端可自动重新连接上服务端。
        connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
		    // 缓存模式推荐设置为CONNECTION。
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        // CONNECTION模式下,最大可缓存的connection数量。
        connectionFactory.setConnectionCacheSize(10);
        // CONNECTION模式下,最大可缓存的Channel数量。
        connectionFactory.setChannelCacheSize(64);
        
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
    {
        // RabbitMQ消息模板,该模板封装了多种常用的消息操作。
        return new RabbitTemplate(connectionFactory);     
    }

}

消息生产

在RabbitMQService中通过依赖注入的方式获取RabbitTemplate,并调用其提供的send接口发送消息。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class RabbitMQService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String content) {
        // 设置MessageId。
        String msgId = UUID.randomUUID().toString();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(msgId);
        // 创建Message。
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        /*
         * 发送消息send()接口;
         * exchange:交换机名称;
         * routingKey:路由键;
         * message:消息内容;
         * correlationData用于关联发布者确认;
         */
        rabbitTemplate.send(exchange, routingKey, message, null);
    }
}

消息消费

使用@RabbitListener注解消费消息:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class MessageListener {

  	/**
     * 消息接收;
     * @param message消息;
     *queues需要替换为您提前创建好的队列的名称;
     */
    @RabbitListener(queues = "myQueue")
    public void receiveFromMyQueue(Message message, Channel channel) {
        // 进入消息消费业务逻辑。
        ...
        // 须在Ack有效期(消息重试间隔时间)内返回Ack,否则为无效确认,消息还会被重复投递。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

RabbitListener常见可选配置如下:

可选配置

属性

说明

ackMode

自定义消息确认模式,覆盖spring.rabbitmq.listener.simple.acknowledge-mode设置。

admin

引用AmqpAdmin用于AMQP资源管理。

autoStartup

是否在应用启动时自动启动容器,覆盖spring.rabbitmq.listener.simple.auto-startup设置。

bindings

配置队列与交换机绑定的数组,包含绑定信息。

concurrency

设置监听器容器(Listener)的并发线程数。

errorHandler

为监听方法抛出的异常配置错误处理器。

exclusive

启用队列的独占模式,指消费者独占队列,其他消费者无法从该队列接收消息,暂不支持。

queues

声明此监听器所监听的队列。

queuesToDeclare

指定要显式声明的队列。