ApsaraMQ for RabbitMQ supports SDKs that are integrated with the Spring framework. This topic describes how to connect to an ApsaraMQ for RabbitMQ instance by using an SDK that is integrated with the Spring framework to send and receive messages.
Prerequisites
Resources are created in the ApsaraMQ for RabbitMQ console. The resources include an instance, a vhost, an exchange, and a queue. For more information, see Step 2: Create resources.
IntelliJ IDEA is installed. For more information, see IntelliJ IDEA.
JDK 1.8 or later is installed. For more information, see Java Downloads.
Maven 2.5 or later is installed. For more information, see Downloading Apache Maven.
Demo projects
Step 1: Add the Java dependency
Create a Java project and add the following dependency to the pom.xml file:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Step 2: Configure parameters
Modify the application.properties file and replace the parameter values with the details of the resources that you created in the "Prerequisites" section of this topic.
# The endpoint that is used to connect to the instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# The port that is used to connect to ApsaraMQ for RabbitMQ. Set this parameter to 5672.
spring.rabbitmq.port=5672
# The static username of the instance. You can obtain the static username and password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.username=******
# The static password of the instance. You can obtain the static username and password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.password=******
# The vhost of the instance. vhosts are logically isolated from each other. You can obtain the vhost name on the vhosts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.virtual-host=test_vhost
# The mode that is used to acknowledge messages. In this example, the manual mode is used.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Specifies whether to create an AmqpAdmin bean. In this example, the AmqpAdmin bean is not created.
spring.rabbitmq.dynamic=false
The parameters listed in the following table are optional.
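A missing or blank connection setting usually shows up only as a connection failure at startup, so a quick pre-flight check can help. The following is a minimal sketch that uses only the JDK. The class name RabbitPropertiesCheck and the method missingKeys are hypothetical helpers, not part of Spring or ApsaraMQ for RabbitMQ, and the list of required keys simply mirrors the connection settings shown in this step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Spring or ApsaraMQ for RabbitMQ):
// reports which of the connection settings from this step are missing or blank.
final class RabbitPropertiesCheck {

    // Keys taken from the application.properties example in this topic.
    static final List<String> REQUIRED_KEYS = Collections.unmodifiableList(Arrays.asList(
            "spring.rabbitmq.host",
            "spring.rabbitmq.port",
            "spring.rabbitmq.username",
            "spring.rabbitmq.password",
            "spring.rabbitmq.virtual-host"));

    private RabbitPropertiesCheck() {
    }

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Properties properties) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = properties.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

One way to use it is to load application.properties into a java.util.Properties object with Properties.load() and stop with a clear error message if missingKeys returns a non-empty list.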
Step 3: Use an SDK to send and receive messages
Create connections
Recommended cache mode
When you establish a connection between a client and a broker, we recommend that you specify the cache mode as CONNECTION.
In this mode, you can create multiple connections. The program caches a specific number of connections and each connection caches a specific number of channels.
ApsaraMQ for RabbitMQ adopts a distributed cluster architecture. In CONNECTION mode, you can create multiple connections to connect clients to multiple service nodes in a cluster to efficiently send and consume messages.
The following code shows how to configure the cache mode:
// Specify the cache mode as CONNECTION.
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// The maximum number of connections that can be cached in CONNECTION mode.
connectionFactory.setConnectionCacheSize(10);
// The maximum number of channels that can be cached in CONNECTION mode.
connectionFactory.setChannelCacheSize(64);
The following sample code shows how to create connections:
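import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;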
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Bean
public ConnectionFactory connectionFactory()
{
// Initialize ConnectionFactory.
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// The vhost. You can manually create the vhost in the ApsaraMQ for RabbitMQ console or use the following code to automatically create the vhost.
connectionFactory.setVirtualHost(virtualHost);
// Specify whether to enable the automatic reconnection feature. Make sure that this feature is enabled so that the client can reconnect to the broker after a connection is interrupted.
connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
// The cache mode. We recommend that you set this parameter to CONNECTION.
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// The maximum number of connections that can be cached in CONNECTION mode.
connectionFactory.setConnectionCacheSize(10);
// The maximum number of channels that can be cached in CONNECTION mode.
connectionFactory.setChannelCacheSize(64);
return connectionFactory;
}
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory)
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
{
// The RabbitMQ message template. The template encapsulates various message operations.
return new RabbitTemplate(connectionFactory);
}
}
Produce messages
Inject the RabbitTemplate message template into the RabbitMQService class and call its send() method to send a message.
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String content) {
// Specify the message ID.
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// Create the message.
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
/*
* Call the send() method to send the message.
* exchange: the exchange name.
* routingKey: the routing key.
* message: the message to send.
* correlationData: data used to correlate publisher confirms; null in this example.
*/
rabbitTemplate.send(exchange, routingKey, message, null);
}
}
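The producer above encodes the message content as UTF-8 bytes, so the consumer needs to decode the body with the same charset. The following is a minimal sketch that uses only the JDK. The class MessageBodyCodec and its methods are hypothetical names introduced for illustration, not part of Spring AMQP.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: keeps producer-side encoding and consumer-side
// decoding on the same charset (UTF-8, as in sendMessage above).
final class MessageBodyCodec {

    private MessageBodyCodec() {
    }

    // Producer side: the bytes passed to the Message constructor.
    static byte[] encode(String content) {
        return content.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: apply to the bytes returned by message.getBody().
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Keeping both directions in one place avoids relying on the platform default charset, which can differ between the producer host and the consumer host.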
Consume messages
Use the @RabbitListener annotation to consume messages:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MessageListener {
/**
* Receive a message.
* @param message the message that is received.
* @param channel the channel on which the message was delivered; used to return the ACK.
* Replace the value of queues with the name of the queue that you created.
*/
@RabbitListener(queues = "myQueue")
public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
// Use the logic for message consumption.
...
// The acknowledgement (ACK) for a message must be returned within one minute. Otherwise, the broker sends the message again for consumption.
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
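Because the broker sends a message again when its ACK does not arrive in time, a listener may receive the same message more than once. One common way to handle this is to deduplicate on the message ID that the producer sets. The following is a minimal in-memory sketch that uses only the JDK. The class ProcessedMessageTracker and its method markIfNew are hypothetical names, and a real deployment would likely need a bounded or shared store (for example, a database or cache) instead of an unbounded in-memory set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers message IDs that were already handled so a
// redelivered message can be acknowledged without running the logic again.
final class ProcessedMessageTracker {

    // Thread-safe set, because listener threads may call this concurrently.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time an ID is seen and false for every repeat.
    boolean markIfNew(String messageId) {
        if (messageId == null) {
            // Without an ID there is nothing to deduplicate on; treat as new.
            return true;
        }
        return seen.add(messageId);
    }
}
```

Inside receiveFromMyQueue, the listener could call markIfNew(message.getMessageProperties().getMessageId()), run the consumption logic only when the result is true, and call basicAck in both cases.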