ApsaraMQ for RabbitMQ provides an SDK for the Spring framework. This topic describes how to integrate the Spring SDK to send and receive messages.
Prerequisites
- You have created resources, such as an instance, a vhost, an exchange, and a queue, in the ApsaraMQ for RabbitMQ console. For more information, see Step 2: Create resources.
Demo project
Click SpringBootDemo.zip to download the demo project.
Step 1: Configure parameters
In the application.properties or application.yml file, set the configuration parameters. The following example uses the application.properties file.
# The endpoint. Obtain the endpoint on the Instance Details page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# The port used to connect to ApsaraMQ for RabbitMQ.
spring.rabbitmq.port=5672
# The static username of the instance. View the username on the Static Accounts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.username=******
# The static password of the instance. View the password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.password=******
# The virtual host, which provides logical isolation. View the virtual host on the Vhosts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.virtual-host=test_vhost
# The message acknowledgment (ack) mode.
# 1. none: The server treats a message as successfully processed as soon as it is delivered to the consumer, whether or not consumption succeeds. This corresponds to the autoAck mode in RabbitMQ.
# 2. auto: The listener container sends an ack when the listener method returns normally and a nack when the method throws an exception. You do not need to call Channel.basicAck() explicitly.
# 3. manual: You must call Channel.basicAck() explicitly after a message is successfully consumed.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Set the cache mode to CONNECTION. ApsaraMQ for RabbitMQ uses a distributed architecture. In CONNECTION mode, clients can connect to multiple service nodes in the cluster in a more balanced way. This method can effectively prevent load hot spots and improve the efficiency of message sending and consumption.
spring.rabbitmq.cache.connection.mode=connection
# Adjust the value as needed.
spring.rabbitmq.cache.connection.size=50
# Adjust the value as needed.
spring.rabbitmq.cache.channel.size=1
# The maximum number of unacknowledged messages that a consumer can hold at a time (QoS). The ApsaraMQ for RabbitMQ server uses min{prefetch, 100} as the QoS value. If the consumer has low processing capacity, decrease this value.
spring.rabbitmq.listener.simple.prefetch=100
# The minimum number of concurrent consumers for the RabbitMQ listener. Adjust the value as needed.
spring.rabbitmq.listener.simple.concurrency=10
# The maximum number of concurrent consumers for the RabbitMQ listener. Under sustained load, the client scales up to at most max-concurrency consumers to consume messages.
spring.rabbitmq.listener.simple.max-concurrency=20
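The listener settings above interact: the server caps QoS at min{prefetch, 100}, and each concurrent consumer holds its own prefetch window. The following is a minimal plain-Java sketch for sanity-checking those values before deployment. The class RabbitSettingsCheck and its methods are hypothetical helpers written for this illustration; they are not part of Spring or the ApsaraMQ SDK. The upper bound it computes assumes one listener container in which every consumer applies the prefetch value on its own channel.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Spring or the ApsaraMQ SDK.
final class RabbitSettingsCheck {
    // Server-side QoS cap described in the prefetch comment: min{prefetch, 100}.
    static final int SERVER_QOS_CAP = 100;

    private RabbitSettingsCheck() {
    }

    /** Returns the QoS value the server applies for a configured prefetch. */
    static int effectivePrefetch(int prefetch) {
        return Math.min(prefetch, SERVER_QOS_CAP);
    }

    /** Parses text in application.properties format. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /**
     * Upper bound on unacknowledged messages held by one listener container:
     * max-concurrency consumers, each holding at most the effective prefetch.
     */
    static int maxUnacked(Properties props) {
        int prefetch = requireInt(props, "spring.rabbitmq.listener.simple.prefetch");
        int min = requireInt(props, "spring.rabbitmq.listener.simple.concurrency");
        int max = requireInt(props, "spring.rabbitmq.listener.simple.max-concurrency");
        if (min < 1 || max < min) {
            throw new IllegalArgumentException(
                    "Expected 1 <= concurrency <= max-concurrency, got " + min + " and " + max);
        }
        return max * effectivePrefetch(prefetch);
    }

    private static int requireInt(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return Integer.parseInt(value.trim());
    }
}
```

With the sample values above (prefetch 100, max-concurrency 20), maxUnacked returns 2000, which is the number of messages the application should be prepared to hold in memory at peak.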
You can add the following optional configurations as needed:
Step 2: Use the SDK to send and receive messages
Produce messages
In RabbitMQService, obtain RabbitTemplate through dependency injection and call its send method to send a message.
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String content) {
        // Set the MessageId.
        String msgId = UUID.randomUUID().toString();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(msgId);
        // Create a Message with a UTF-8 encoded body.
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        /*
         * Call the send() method to send the message.
         * exchange: the name of the exchange.
         * routingKey: the routing key.
         * message: the message to send.
         * correlationData: used for publisher confirms. null is passed here because this example does not use confirms.
         */
        rabbitTemplate.send(exchange, routingKey, message, null);
    }
}
Consume messages
Use the @RabbitListener annotation to consume messages:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class MessageListener {

    /**
     * Receive a message.
     * Replace the value of queues with the name of the queue that you created.
     *
     * @param message The message.
     * @param channel The channel.
     * @throws IOException if the ack cannot be sent.
     */
    @RabbitListener(queues = "myQueue")
    public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
        // Enter the business logic for message consumption.
        // ...
        // You must return an ack within the ack validity period (consumption timeout). Otherwise, the acknowledgment is invalid and the message is redelivered.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
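In manual acknowledge mode, the listener decides the outcome of every delivery. The listener above covers only the success path, so the following plain-Java sketch shows one possible shape for handling failure as well: ack on success, nack with requeue on failure. AckChannel is a hypothetical stand-in that mirrors the basicAck(long, boolean) and basicNack(long, boolean, boolean) signatures of com.rabbitmq.client.Channel so the example runs without a broker. ManualAckHandler and its businessLogic callback are also illustrative names. The sketch assumes your instance accepts basic.nack with requeue, as open source RabbitMQ does.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical stand-in for the two com.rabbitmq.client.Channel methods used below.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
}

// Illustrative handler: acks after successful processing, nacks with requeue on failure.
final class ManualAckHandler {
    private final Consumer<String> businessLogic;

    ManualAckHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Processes one delivery and returns true if it was acked, false if it was nacked. */
    boolean handle(byte[] body, long deliveryTag, AckChannel channel) throws IOException {
        try {
            businessLogic.accept(new String(body, StandardCharsets.UTF_8));
        } catch (RuntimeException e) {
            // Processing failed: reject only this delivery and ask the broker to requeue it.
            channel.basicNack(deliveryTag, false, true);
            return false;
        }
        // Processing succeeded: acknowledge only this delivery (multiple = false).
        channel.basicAck(deliveryTag, false);
        return true;
    }
}
```

In a real listener, the body of handle would sit inside the @RabbitListener method and receive the actual Channel. Requeueing on every failure can loop indefinitely on a message that never succeeds, so consider limiting retries or passing requeue = false for errors that cannot be recovered.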
The following are common optional configurations for RabbitListener: