This topic uses a Java software development kit (SDK) as an example to describe how to send and receive messages with an open source SDK in a cross-account authorization scenario that uses a Resource Access Management (RAM) role. The process is similar for SDKs in other languages or frameworks.
Prerequisites
An ApsaraMQ for RabbitMQ instance is created. For more information, see Manage instances.
A vhost is created. For more information, see Manage vhosts.
Background information
When you use a RAM role and Security Token Service (STS) for authorization to access the ApsaraMQ for RabbitMQ service, you must use the AliyunCredentialsProvider authentication class to set the AccessKeyID, AccessKeySecret, and SecurityToken parameters.
You can use RAM roles to grant cross-account authorization, which allows one enterprise to access the ApsaraMQ for RabbitMQ resources of another enterprise.
-
Company A wants to focus on its business systems and own the ApsaraMQ for RabbitMQ resources. Company A can authorize Company B to perform operations, such as operations and maintenance (O&M) and monitoring, on its ApsaraMQ for RabbitMQ resources.
-
Company A does not need to change permissions when employees of Company B join or leave. Company B can assign access permissions for Company A's resources to its own RAM users, such as employees or applications. This provides fine-grained control over access and operations on the resources.
-
Company A can revoke the authorization granted to Company B at any time, for example, if their contract is terminated.
For more information, see RAM cross-account authorization.
Procedure for sending and receiving messages (Java example)

ApsaraMQ for RabbitMQ is fully compatible with open source RabbitMQ. For SDKs in other languages, see SDKs for multiple languages or frameworks that support the open source RabbitMQ AMQP protocol.
Install Java dependency libraries
Add the following dependencies to the pom.xml file.
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- Supports all open source versions -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibabacloud-sts20150401</artifactId>
<version>1.0.4</version>
</dependency>
Configure the AliyunCredentialsProvider class
-
Create the AliyunCredentialsProvider.java class. Set the required parameters based on the comments in the code. For more information, see Parameters.
import com.alibaba.mq.amqp.utils.UserUtils; import com.aliyun.auth.credentials.Credential; import com.aliyun.auth.credentials.provider.StaticCredentialProvider; import com.aliyun.sdk.service.sts20150401.AsyncClient; import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest; import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse; import com.rabbitmq.client.impl.CredentialsProvider; import darabonba.core.client.ClientOverrideConfiguration; import org.apache.commons.lang3.StringUtils; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class AliyunCredentialsProvider implements CredentialsProvider { /** * The default time-to-live (TTL) in milliseconds. Set this value as needed. */ private final long STS_TIMEOUT_DEFAULT = 1800 * 1000; /** * The instance ID. Obtain it from the ApsaraMQ for RabbitMQ console. */ private final String instanceId; /** * The AccessKey ID. */ private String accessKeyId; /** * The AccessKey secret. */ private String accessKeySecret; /** * (Optional) The security token. */ private String securityToken; /** * The STS expiration time. Record this time to update the STS token in advance. */ private Long timeStampLimit; // An AccessKey of an Alibaba Cloud account has permissions to access all APIs. Use a RAM user for API access or routine O&M. // Do not store the AccessKey ID and AccessKey secret in your project code. This can cause an AccessKey breach and threaten the security of all resources in your account. public AliyunCredentialsProvider(final String instanceId) { this.instanceId = instanceId; } public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException { this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT; // Call the AssumeRole operation to obtain identity information. StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder() .accessKeyId(alibabaAccessKeyId) .accessKeySecret(alibabaAccessKeySecret) .build()); AsyncClient client = AsyncClient.builder() .region(region) // Set the region ID, for example, cn-hangzhou. .credentialsProvider(provider) .overrideConfiguration( ClientOverrideConfiguration.create() // For information about the endpoint, see https://api.aliyun.com/product/Sts. .setEndpointOverride("sts." + region + ".aliyuncs.com") //.setConnectTimeout(Duration.ofSeconds(30)) ) .build(); AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder() .roleArn(roleARN) // The Alibaba Cloud Resource Name (ARN) of the role, obtained from the console. .roleSessionName("testRoleName") // The name of the current role session. You can customize this name. .durationSeconds(STS_TIMEOUT_DEFAULT / 1000) .build(); CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest); // Synchronously get the return value of the API request AssumeRoleResponse resp = response.get(); if (resp.getBody().getCredentials() != null) { System.out.println("[INFO] Update AK, SK, Token successfully."); this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId(); this.securityToken = resp.getBody().getCredentials().getSecurityToken(); this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret(); } client.close(); } // Check if the current token is about to expire. public boolean isNearlyExpired() { // Check 30 seconds in advance. return System.currentTimeMillis() > timeStampLimit - 30 * 1000L; } @Override public String getUsername() { if(StringUtils.isNotEmpty(securityToken)) { return UserUtils.getUserName(accessKeyId, instanceId, securityToken); } else { return UserUtils.getUserName(accessKeyId, instanceId); } } @Override public String getPassword() { try { return UserUtils.getPassord(accessKeySecret); } catch (InvalidKeyException e) { //todo } catch (NoSuchAlgorithmException e) { //todo } return null; } }
Table 1. Parameters
|
Parameter |
Example |
Description |
|
hostName |
1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com |
ApsaraMQ for RabbitMQ instance endpoint. |
|
Port |
5672 |
The default port. Port 5672 is for non-encrypted connections, and port 5671 is for encrypted connections. |
|
AccessKeyID |
yourAccessKeyID |
The AccessKey ID of an Alibaba Cloud account or a RAM user. Log on to the RAM console to create a RAM role. Grant the AliyunAMQPFullAccess permission to the role and get the role's Alibaba Cloud Resource Name (ARN). Call the AssumeRole operation to get a temporary identity to assume the role. The AssumeRole operation returns the AccessKeyID, AccessKeySecret, and SecurityToken of the RAM role. For more information about role ARNs, see RAM role overview. |
|
AccessKeySecret |
yourAccessKeySecret |
The AccessKey secret of an Alibaba Cloud account or a RAM user. |
|
region |
cn-hangzhou |
You can invoke the AssumeRole API in the corresponding region. For more information, see AssumeRole. |
|
roleARN |
acs:ram::125xxxxxxx223:role/xxx |
The ARN of the RAM role. The format is |
|
instanceId |
amqp-cn-v0h1kb9nu*** |
The instance ID of ApsaraMQ for RabbitMQ. You can view the ID in the ApsaraMQ for RabbitMQ console on the Instance Details page. For more information about how to view the instance ID, see View instance details. |
|
virtualHost |
Test |
Specifies the Vhost of the ApsaraMQ for RabbitMQ instance. You can view the Vhost on the Vhosts page in the ApsaraMQ for RabbitMQ console. For more information, see View Vhost connection details. |
|
ExchangeName |
ExchangeTest |
The Exchange in ApsaraMQ for RabbitMQ. You can obtain it from the Exchanges page in the ApsaraMQ for RabbitMQ console. |
|
RoutingKey |
RoutingKeyTest |
The routing key for the binding between an Exchange and a Queue in ApsaraMQ for RabbitMQ. You can view the bindings of an exchange on the Exchanges page in the ApsaraMQ for RabbitMQ console to obtain the routing key. |
|
QueueName |
QueueTest |
An ApsaraMQ for RabbitMQ queue. This parameter is required only when you receive messages. You can go to the Exchanges page in the ApsaraMQ for RabbitMQ console to view the binding relationships of an exchange and obtain the queue that is bound to the exchange. |
Producing messages
Create, compile, and run ProducerTest.java.
Before you compile and run ProducerTest.java to send messages, configure the parameters listed in the Parameters table based on the comments in the code.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
// Configure information such as the AccessKey pair and ARN as environment variables. Storing this information as plaintext in your project code can cause data breaches.
// The AccessKey ID of your Alibaba Cloud account.
private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// The AccessKey secret of your Alibaba Cloud account.
private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// The region where the Alibaba Cloud service resides.
private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
// The Alibaba Cloud role ARN. Obtain it from the console.
private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
//Set the endpoint of the instance.
private static final String hostName = "xxx.xxx.aliyuncs.com";
private static final String instanceId = "${InstanceId}";
//Set the Vhost of the instance.
private static final String virtualHost = "${VirtualHost}";
//Set the exchange, queue, and binding.
private static final String exchangeName = "${ExchangeName}";
private static final String queueName = "${QueueName}";
private static final String routingKey = "${RoutingKey}";
//Set the exchange type.
private static final String exchangeType = "${ExchangeType}";
public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
ConnectionFactory factory = new ConnectionFactory();
// Set the endpoint. View the endpoint on the instance details page in the ApsaraMQ for RabbitMQ console.
factory.setHost(hostName);
// ${instanceId} is the instance ID. View the ID on the overview page in the ApsaraMQ for RabbitMQ console.
AliyunCredentialsProvider aliyunCredentialsProvider =
new AliyunCredentialsProvider(instanceId);
updateSTSProperties(aliyunCredentialsProvider);
// ${instanceId} is the instance ID. View the ID on the instance details page in the ApsaraMQ for RabbitMQ console.
factory.setCredentialsProvider(aliyunCredentialsProvider);
//Set this parameter to true to enable automatic connection recovery. Set it to false to disable this feature.
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// Set the Vhost name. Make sure that the Vhost is created in the ApsaraMQ for RabbitMQ console.
factory.setVirtualHost(virtualHost);
// The default port. Port 5672 is for non-encrypted connections, and port 5671 is for encrypted connections.
factory.setPort(5672);
// Set a reasonable timeout period based on your network environment.
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, routingKey);
// Start sending messages. A total of 3,600 messages are sent, with a 1-second pause after each message. The process lasts for 1 hour.
for (int i = 0; i < 3600; i++) {
try {
if (aliyunCredentialsProvider.isNearlyExpired()) {
// The token may have expired. Re-authenticate.
System.out.println("[WARN] Token maybe expired, so try to update it.");
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
// After the configuration is updated, you must re-establish the connection.
connection = factory.newConnection();
channel = connection.createChannel();
}
// The ${ExchangeName} must exist in the ApsaraMQ for RabbitMQ console, and its type must be the same as the type specified in the console.
// Enter a routing key for ${RoutingKey} as needed.
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, routingKey, true, props,
("Message body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
Thread.sleep(1000L);
} catch (Exception e) {
System.out.println("[ERROR] Send fail, error: " + e.getMessage());
Thread.sleep(5000L);
}
}
connection.close();
}
public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
System.out.println("Try to update STS properties");
// Configure the AccessKey pair as environment variables. Storing the AccessKey pair as plaintext in your project code can cause data breaches.
aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
}
}
ApsaraMQ for RabbitMQ throttles the peak traffic of a single instance in transactions per second (TPS). For more information, see Instance Throttling Best Practices.
Subscribe to messages
Create, compile, and run ConsumerTest.java to subscribe to messages.
Before you compile and run ConsumerTest.java to subscribe to messages, configure the parameters listed in the Parameters table based on the comments in the code.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
// Configure information such as the AccessKey pair and ARN as environment variables. Storing this information as plaintext in your project code can cause data breaches.
// The AccessKey ID of your Alibaba Cloud account.
private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// The AccessKey secret of your Alibaba Cloud account.
private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// The region where the Alibaba Cloud service resides.
private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
// The Alibaba Cloud role ARN. Obtain it from the console.
private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
//Set the endpoint of the instance.
private static final String hostName = "xxx.xxx.aliyuncs.com";
private static final String instanceId = "${InstanceId}";
//Set the Vhost of the instance.
private static final String virtualHost = "${VirtualHost}";
//Set the queue.
private static final String queueName = "${QueueName}";
public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
// Set the endpoint. View the endpoint on the instance details page in the ApsaraMQ for RabbitMQ console.
factory.setHost(hostName);
// ${instanceId} is the instance ID. View the ID on the overview page in the ApsaraMQ for RabbitMQ console.
AliyunCredentialsProvider aliyunCredentialsProvider =
new AliyunCredentialsProvider(instanceId);
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
//Set this parameter to true to enable automatic connection recovery. Set it to false to disable this feature.
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// Set the Vhost name. Make sure that the Vhost is created in the ApsaraMQ for RabbitMQ console.
factory.setVirtualHost(virtualHost);
// The default port. Port 5672 is for non-encrypted connections, and port 5671 is for encrypted connections.
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Create ${QueueName}. The queue must exist in the ApsaraMQ for RabbitMQ console.
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
consume(channel, queueName);
System.out.println("Consumer started.");
// Check in a loop whether the STS token is about to expire. If it is, update the connection and resume consumption.
// For ease of understanding, a while loop is used here to check if the token is about to expire.
// You can use a scheduled task to implement the check and update operations more elegantly.
while (true) {
// After each message is processed, you can check if the token is about to expire.
// If it is about to expire, update the credential provider class.
// This process requires you to re-create the connection to ensure business continuity.
if (aliyunCredentialsProvider.isNearlyExpired()) {
System.out.println("token maybe expired, so try to update it.");
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
connection.close();
connection = factory.newConnection();
channel = connection.createChannel();
// Resume message consumption.
consume(channel, queueName);
System.out.println("Consumer started.");
} else {
// Check once per second.
Thread.sleep(1000);
}
}
}
public static void consume(Channel channel, String queueName) throws IOException {
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
//Process the received message based on your business logic.
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("Exception, cause:" + e.getMessage());
}
}
});
}
public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
System.out.println("Try to update STS properties");
aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
}
}
Query messages
To confirm that messages are successfully sent to ApsaraMQ for RabbitMQ, you can query the messages in the ApsaraMQ for RabbitMQ console. For more information, see Query messages.