This topic describes how to use open source RabbitMQ SDK to send and receive messages in the scenario where you use a Resource Access Management (RAM) role to grant permissions across Alibaba Cloud accounts. This topic helps you better understand the complete process of sending and receiving messages. In this topic, SDK for Java is used as an example. The process of sending and receiving messages by using open source RabbitMQ SDK for other programming languages or frameworks is similar to that describe in this topic.

Prerequisites

Background information

If you need to use the temporary Security Token Service (STS) token generated for a RAM role to access ApsaraMQ for RabbitMQ, you must set the AccessKeyID, AccessKeySecret, and SecurityToken parameters in the AliyunCredentialsProvider class for permission verification.

Messaging process (Java used as an example)

Process of using open source RabbitMQ SDK to send and receive messages
Note ApsaraMQ for RabbitMQ is compatible with open source RabbitMQ. For more information about open source RabbitMQ SDK for other programming languages, see Programming languages and frameworks supported by open source RabbitMQ SDK over AMQP.

Obtain an endpoint

You must obtain an endpoint of your instance in the ApsaraMQ for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the ApsaraMQ for RabbitMQ instance by using the endpoint.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, select Instances.
  2. In the top navigation bar of the Instances page, select a region. In the instance list, click the name of the instance that you want to manage.
  3. On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the Copy icon on the right side of the endpoint to copy the endpoint.
    TypeDescriptionExample
    Public endpointYou can access an instance over the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use a public endpoint for a subscription instance, you must select a public endpoint when you create the subscription instance. XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC endpointYou can access an instance over a virtual private cloud (VPC) to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints. XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

Add Java dependencies

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>

Configure AliyunCredentialsProvider.java

Create a permission verification class AliyunCredentialsProvider.java and set the AccessKeyID, AccessKeySecret, and SecurityToken parameters based on the comments in the code. For more information, see Parameters.

Table 1. Parameters
ParameterExampleDescription
hostName1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.comThe endpoint of the ApsaraMQ for RabbitMQ instance. For more information about how to obtain an endpoint of an instance, see Obtain an endpoint.
Port5672The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.
AccessKeyIDLTAI5tJQKnB9zVvQ****The AccessKey ID of your Alibaba Cloud account or a RAM user within the account. You can log on to the RAM console, create a RAM role, grant the role the AliyunAMQPFullAccess permission, obtain the Alibaba Cloud Resource Name (ARN) of the RAM role, and then call the AssumeRole operation to obtain a temporary identity to assume the role. A successful call to the AssumeRole operation returns values of the AccessKeyID, AccessKeySecret, and SecurityToken of the RAM role. For more information about the ARN of a RAM role, see RAM role overview.
AccessKeySecretjw6MawfKOVBveRr84u****The AccessKey secret of your Alibaba Cloud account or a RAM user within the account.
SecurityTokenCAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************The STS token of the RAM role.
instanceIdamqp-cn-v0h1kb9nu***The ID of the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the instance ID on the Instance Details page. For more information about how to view the ID of an instance, see View instance details.
virtualHostTestThe vhost that you created in the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the vhost on the vhosts page. For more information about how to view a vhost, see View vhost details.
ExchangeNameExchangeTestThe exchange that you created in the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the exchange on the Exchanges page.
BindingKeyBindingKeyTestThe binding key that is used to bind the exchange with a queue in ApsaraMQ for RabbitMQ. In the ApsaraMQ for RabbitMQ console, you can view the bindings of the exchange and obtain the binding key on the Exchanges page.
QueueNameQueueTestThe queue that you created in the ApsaraMQ for RabbitMQ instance. This parameter is required only when you subscribe to messages. In the ApsaraMQ for RabbitMQ console, you can view the bindings of the exchange and obtain the queues that are bound to the exchange on the Exchanges page.
import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * The AliyunCredentialsProvider class that is used to generate a pair of dynamic username and password. 
 */
public class AliyunCredentialsProvider implements CredentialsProvider {
    /**
     * Access Key ID.
     */
    private final String accessKeyId;
    /**
     * Access Key Secret.
     */
    private final String accessKeySecret;
    /**
     * security temp token. (optional)
     */
    private final String securityToken;
    /**
     * The ID of the ApsaraMQ for RabbitMQ instance. You can obtain the instance ID in the ApsaraMQ for RabbitMQ console. 
     */
    private final String instanceId;
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String instanceId) {
        this(accessKeyId, accessKeySecret, null, instanceId);
    }
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String securityToken, final String instanceId) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.securityToken = securityToken;
        this.instanceId = instanceId;
    }
    @Override
    public String getUsername() {
        if(StringUtils.isNotEmpty(securityToken)) {
            return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
        } else {
            return UserUtils.getUserName(accessKeyId, instanceId);
        }
    }

    @Override
    public String getPassword() {
        try {
            return UserUtils.getPassord(accessKeySecret);
        } catch (InvalidKeyException e) {
            //todo
        } catch (NoSuchAlgorithmException e) {
            //todo
        }
        return null;
    }
}

Produce a message

Create, compile, and run ProducerTest.java.

Important Before you compile ProducerTest.java and run it to produce messages, you must set the parameters described in Parameters based on the comments in the code.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Specify the endpoint. You can view the endpoint of your instance on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // Set ${instanceId} to the ID of your instance. You can view the instance ID on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
        // Set ${AccessKey} to the AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
        // Set ${SecretKey} to the AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}", "${SecurityToken}","${instanceId}"));
        // Enable automatic connection recovery. If you set the value to true, automatic connection recovery is enabled. If you set the value to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Specify the vhost name. Make sure that the vhost has been created in the ApsaraMQ for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Specify the default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5672);
        // Set a timeout period based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
        // Send a message. 
        for (int i = 0; i < 100; i++  ) {
            // Set ${ExchangeName} to an exchange that already exists in the ApsaraMQ for RabbitMQ console. Make sure that the type of the exchange is consistent with that in the console. 
            // Set ${BindingKey} to the corresponding binding key based on your business requirements. 
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
                    ("messageBody"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

Subscribe to messages

Create, compile, and run ConsumerTest.java to subscribe to messages.

Important Before you compile ConsumerTest.java and run it to subscribe to messages, you must set the parameters described in Parameters based on the comments in the code.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Specify the endpoint. You can view the endpoint of your instance on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // Set ${instanceId} to the ID of your instance. You can view the instance ID on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
        // Set ${AccessKey} to the AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
        // Set ${SecretKey} to the AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${instanceId}"));
        // Enable automatic connection recovery. If you set the value to true, automatic connection recovery is enabled. If you set the value to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Specify the vhost name. Make sure that the vhost has been created in the ApsaraMQ for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Specify the default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Set ${ExchangeName} to an exchange that already exists in the ApsaraMQ for RabbitMQ console. Make sure that the type of the exchange is consistent with that in the console. 
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // Set ${QueueName} to a queue that already exists in the ApsaraMQ for RabbitMQ console. 
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Set BindingKeyTest to the binding key that is used to bind the queue to the exchange. 
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // Consume a message. 
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // Process the received message based on the business logic. 
                System.out.println("Received: "+ new String(body, StandardCharsets.UTF_8) + ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

Query messages

If you want to check whether the message is sent to ApsaraMQ for RabbitMQ, you can query the message in the ApsaraMQ for RabbitMQ console. For more information, see Query Messages.