This topic describes how to use open source RabbitMQ SDK to send and receive messages in the scenario where you use a Resource Access Management (RAM) role to grant permissions across Alibaba Cloud accounts. This topic helps you better understand the complete process of sending and receiving messages. In this topic, SDK for Java is used as an example. The process of sending and receiving messages by using open source RabbitMQ SDK for other programming languages or frameworks is similar to that describe in this topic.
Prerequisites
- ApsaraMQ for RabbitMQ is activated. For more information, see Activate ApsaraMQ for RabbitMQ.
- An instance is created. For more information, see Manage instances.
- A vhost is created. For more information, see Manage vhosts.
Background information
If you need to use the temporary Security Token Service (STS) token generated for a RAM role to access ApsaraMQ for RabbitMQ, you must set the AccessKeyID, AccessKeySecret, and SecurityToken parameters in the AliyunCredentialsProvider class for permission verification.
Messaging process (Java used as an example)

Obtain an endpoint
You must obtain an endpoint of your instance in the ApsaraMQ for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the ApsaraMQ for RabbitMQ instance by using the endpoint.
- Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, select Instances.
- In the top navigation bar of the Instances page, select a region. In the instance list, click the name of the instance that you want to manage.
- On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the
icon on the right side of the endpoint to copy the endpoint.
Type Description Example Public endpoint You can access an instance over the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use a public endpoint for a subscription instance, you must select a public endpoint when you create the subscription instance. XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com VPC endpoint You can access an instance over a virtual private cloud (VPC) to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints. XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
Add Java dependencies
Add the following dependencies to the pom.xml file:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
Configure AliyunCredentialsProvider.java
Create a permission verification class AliyunCredentialsProvider.java and set the AccessKeyID, AccessKeySecret, and SecurityToken parameters based on the comments in the code. For more information, see Parameters.
Parameter | Example | Description |
---|---|---|
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | The endpoint of the ApsaraMQ for RabbitMQ instance. For more information about how to obtain an endpoint of an instance, see Obtain an endpoint. |
Port | 5672 | The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. |
AccessKeyID | LTAI5tJQKnB9zVvQ**** | The AccessKey ID of your Alibaba Cloud account or a RAM user within the account. You can log on to the RAM console, create a RAM role, grant the role the AliyunAMQPFullAccess permission, obtain the Alibaba Cloud Resource Name (ARN) of the RAM role, and then call the AssumeRole operation to obtain a temporary identity to assume the role. A successful call to the AssumeRole operation returns values of the AccessKeyID, AccessKeySecret, and SecurityToken of the RAM role. For more information about the ARN of a RAM role, see RAM role overview. |
AccessKeySecret | jw6MawfKOVBveRr84u**** | The AccessKey secret of your Alibaba Cloud account or a RAM user within the account. |
SecurityToken | CAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************ | The STS token of the RAM role. |
instanceId | amqp-cn-v0h1kb9nu*** | The ID of the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the instance ID on the Instance Details page. For more information about how to view the ID of an instance, see View instance details. |
virtualHost | Test | The vhost that you created in the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the vhost on the vhosts page. For more information about how to view a vhost, see View vhost details. |
ExchangeName | ExchangeTest | The exchange that you created in the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the exchange on the Exchanges page. |
BindingKey | BindingKeyTest | The binding key that is used to bind the exchange with a queue in ApsaraMQ for RabbitMQ. In the ApsaraMQ for RabbitMQ console, you can view the bindings of the exchange and obtain the binding key on the Exchanges page. |
QueueName | QueueTest | The queue that you created in the ApsaraMQ for RabbitMQ instance. This parameter is required only when you subscribe to messages. In the ApsaraMQ for RabbitMQ console, you can view the bindings of the exchange and obtain the queues that are bound to the exchange on the Exchanges page. |
import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
* The AliyunCredentialsProvider class that is used to generate a pair of dynamic username and password.
*/
public class AliyunCredentialsProvider implements CredentialsProvider {
/**
* Access Key ID.
*/
private final String accessKeyId;
/**
* Access Key Secret.
*/
private final String accessKeySecret;
/**
* security temp token. (optional)
*/
private final String securityToken;
/**
* The ID of the ApsaraMQ for RabbitMQ instance. You can obtain the instance ID in the ApsaraMQ for RabbitMQ console.
*/
private final String instanceId;
public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
final String instanceId) {
this(accessKeyId, accessKeySecret, null, instanceId);
}
public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
final String securityToken, final String instanceId) {
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.securityToken = securityToken;
this.instanceId = instanceId;
}
@Override
public String getUsername() {
if(StringUtils.isNotEmpty(securityToken)) {
return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
} else {
return UserUtils.getUserName(accessKeyId, instanceId);
}
}
@Override
public String getPassword() {
try {
return UserUtils.getPassord(accessKeySecret);
} catch (InvalidKeyException e) {
//todo
} catch (NoSuchAlgorithmException e) {
//todo
}
return null;
}
}
Produce a message
Create, compile, and run ProducerTest.java.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// Specify the endpoint. You can view the endpoint of your instance on the Instance Details page in the ApsaraMQ for RabbitMQ console.
factory.setHost("xxx.xxx.aliyuncs.com");
// Set ${instanceId} to the ID of your instance. You can view the instance ID on the Instance Details page in the ApsaraMQ for RabbitMQ console.
// Set ${AccessKey} to the AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication.
// Set ${SecretKey} to the AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication.
factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}", "${SecurityToken}","${instanceId}"));
// Enable automatic connection recovery. If you set the value to true, automatic connection recovery is enabled. If you set the value to false, automatic connection recovery is disabled.
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// Specify the vhost name. Make sure that the vhost has been created in the ApsaraMQ for RabbitMQ console.
factory.setVirtualHost("${VhostName}");
// Specify the default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.
factory.setPort(5672);
// Set a timeout period based on the network environment.
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
// Send a message.
for (int i = 0; i < 100; i++ ) {
// Set ${ExchangeName} to an exchange that already exists in the ApsaraMQ for RabbitMQ console. Make sure that the type of the exchange is consistent with that in the console.
// Set ${BindingKey} to the corresponding binding key based on your business requirements.
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
("messageBody" + i).getBytes(StandardCharsets.UTF_8));
}
connection.close();
}
}
Subscribe to messages
Create, compile, and run ConsumerTest.java to subscribe to messages.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// Specify the endpoint. You can view the endpoint of your instance on the Instance Details page in the ApsaraMQ for RabbitMQ console.
factory.setHost("xxx.xxx.aliyuncs.com");
// Set ${instanceId} to the ID of your instance. You can view the instance ID on the Instance Details page in the ApsaraMQ for RabbitMQ console.
// Set ${AccessKey} to the AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication.
// Set ${SecretKey} to the AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication.
factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${instanceId}"));
// Enable automatic connection recovery. If you set the value to true, automatic connection recovery is enabled. If you set the value to false, automatic connection recovery is disabled.
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// Specify the vhost name. Make sure that the vhost has been created in the ApsaraMQ for RabbitMQ console.
factory.setVirtualHost("${VhostName}");
// Specify the default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// Set ${ExchangeName} to an exchange that already exists in the ApsaraMQ for RabbitMQ console. Make sure that the type of the exchange is consistent with that in the console.
AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
// Set ${QueueName} to a queue that already exists in the ApsaraMQ for RabbitMQ console.
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
// Set BindingKeyTest to the binding key that is used to bind the queue to the exchange.
AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
// Consume a message.
channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
// Process the received message based on the business logic.
System.out.println("Received: "+ new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
Query messages
If you want to check whether the message is sent to ApsaraMQ for RabbitMQ, you can query the message in the ApsaraMQ for RabbitMQ console. For more information, see Query Messages.