This topic describes how to use an open source RabbitMQ SDK to send and receive messages when a Resource Access Management (RAM) role is used to grant permissions across Alibaba Cloud accounts. It walks through the complete process of sending and receiving messages. SDK for Java is used as an example. The process for open source RabbitMQ SDKs for other programming languages or frameworks is similar to that described in this topic.

Background information

If you need to use the temporary Security Token Service (STS) token generated for a RAM role to access Message Queue for RabbitMQ, you must set the AccessKeyID, AccessKeySecret, and SecurityToken parameters in the AliyunCredentialsProvider class for permission verification.
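
Because an STS token is temporary, a client usually needs to track when the credentials it holds must be replaced by a new AssumeRole call. The following is a minimal sketch that uses only the JDK. The class name `StsCredentials`, its fields, and the refresh margin are hypothetical and are not part of any Alibaba Cloud SDK; the sketch only assumes that the expiration time returned with the credentials is available as an `Instant`.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical holder for the values returned by an AssumeRole call. */
final class StsCredentials {
    final String accessKeyId;
    final String accessKeySecret;
    final String securityToken;
    final Instant expiration;

    StsCredentials(String accessKeyId, String accessKeySecret,
                   String securityToken, Instant expiration) {
        this.accessKeyId = requireText(accessKeyId, "accessKeyId");
        this.accessKeySecret = requireText(accessKeySecret, "accessKeySecret");
        this.securityToken = requireText(securityToken, "securityToken");
        if (expiration == null) {
            throw new IllegalArgumentException("expiration must not be null");
        }
        this.expiration = expiration;
    }

    /** Returns true if the credentials expire at or before now + margin. */
    boolean needsRefresh(Instant now, Duration margin) {
        return !now.plus(margin).isBefore(expiration);
    }

    private static String requireText(String value, String name) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
        return value;
    }
}
```

A caller could check `needsRefresh(Instant.now(), Duration.ofMinutes(5))` before it opens a new connection and request fresh credentials when the check returns true. The five-minute margin is only an illustrative value.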

Messaging process (Java used as an example)

Process of using open source RabbitMQ SDK to send and receive messages
Note: Message Queue for RabbitMQ is compatible with open source RabbitMQ. For more information about open source RabbitMQ SDK for other programming languages, see Table 1.

Obtain endpoints

You must obtain the endpoint of your instance in the Message Queue for RabbitMQ console. When you send or receive messages, you must configure the endpoint on the producer or consumer client. Otherwise, you cannot access the Message Queue for RabbitMQ instance.

  1. Log on to the Message Queue for RabbitMQ console.
  2. In the top navigation bar, select the region where your instance resides.
  3. In the left-side navigation pane, click Instances.
  4. On the Instances page, select your instance. In the Basic Information section, find the endpoint of the required network type and click the endpoint to copy it.
    Type | Description | Example
    Public endpoint | You can access an instance from the Internet to read and write data. By default, pay-as-you-go instances have public endpoints. To use a public endpoint for a subscription instance, you must configure a public endpoint when you create the subscription instance. | XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC endpoint | You can access an instance from a VPC to read and write data. By default, both pay-as-you-go and subscription instances have VPC endpoints. | XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
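
A client can sanity-check the copied endpoint before it connects, for example to avoid configuring a VPC endpoint on a machine outside the VPC. The following sketch classifies a host name purely by the pattern visible in the two example endpoints above, where the VPC example ends with `-internal.aliyuncs.com`. Treating that suffix as a reliable marker is an assumption made for illustration, and `EndpointKind` is a hypothetical name.

```java
/** Hypothetical classifier based only on the example endpoint formats shown above. */
enum EndpointKind {
    PUBLIC, VPC;

    /** Classifies a host name by its suffix; rejects hosts outside aliyuncs.com. */
    static EndpointKind of(String host) {
        if (host == null || !host.endsWith(".aliyuncs.com")) {
            throw new IllegalArgumentException("Not an aliyuncs.com endpoint: " + host);
        }
        // Assumption: VPC endpoints carry the "-internal" marker, as in the example.
        return host.endsWith("-internal.aliyuncs.com") ? VPC : PUBLIC;
    }
}
```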

Add Java dependencies

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>

Configure AliyunCredentialsProvider.java

Create a permission verification class AliyunCredentialsProvider.java and set the AccessKeyID, AccessKeySecret, and SecurityToken parameters based on the comments in the code. For more information, see Table 1.

Table 1. Parameters
Parameter | Example | Description
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | The endpoint of the Message Queue for RabbitMQ instance. For more information about how to obtain an endpoint of an instance, see Obtain endpoints.
Port | 5672 | The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.
AccessKeyID | LTAI5tJQKnB9zVvQ**** | The AccessKey ID used for authentication. In this scenario, use the AccessKey ID returned by the AssumeRole operation. To obtain it, log on to the RAM console, create a RAM role, grant the role the AliyunAMQPFullAccess permission, obtain the Alibaba Cloud Resource Name (ARN) of the RAM role, and then call the AssumeRole operation to obtain a temporary identity to assume the role. A successful call to the AssumeRole operation returns the AccessKeyID, AccessKeySecret, and SecurityToken values of the RAM role. For more information about the ARN of a RAM role, see RAM role overview.
AccessKeySecret | jw6MawfKOVBveRr84u**** | The AccessKey secret used for authentication. In this scenario, use the AccessKey secret returned by the AssumeRole operation together with the AccessKey ID.
SecurityToken | CAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************ | The STS token of the RAM role, returned by the AssumeRole operation.
instanceId | amqp-cn-v0h1kb9nu*** | The ID of the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the instance ID on the Instance Details page. For more information about how to view the ID of an instance, see View instance details.
virtualHost | Test | The vhost that you created in the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the vhost on the vhosts page. For more information about how to view a vhost, see View vhost details.
ExchangeName | ExchangeTest | The exchange that you created in the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the exchange on the Exchanges page.
BindingKey | BindingKeyTest | The binding key that is used to bind the exchange to a queue in Message Queue for RabbitMQ. In the Message Queue for RabbitMQ console, you can view the bindings of the exchange and obtain the binding key on the Exchanges page.
QueueName | QueueTest | The queue that you created in the Message Queue for RabbitMQ instance. This parameter is required only when you subscribe to messages. In the Message Queue for RabbitMQ console, you can view the bindings of the exchange and obtain the queues that are bound to the exchange on the Exchanges page.

import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * The AliyunCredentialsProvider class that is used to generate a pair of dynamic username and password. 
 */
public class AliyunCredentialsProvider implements CredentialsProvider {
    /**
     * Access Key ID.
     */
    private final String accessKeyId;
    /**
     * Access Key Secret.
     */
    private final String accessKeySecret;
    /**
     * Temporary security token (STS token). Optional: omit it when you authenticate without a RAM role.
     */
    private final String securityToken;
    /**
     * The ID of the Message Queue for RabbitMQ instance. You can obtain the instance ID in the Message Queue for RabbitMQ console. 
     */
    private final String instanceId;
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String instanceId) {
        this(accessKeyId, accessKeySecret, null, instanceId);
    }
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String securityToken, final String instanceId) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.securityToken = securityToken;
        this.instanceId = instanceId;
    }
    @Override
    public String getUsername() {
        if(StringUtils.isNotEmpty(securityToken)) {
            return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
        } else {
            return UserUtils.getUserName(accessKeyId, instanceId);
        }
    }

    @Override
    public String getPassword() {
        try {
            return UserUtils.getPassord(accessKeySecret);
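        } catch (InvalidKeyException | NoSuchAlgorithmException e) {
            // Fail fast instead of returning a null password, which would only
            // surface later as an authentication failure.
            throw new IllegalStateException("Failed to generate the password from the AccessKey secret", e);
        }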
    }
}

Produce a message

Create, compile, and run ProducerTest.java.

Notice: Before you compile ProducerTest.java and run it to produce messages, you must set the parameters described in Table 1 based on the comments in the code.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Specify the endpoint. You can view the endpoint of your instance on the Instance Details page in the Message Queue for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // Set ${AccessKeyID}, ${AccessKeySecret}, and ${SecurityToken} to the values returned by the AssumeRole operation for the RAM role.
        // Set ${instanceId} to the ID of your instance. You can view the instance ID on the Instance Details page in the Message Queue for RabbitMQ console.
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}", "${SecurityToken}", "${instanceId}"));
        // Enable automatic connection recovery. If you set the value to true, automatic connection recovery is enabled. If you set the value to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Specify the vhost name. Make sure that the vhost has been created in the Message Queue for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Specify the default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5672);
        // Set a timeout period based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
        // Send a message. 
        for (int i = 0; i < 100; i++) {
            // Set ${ExchangeName} to an exchange that already exists in the Message Queue for RabbitMQ console. Make sure that the type of the exchange is consistent with that in the console.
            // Set ${BindingKey} to the corresponding binding key based on your business requirements.
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
                    ("messageBody" + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}
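
The `${AccessKeyID}`, `${AccessKeySecret}`, `${SecurityToken}`, and `${instanceId}` placeholders in the code above are usually supplied at run time rather than written into the source file, especially because STS credentials change on every AssumeRole call. The following is a minimal sketch that reads them from a map such as the one returned by `System.getenv()`. The variable names (`MQ_ACCESS_KEY_ID` and so on) and the `EnvCredentials` class are hypothetical and are chosen only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical loader for the credential placeholders used in ProducerTest. */
final class EnvCredentials {
    // Hypothetical variable names; pick whatever fits your deployment.
    static final String[] REQUIRED = {
        "MQ_ACCESS_KEY_ID", "MQ_ACCESS_KEY_SECRET", "MQ_SECURITY_TOKEN", "MQ_INSTANCE_ID"
    };

    /** Copies the required values out of env and fails if any is missing or blank. */
    static Map<String, String> load(Map<String, String> env) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing environment variable: " + name);
            }
            result.put(name, value);
        }
        return result;
    }
}
```

With this helper, the four constructor arguments of `AliyunCredentialsProvider` can be taken from `EnvCredentials.load(System.getenv())` instead of string literals, and a missing value is reported before any connection attempt is made.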

Subscribe to messages

Create, compile, and run ConsumerTest.java to subscribe to messages.

Notice: Before you compile ConsumerTest.java and run it to subscribe to messages, you must set the parameters described in Table 1 based on the comments in the code.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Specify the endpoint. You can view the endpoint of your instance on the Instance Details page in the Message Queue for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // Set ${AccessKeyID}, ${AccessKeySecret}, and ${SecurityToken} to the values returned by the AssumeRole operation for the RAM role.
        // Set ${instanceId} to the ID of your instance. You can view the instance ID on the Instance Details page in the Message Queue for RabbitMQ console.
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}", "${SecurityToken}", "${instanceId}"));
        // Enable automatic connection recovery. If you set the value to true, automatic connection recovery is enabled. If you set the value to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Specify the vhost name. Make sure that the vhost has been created in the Message Queue for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Specify the default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Set ${ExchangeName} to an exchange that already exists in the Message Queue for RabbitMQ console. Make sure that the type of the exchange is consistent with that in the console. 
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // Set ${QueueName} to a queue that already exists in the Message Queue for RabbitMQ console. 
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Set ${BindingKey} to the binding key that is used to bind the queue to the exchange. Use the same value as in the producer.
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
        // Consume a message. 
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // Process the received message based on the business logic. 
                System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
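        // basicConsume returns immediately and messages are delivered on a separate thread.
        // Keep the connection open so that the consumer continues to receive messages,
        // and call connection.close() only when the application shuts down.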
    }
}
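
Because `basicConsume` returns immediately and messages are delivered asynchronously, the consumer must stay connected for as long as it is expected to receive messages. One common way to make the shutdown point explicit is to block the main thread on a latch and close the connection only after the latch is released. The following sketch uses only the JDK; `ShutdownGate` is a hypothetical helper and not part of the RabbitMQ client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that blocks a thread until release() is called. */
final class ShutdownGate {
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Signals that the application should stop; safe to call more than once. */
    void release() {
        latch.countDown();
    }

    /** Waits up to the timeout; returns true if release() was called in time. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the gate was not released.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a consumer, the main method would call `await` in a loop after `basicConsume`, some other part of the application would call `release()` when it is time to stop, and `connection.close()` would run only after `await` returns true.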

Query messages

If you want to check whether the message is sent to Message Queue for RabbitMQ, you can query the message in the Message Queue for RabbitMQ console. For more information, see Message query.