This article describes how to use the Message Queue for RabbitMQ SDK for Java (mq-amqp-client) together with the open source RabbitMQ SDK for Java (amqp-client) to send and receive messages.

Background information

When you access Message Queue for RabbitMQ from a client, Message Queue for RabbitMQ authenticates the client based on its username and password. You can generate a username and password for your client by using one of the following methods:
  • Dynamic username and password: Use a permission authentication class provided by Alibaba Cloud to generate dynamic usernames and passwords.
  • Static username and password (recommended): Generate static usernames and passwords in the Message Queue for RabbitMQ console. This method is the same as that of open source RabbitMQ.
Notice: When you call an SDK to send or receive messages on a client, we recommend that you use persistent connections. This way, you do not need to create connections every time you send or receive messages. Frequent creation of connections consumes a large number of network and server resources and may even trigger protection against SYN flood attacks. For more information, see Connection.
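
The recommendation to reuse a connection can be sketched without any RabbitMQ classes. The following minimal example assumes a hypothetical holder class named SharedConnectionSketch that opens a resource on first use and returns the same instance afterwards. In a real client, the supplier would wrap the call that opens the connection; here a string stands in for the connection so that the sketch runs on its own.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical holder: lazily opens one shared resource and reuses it for every caller. */
final class SharedConnectionSketch<T> {
    // Assumption: in a real client this supplier would open the RabbitMQ connection.
    private final Supplier<T> opener;
    private T instance;

    SharedConnectionSketch(Supplier<T> opener) {
        this.opener = opener;
    }

    /** Opens the resource on first use, then keeps returning that same instance. */
    synchronized T get() {
        if (instance == null) {
            instance = opener.get();
        }
        return instance;
    }

    public static void main(String[] args) {
        AtomicInteger opened = new AtomicInteger();
        // A string stands in for an expensive connection handshake.
        SharedConnectionSketch<String> shared =
                new SharedConnectionSketch<>(() -> "connection-" + opened.incrementAndGet());
        for (int i = 0; i < 100; i++) {
            shared.get(); // every send reuses the connection that was opened first
        }
        System.out.println("connections opened: " + opened.get());
    }
}
```

With this pattern, 100 sends trigger a single open call instead of 100.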

This article takes Message Queue for RabbitMQ SDK for Java as an example. For information about SDKs for other programming languages or frameworks, see Overview.

Process to send and receive messages

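The process consists of the following steps, each of which is described in a section of this article: obtain endpoints, add Java dependencies, generate a username and its password, add a binding, publish a message, and consume a message.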

Obtain endpoints

You must obtain the endpoint of your instance in the Message Queue for RabbitMQ console. When you send or receive messages, you must configure the endpoint on the producer or consumer client. Otherwise, you cannot access the Message Queue for RabbitMQ instance.

  1. Log on to the Message Queue for RabbitMQ console.
  2. In the top navigation bar, select the region where your instance resides.
  3. In the left-side navigation pane, click Instances.
  4. On the Instances page, select your instance. In the Basic Information section, find the endpoint of the required network type and click the endpoint to copy it.
    The following endpoint types are available:
    • Public endpoint: You can access an instance from the Internet to read and write data. By default, pay-as-you-go instances have public endpoints. To use a public endpoint for a subscription instance, you must configure a public endpoint when you create the subscription instance. Example: XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    • VPC endpoint: You can access an instance from a VPC to read and write data. By default, both pay-as-you-go and subscription instances have VPC endpoints. Example: XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
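
As a quick sanity check before you configure a client, the endpoint type and port can be derived in code. The following sketch is not part of any SDK. The class name EndpointSketch and both method names are hypothetical, and the VPC check assumes that VPC endpoints always end in "-internal.aliyuncs.com", which is inferred only from the two examples above. The port numbers are the ones named in the code comments of this article.

```java
import java.util.Locale;

/** Hypothetical helper for checking an endpoint before configuring the client. */
final class EndpointSketch {
    /** Port 5672 is used for non-encrypted connections and port 5671 for encrypted connections. */
    static int portFor(boolean encrypted) {
        return encrypted ? 5671 : 5672;
    }

    /**
     * Guesses whether a host is a VPC endpoint.
     * Assumption: VPC endpoints end in "-internal.aliyuncs.com", as in the examples above.
     */
    static boolean looksLikeVpcEndpoint(String host) {
        return host != null
                && host.trim().toLowerCase(Locale.ROOT).endsWith("-internal.aliyuncs.com");
    }

    public static void main(String[] args) {
        String host = "XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com";
        System.out.println("VPC endpoint: " + looksLikeVpcEndpoint(host));
        System.out.println("Port without encryption: " + portFor(false));
    }
}
```

A client that runs outside the VPC cannot reach a VPC endpoint, so a check like this can turn a connection timeout into an early, readable error.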

Add Java dependencies

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>

Generate a username and its password

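Create the AliyunCredentialsProvider.java file, which generates a dynamic username and its password. The class imports StringUtils from Apache Commons Lang 3 (org.apache.commons.lang3), so that library must also be on the classpath.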

import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * The AliyunCredentialsProvider class that is used to generate a dynamic username and its password. 
 */
public class AliyunCredentialsProvider implements CredentialsProvider {
    /**
     * Access Key ID.
     */
    private final String accessKeyId;
    /**
     * Access Key Secret.
     */
    private final String accessKeySecret;
    /**
     * Temporary security token (optional).
     */
    private final String securityToken;
    /**
     * The ID of the instance. You can obtain the instance ID from the Message Queue for RabbitMQ console. 
     */
    private final String instanceId;
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String instanceId) {
        this(accessKeyId, accessKeySecret, null, instanceId);
    }
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String securityToken, final String instanceId) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.securityToken = securityToken;
        this.instanceId = instanceId;
    }
    @Override
    public String getUsername() {
        if(StringUtils.isNotEmpty(securityToken)) {
            return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
        } else {
            return UserUtils.getUserName(accessKeyId, instanceId);
        }
    }

    @Override
    public String getPassword() {
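        try {
            return UserUtils.getPassord(accessKeySecret);
        } catch (InvalidKeyException | NoSuchAlgorithmException e) {
            // Fail fast. Returning null here would only surface later as an opaque authentication failure.
            throw new IllegalStateException("Failed to generate the dynamic password", e);
        }
    }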
}
For information about how to create a static username and its password, see Create a static username and its password.
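
Whichever method you use, the samples in this article show credentials as placeholders in source code. One common alternative is to read them from environment variables. The following is a minimal sketch under stated assumptions: the class name CredentialsFromEnvSketch, the method name require, and the variable names MQ_ACCESS_KEY_ID and MQ_ACCESS_KEY_SECRET are all hypothetical and are not defined by Message Queue for RabbitMQ.

```java
import java.util.Map;

/** Hypothetical helper that reads credentials from the environment instead of source code. */
final class CredentialsFromEnvSketch {
    /** Returns the trimmed value of the named variable, or fails if it is missing or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        try {
            // Assumed variable names. Choose names that fit your deployment.
            String accessKeyId = require(env, "MQ_ACCESS_KEY_ID");
            String accessKeySecret = require(env, "MQ_ACCESS_KEY_SECRET");
            // Never print the secret itself. Only confirm that both values are present.
            System.out.println("Loaded credentials for AccessKey ID of length " + accessKeyId.length()
                    + " and a secret of length " + accessKeySecret.length());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The two values can then be passed to the AliyunCredentialsProvider constructor, or to setUsername and setPassword if you use a static username and password.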

Add a binding

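Create, compile, and run the BindingKeyTest.java file. Two versions of the file follow: the first uses a dynamic username and password, and the second uses a static username and password.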

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

 public class BindingKeyTest {
     public static void main(String[] args) throws IOException, TimeoutException {
         ConnectionFactory factory = new ConnectionFactory();
         // Set the endpoint that is used to connect to the Message Queue for RabbitMQ instance. You can view the endpoint on the Instances page in the Message Queue for RabbitMQ console. 
         factory.setHost("xxx.xxx.aliyuncs.com");
         // ${instanceId}: the ID of the Message Queue for RabbitMQ instance. You can view the instance ID on the Instances page in the Message Queue for RabbitMQ console. 
         // ${AccessKey}: the AccessKey ID for identity authentication. The AccessKey ID is generated in the Alibaba Cloud Management Console. 
         // ${SecretKey}: the AccessKey secret for identity authentication. The AccessKey secret is generated in the Alibaba Cloud Management Console. 
         factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}"));
         // This is required for automatic recovery. 
         factory.setAutomaticRecoveryEnabled(true);
         factory.setNetworkRecoveryInterval(5000);
         // Set the name of the vhost. Make sure that a vhost is created in the Message Queue for RabbitMQ console. 
         factory.setVirtualHost("${VhostName}");
         // Set the default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
         factory.setPort(5672);
         factory.setConnectionTimeout(300 * 1000);
         factory.setHandshakeTimeout(300 * 1000);
         factory.setShutdownTimeout(0);
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
         // Specify the exchange. 
         // You can create an exchange in the Message Queue for RabbitMQ console or by calling the Message Queue for RabbitMQ API. If the exchange already exists in the console, specify the exchange type based on the settings in the console. 
         channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true);
         // Specify the queue. You can create a queue in the Message Queue for RabbitMQ console or by calling the Message Queue for RabbitMQ API. 
         channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
         // Create a binding between the queue and the exchange. Specify a binding key. 
         channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKeyTest}");
         connection.close();
     }
 }
 
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

 public class BindingKeyTest {
     public static void main(String[] args) throws IOException, TimeoutException {
         ConnectionFactory factory = new ConnectionFactory();
         // Set the endpoint that is used to connect to the Message Queue for RabbitMQ instance. You can view the endpoint on the Instances page in the Message Queue for RabbitMQ console. 
         factory.setHost("xxx.xxx.aliyuncs.com");
         // The static username. You can view the username on the Username/Password Management page in the Message Queue for RabbitMQ console. 
         factory.setUsername("${Username}");
         // The static password. You can view the password on the Username/Password Management page in the Message Queue for RabbitMQ console. 
         factory.setPassword("${Password}");
         // This is required for automatic recovery. 
         factory.setAutomaticRecoveryEnabled(true);
         factory.setNetworkRecoveryInterval(5000);
         // Set the name of the vhost. Make sure that a vhost is created in the Message Queue for RabbitMQ console. 
         factory.setVirtualHost("${VhostName}");
         // Set the default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
         factory.setPort(5672);
         factory.setConnectionTimeout(300 * 1000);
         factory.setHandshakeTimeout(300 * 1000);
         factory.setShutdownTimeout(0);
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
         // Specify the exchange. 
         // You can create an exchange in the Message Queue for RabbitMQ console or by calling the Message Queue for RabbitMQ API. If the exchange already exists in the console, specify the exchange type based on the settings in the console. 
         channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true);
         // Specify the queue. You can create a queue in the Message Queue for RabbitMQ console or by calling the Message Queue for RabbitMQ API. 
         channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
         // Create a binding between the queue and the exchange. Specify a binding key. 
         channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKeyTest}");
         connection.close();
     }
 }
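
A binding key only has an effect when the routing key of a published message matches it. The following sketch illustrates the matching rules of direct and topic exchanges as defined by the AMQP 0-9-1 model. It assumes that Message Queue for RabbitMQ follows these standard rules, and the class name RoutingKeyMatchSketch and its methods are hypothetical illustrations, not part of any SDK.

```java
/** Hypothetical illustration of how exchanges compare a routing key with a binding key. */
final class RoutingKeyMatchSketch {
    /** Direct exchange: the routing key must equal the binding key exactly. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** Topic exchange: "*" matches exactly one dot-separated word, "#" matches zero or more words. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // the pattern is used up, so all words must be used up too
        }
        if (pattern[p].equals("#")) {
            // Let "#" absorb 0, 1, 2, ... of the remaining words.
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // the pattern still expects a word, but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(directMatches("BindingKeyTest", "BindingKeyTest"));
        System.out.println(directMatches("BindingKeyTest", "BindingKey"));
        System.out.println(topicMatches("order.*", "order.created"));
        System.out.println(topicMatches("order.#", "order.created.eu"));
    }
}
```

For a direct exchange this means that the routing key used by the producer must be identical to the binding key specified in queueBind. Otherwise, the message is not routed to the queue.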
 

Publish a message

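Create, compile, and run the ProducerTest.java file. Two versions of the file follow: the first uses a dynamic username and password, and the second uses a static username and password.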

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Set the endpoint that is used to connect to the Message Queue for RabbitMQ instance. You can view the endpoint on the Instances page in the Message Queue for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // ${instanceId}: the ID of the Message Queue for RabbitMQ instance. You can view the instance ID on the Instances page in the Message Queue for RabbitMQ console. 
        // ${AccessKey}: the AccessKey ID for identity authentication. The AccessKey ID is generated in the Alibaba Cloud Management Console. 
        // ${SecretKey}: the AccessKey secret for identity authentication. The AccessKey secret is generated in the Alibaba Cloud Management Console. 
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}"));
        // This is required for automatic recovery. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Set the name of the vhost. Make sure that a vhost is created in the Message Queue for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Set the default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
        factory.setPort(5672);
        // Set a timeout period based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Send a message. 
        for (int i = 0; i < 100; i++) {
            // Specify the exchange. Make sure that the exchange exists in the Message Queue for RabbitMQ console. 
            // Specify the routing key based on your business requirements. For a direct exchange, the routing key must equal the binding key of the target queue. Otherwise, the message is not routed to that queue. 
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("Message body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Set the endpoint that is used to connect to the Message Queue for RabbitMQ instance. You can view the endpoint on the Instances page in the Message Queue for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // The static username. You can view the username on the Username/Password Management page in the Message Queue for RabbitMQ console. 
        factory.setUsername("${Username}");
        // The static password. You can view the password on the Username/Password Management page in the Message Queue for RabbitMQ console. 
        factory.setPassword("${Password}");
        // This is required for automatic recovery. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Set the name of the vhost. Make sure that a vhost is created in the Message Queue for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Set the default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
        factory.setPort(5672);
        // Set a timeout period based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Send a message. 
        for (int i = 0; i < 100; i++) {
            // Specify the exchange. Make sure that the exchange exists in the Message Queue for RabbitMQ console. 
            // Specify the routing key based on your business requirements. For a direct exchange, the routing key must equal the binding key of the target queue. Otherwise, the message is not routed to that queue. 
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("Message body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

Consume a message

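Create, compile, and run the ConsumerTest.java file. Two versions of the file follow: the first uses a dynamic username and password, and the second uses a static username and password.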

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Set the endpoint that is used to connect to the Message Queue for RabbitMQ instance. You can view the endpoint on the Instances page in the Message Queue for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // ${instanceId}: the ID of the Message Queue for RabbitMQ instance. You can view the instance ID on the Instances page in the Message Queue for RabbitMQ console. 
        // ${AccessKey}: the AccessKey ID for identity authentication. The AccessKey ID is generated in the Alibaba Cloud Management Console. 
        // ${SecretKey}: the AccessKey secret for identity authentication. The AccessKey secret is generated in the Alibaba Cloud Management Console. 
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}"));
        // This is required for automatic recovery. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Set the name of the vhost. Make sure that a vhost is created in the Message Queue for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Set the default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Specify the exchange. Make sure that the exchange exists in the Message Queue for RabbitMQ console. Specify the exchange type based on the settings in the console. 
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // Specify the queue. Make sure that the queue exists in the Message Queue for RabbitMQ console. 
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Create a binding between the queue and the exchange. Specify a binding key. 
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // Consume a message. 
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // Process the received message based on the business logic. 
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
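        // Do not close the connection here. basicConsume returns immediately and messages are delivered asynchronously, so closing the connection now would stop consumption before any message is processed. Close the connection when your application shuts down. 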
    }
}

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Set the endpoint that is used to connect to the Message Queue for RabbitMQ instance. You can view the endpoint on the Instances page in the Message Queue for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // The static username. You can view the username on the Username/Password Management page in the Message Queue for RabbitMQ console. 
        factory.setUsername("${Username}");
        // The static password. You can view the password on the Username/Password Management page in the Message Queue for RabbitMQ console. 
        factory.setPassword("${Password}");
        // This is required for automatic recovery. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Set the name of the vhost. Make sure that a vhost is created in the Message Queue for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Set the default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Specify the exchange. Make sure that the exchange exists in the Message Queue for RabbitMQ console. Specify the exchange type based on the settings in the console. 
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // Specify the queue. Make sure that the queue exists in the Message Queue for RabbitMQ console. 
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Create a binding between the queue and the exchange. Specify a binding key. 
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // Consume a message. 
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // Process the received message based on the business logic. 
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
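        // Do not close the connection here. basicConsume returns immediately and messages are delivered asynchronously, so closing the connection now would stop consumption before any message is processed. Close the connection when your application shuts down. 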
    }
}
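
In the consumer, basicConsume only registers the callback and returns immediately. Messages are delivered later on a separate thread, so the thread that started the consumer must wait before it releases any resources. The following sketch shows one way to wait, by using a CountDownLatch. It uses only the Java standard library: a single-thread executor stands in for the delivery thread of the client, and the class name ConsumerLifetimeSketch and the method name receiveAll are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of waiting for asynchronous deliveries before shutting down. */
final class ConsumerLifetimeSketch {
    /**
     * Hands the messages to a background thread, which stands in for the delivery thread of the
     * client, and blocks the caller until all of them are handled or the timeout expires.
     */
    static List<String> receiveAll(List<String> messages, long timeoutMillis) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        try {
            // Submitting returns immediately, in the same way that basicConsume does.
            for (String message : messages) {
                deliveryThread.submit(() -> {
                    received.add(message); // the business logic would run here
                    done.countDown();
                });
            }
            // Without this wait, the caller would release resources before the deliveries happen.
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            deliveryThread.shutdown(); // stands in for closing the connection at shutdown
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> received = receiveAll(Arrays.asList("Message body0", "Message body1"), 5000);
        System.out.println("Received " + received.size() + " messages");
    }
}
```

A long-running consumer usually waits for a shutdown signal instead of a fixed message count, but the ordering is the same: wait first, then close the connection.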