ApsaraMQ for RabbitMQ: Send and receive messages by using SDKs integrated with the Spring framework

Last Updated: Mar 15, 2024

ApsaraMQ for RabbitMQ supports SDKs that are integrated with the Spring framework. This topic describes how to connect to an ApsaraMQ for RabbitMQ instance by using an SDK that is integrated with the Spring framework to send and receive messages.

Prerequisites

Demo projects

Step 1: Add the Java dependency

Create a Java project and add the following dependency to the pom.xml file:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Step 2: Configure parameters

In the application.properties file, set the following parameters to the values of the resources that you created in the "Prerequisites" section of this topic.

# The endpoint that is used to connect to the instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for RabbitMQ console.  
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# The port that is used to connect to ApsaraMQ for RabbitMQ. Set this parameter to 5672. 
spring.rabbitmq.port=5672
# The static username of the instance. You can obtain the static username and password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.  
spring.rabbitmq.username=******
# The static password of the instance. You can obtain the static username and password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.  
spring.rabbitmq.password=******
# The vhost of the instance. vhosts are logically isolated from each other. You can obtain the vhost name on the vhosts page in the ApsaraMQ for RabbitMQ console.  
spring.rabbitmq.virtual-host=test_vhost
# The mode that is used to acknowledge messages. In this example, the manual mode is used. 
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Specifies whether to create an AmqpAdmin bean. In this example, the AmqpAdmin bean is not created.
spring.rabbitmq.dynamic=false

The parameters listed in the following table are optional.

Optional parameters

Parameter

Description

spring.rabbitmq.addresses

The IP addresses of the brokers to which the client is connected. Separate multiple IP addresses with commas (,).

If you configure the spring.rabbitmq.host parameter and the spring.rabbitmq.addresses parameter at the same time, the value of the spring.rabbitmq.addresses parameter takes effect.

spring.rabbitmq.dynamic

Specifies whether to create an AmqpAdmin bean. Default value: true.

spring.rabbitmq.connection-timeout

The timeout period for establishing a connection. Unit: milliseconds. The value 0 specifies that the client waits indefinitely.

spring.rabbitmq.requested-heartbeat

The timeout period of heartbeats. Unit: seconds. Default value: 60.

spring.rabbitmq.publisher-confirms

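Specifies whether to enable the confirmation mechanism for message publishing. In Spring Boot 2.2 and later, this parameter is deprecated in favor of spring.rabbitmq.publisher-confirm-type.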

spring.rabbitmq.publisher-returns

Specify whether to enable the return mechanism for message publishing.

spring.rabbitmq.ssl.enabled

Specifies whether to enable Secure Sockets Layer (SSL) support for connections.

spring.rabbitmq.ssl.key-store

The path of the key store in which the SSL certificate is stored.

spring.rabbitmq.ssl.key-store-password

The password that is used to access the key store.

spring.rabbitmq.ssl.trust-store

The path of the trust store in which the trusted SSL certificates are stored.

spring.rabbitmq.ssl.trust-store-password

The password that is used to access the trust store.

spring.rabbitmq.ssl.algorithm

The algorithm that is used by SSL. Example: TLSv1.2.

spring.rabbitmq.ssl.validate-server-certificate

Specifies whether to validate the certificate that is presented by the broker.

spring.rabbitmq.ssl.verify-hostname

Specifies whether to enable hostname verification.

spring.rabbitmq.cache.channel.size

The number of channels in the cache.

spring.rabbitmq.cache.channel.checkout-timeout

The timeout period for obtaining a channel from the cache if the specified number of channels in the cache is reached.

Unit: milliseconds. The value 0 specifies that a new channel is always created.

spring.rabbitmq.cache.connection.size

The number of connections in the cache. This parameter takes effect only if you set the spring.rabbitmq.cache.connection.mode parameter to CONNECTION.

spring.rabbitmq.cache.connection.mode

The cache mode of connections. Valid values:

  • CHANNEL

  • CONNECTION

We recommend that you set this parameter to CONNECTION.

spring.rabbitmq.listener.type

The type of the listener container. Valid values:

  • simple

  • direct

Default value: simple.

spring.rabbitmq.listener.simple.auto-startup

Specify whether to automatically start the container when the application is started. Default value: true.

spring.rabbitmq.listener.simple.acknowledge-mode

The mode that is used to acknowledge messages. Valid values:

  • none

  • manual

  • auto

Default value: auto.

spring.rabbitmq.listener.simple.concurrency

The minimum number of consumers.

spring.rabbitmq.listener.simple.max-concurrency

The maximum number of consumers.

spring.rabbitmq.listener.simple.prefetch

The maximum number of unacknowledged messages that can be outstanding at each consumer. If transactions are used, the parameter value must be equal to or greater than the transaction size.

spring.rabbitmq.listener.simple.transaction-size

The number of messages processed by transactions.

spring.rabbitmq.listener.simple.default-requeue-rejected

Specify whether rejected messages are redelivered to the queue. Default value: true.

spring.rabbitmq.listener.simple.missing-queues-fatal

Specify whether to fail the container if the queues that are declared by the container are not available on the broker or whether to stop the container if one or more queues are deleted in the runtime. Default value: true.

spring.rabbitmq.listener.simple.idle-event-interval

The interval at which idle container events are published. Unit: milliseconds.

spring.rabbitmq.listener.simple.retry.enabled

Specify whether retries are enabled for the listener container.

spring.rabbitmq.listener.simple.retry.max-attempts

The maximum number of attempts to deliver a message to the listener. Default value: 3.

spring.rabbitmq.listener.simple.retry.max-interval

The maximum interval between two consecutive retries. Unit: milliseconds. Default value: 10000.

spring.rabbitmq.listener.simple.retry.initial-interval

The interval between the first and second attempts to deliver a message. Unit: milliseconds. Default value: 1000.

spring.rabbitmq.listener.simple.retry.multiplier

The multiplier that is applied to the previous retry interval.

spring.rabbitmq.listener.simple.retry.stateless

Specify whether retries are stateless. Default value: true.

spring.rabbitmq.template.mandatory

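Specifies whether messages are sent with the mandatory flag. If the flag is set and a message cannot be routed to a queue, the message is returned to the producer. Default value: false.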

spring.rabbitmq.template.receive-timeout

The timeout period for the receive() operation.

spring.rabbitmq.template.reply-timeout

The timeout period for the sendAndReceive() operation.

spring.rabbitmq.template.retry.enabled

Specify whether to enable retries for sending messages.

spring.rabbitmq.template.retry.max-attempts

The maximum number of attempts to send a message. Default value: 3.

spring.rabbitmq.template.retry.initial-interval

The interval between the first and second attempts to send a message. Unit: milliseconds. Default value: 1000.

spring.rabbitmq.template.retry.multiplier

The multiplier that is applied to the previous retry interval.

spring.rabbitmq.template.retry.max-interval

The maximum interval between two consecutive retries. Unit: milliseconds. Default value: 10000.
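
The retry parameters in the preceding table work together: each wait is the previous wait multiplied by the multiplier and capped at the maximum interval. The following is a minimal sketch of that arithmetic, not code from the SDK. RetrySchedule is a hypothetical helper class, the multiplier 2.0 is an illustrative value, and the sketch assumes that max-attempts counts the initial attempt, as in Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Spring): lists the waits between attempts. */
final class RetrySchedule {

    /**
     * Returns the wait, in milliseconds, before each retry.
     * Assumption: maxAttempts includes the first attempt, so there are maxAttempts - 1 waits.
     */
    static List<Long> intervals(long initialIntervalMs, double multiplier,
                                long maxIntervalMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double next = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Cap each wait at the configured maximum interval.
            waits.add(Math.min((long) next, maxIntervalMs));
            // Grow the next wait by the multiplier.
            next = next * multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // initial-interval=1000, multiplier=2.0 (illustrative), max-interval=10000
        System.out.println(intervals(1000, 2.0, 10000, 3));
        System.out.println(intervals(1000, 2.0, 10000, 6));
    }
}
```

With the default max-attempts value of 3, this schedule contains two waits. With six attempts, the last wait is capped at the max-interval value instead of growing to 16000 milliseconds.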

Step 3: Use an SDK to send and receive messages

Create connections

Recommended cache mode

When you establish a connection between a client and a broker, we recommend that you specify the cache mode as CONNECTION.

In this mode, you can create multiple connections. The program caches a specific number of connections and each connection caches a specific number of channels.

ApsaraMQ for RabbitMQ adopts a distributed cluster architecture. In CONNECTION mode, you can create multiple connections to connect clients to multiple service nodes in a cluster to efficiently send and consume messages.

The following code shows how to configure the cache mode:

// Specify the cache mode as CONNECTION. 
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// The maximum number of connections that can be cached in CONNECTION mode. 
connectionFactory.setConnectionCacheSize(10);
// The maximum number of channels that can be cached in CONNECTION mode. 
connectionFactory.setChannelCacheSize(64);

The following sample code provides an example of how to create connections:

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean
    public ConnectionFactory connectionFactory()
    {
        // Initialize ConnectionFactory.
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        // The vhost. You can manually create the vhost in the ApsaraMQ for RabbitMQ console or use the following code to automatically create the vhost.
        connectionFactory.setVirtualHost(virtualHost);
        // Enable automatic connection recovery. Make sure that this feature is enabled so that the client can reconnect to the broker after the connection is interrupted.
        connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
        // The cache mode. We recommend that you set this parameter to CONNECTION.
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        // The maximum number of connections that can be cached in CONNECTION mode.
        connectionFactory.setConnectionCacheSize(10);
        // The maximum number of channels that can be cached in CONNECTION mode.
        connectionFactory.setChannelCacheSize(64);

        return connectionFactory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory)
    {
        // The listener container factory that is used by @RabbitListener methods. Messages are acknowledged manually.
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
    {
        // The RabbitMQ message template. The template encapsulates various message operations.
        return new RabbitTemplate(connectionFactory);
    }

}

Produce messages

Inject the RabbitTemplate bean into the RabbitMQService class and call its send() method to send a message.

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class RabbitMQService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String content) {
        // Specify the message ID. 
        String msgId = UUID.randomUUID().toString();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(msgId);
        // Create the message. 
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        /*
         * Call the send() method to send the message.
         * exchange: the exchange name.
         * routingKey: the routing key.
         * message: the message to send.
         * correlationData: the data that is used to correlate publisher confirms with the message. Set to null in this example.
         */
        rabbitTemplate.send(exchange, routingKey, message, null);
    }
}

Consume messages

Use the @RabbitListener annotation to consume messages:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class MessageListener {

    /**
     * Receives a message from the queue.
     * Replace myQueue with the name of the queue that you created.
     *
     * @param message the message that is received
     * @param channel the channel on which the message was delivered; used to return the ACK
     * @throws IOException if the ACK cannot be sent
     */
    @RabbitListener(queues = "myQueue")
    public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
        // Add your message consumption logic here.
        // ...
        // The acknowledgement (ACK) for a message must be returned within one minute. Otherwise, the broker sends the message again for consumption.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
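
Because the broker can deliver a message again when the ACK is late, the consumption logic may see the same message more than once. The following is a minimal sketch of one way to skip such duplicates. It relies on the message ID that the producer sets and decodes the body with the same UTF-8 charset that the producer uses. DuplicateFilter is a hypothetical helper class and is not part of the SDK. In a listener, the arguments could come from message.getMessageProperties().getMessageId() and message.getBody().

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not part of the SDK): skips message IDs that were already processed. */
final class DuplicateFilter {

    // Thread-safe set of the message IDs seen so far. It is kept in memory only.
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();

    /**
     * Returns the decoded body the first time a message ID is seen,
     * or null if the ID was seen before (a redelivered duplicate).
     */
    String decodeIfNew(String messageId, byte[] body) {
        if (!seenIds.add(messageId)) {
            return null;
        }
        // Decode with the charset that the producer used to encode the content.
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        DuplicateFilter filter = new DuplicateFilter();
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(filter.decodeIfNew("id-1", body)); // first delivery
        System.out.println(filter.decodeIfNew("id-1", body)); // redelivery of the same ID
    }
}
```

A duplicate should still be acknowledged so that the broker stops redelivering it. The set in this sketch grows without bound and is lost on restart, so a production system would likely need an expiring or external store.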