ApsaraMQ for RabbitMQ provides an SDK for the Spring framework. This topic describes how to integrate the Spring SDK to send and receive messages.
Prerequisites
You have created resources, such as an instance, a vhost, an exchange, and a queue, in the ApsaraMQ for RabbitMQ console. For more information, see Step 2: Create resources.
Demo project
Click SpringBootDemo.zip to download the demo project.
Step 1: Configure parameters
In the application.properties or application.yml file, set the configuration parameters. The following example uses the application.properties file.
# The endpoint. Obtain the endpoint on the Instance Details page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# The port used to connect to ApsaraMQ for RabbitMQ.
spring.rabbitmq.port=5672
# The static username of the instance. View the username on the Static Accounts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.username=******
# The static password of the instance. View the password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.password=******
# The virtual host, which provides logical isolation. View the virtual host on the Vhosts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.virtual-host=test_vhost
# The message acknowledgment (Ack) mode.
# 1. none: After a consumer receives a message, the server considers the message successfully processed, regardless of whether the consumption is successful. This is the autoAck mode in RabbitMQ.
# 2. auto: The client automatically sends an ack after a message is successfully consumed. If the message fails to be processed, the client sends a nack or throws an exception. You do not need to explicitly call Channel.basicAck().
# 3. manual: Manually send an Ack. You must explicitly call Channel.basicAck() after a message is successfully consumed.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Set the cache mode to CONNECTION. ApsaraMQ for RabbitMQ uses a distributed architecture. In CONNECTION mode, clients can connect to multiple service nodes in the cluster in a more balanced way. This method can effectively prevent load hot spots and improve the efficiency of message sending and consumption.
spring.rabbitmq.cache.connection.mode=connection
# Adjust the value as needed.
spring.rabbitmq.cache.connection.size=50
# Adjust the value as needed.
spring.rabbitmq.cache.channel.size=1
# The maximum number of unacknowledged (Ack) messages that a consumer can process at a time (QoS). The ApsaraMQ for RabbitMQ server uses min{prefetch, 100} as the QoS value. If the consumer has low processing capabilities, decrease this value.
spring.rabbitmq.listener.simple.prefetch=100
# The minimum number of concurrent consumers for the RabbitMQ listener. Adjust the value as needed.
spring.rabbitmq.listener.simple.concurrency=10
# The maximum number of concurrent consumers for the RabbitMQ listener. When the consumption rate is high enough, the client starts max-concurrency consumers to consume messages.
spring.rabbitmq.listener.simple.max-concurrency=20You can add the following optional configurations as needed:
Step 2: Use the SDK to send and receive messages
Produce messages
In RabbitMQService, obtain RabbitTemplate through dependency injection and call its send method to send a message.
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String content) {
// Set the MessageId.
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// Create a Message.
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
/*
* Call the send() interface to send the message.
* exchange: the name of the exchange.
* routingKey: the routing key.
* message: the message content.
* correlationData is used for publisher confirms.
*/
rabbitTemplate.send(exchange, routingKey, message, null);
}
}Consume messages
Use the @RabbitListener annotation to consume messages:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class MessageListener {
/**
* Receive a message.
* @param message The message.
* @param channel The channel.
* @throws IOException
* Replace queues with the name of the queue that you created.
*/
@RabbitListener(queues = "myQueue")
public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
// Enter the business logic for message consumption.
...
// You must return an Ack within the Ack validity period (consumption timeout). Otherwise, the confirmation is invalid and the message is redelivered.
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}The following are common optional configurations for RabbitListener: