All Products
Search
Document Center

ApsaraMQ for RabbitMQ:Connecting to ApsaraMQ for RabbitMQ across Alibaba Cloud accounts using RAM roles

Last Updated:Mar 07, 2026

This topic uses a Java software development kit (SDK) as an example to describe how to send and receive messages with an open source SDK in a cross-account authorization scenario that uses a Resource Access Management (RAM) role. The process is similar for SDKs in other languages or frameworks.

Prerequisites

  • An ApsaraMQ for RabbitMQ instance is created. For more information, see Manage instances.

  • A vhost is created. For more information, see Manage vhosts.

Background information

When you use a RAM role and Security Token Service (STS) for authorization to access the ApsaraMQ for RabbitMQ service, you must use the AliyunCredentialsProvider authentication class to set the AccessKeyID, AccessKeySecret, and SecurityToken parameters.

You can use RAM roles to grant cross-account authorization, which allows one enterprise to access the ApsaraMQ for RabbitMQ resources of another enterprise.

  • Company A wants to focus on its business systems and own the ApsaraMQ for RabbitMQ resources. Company A can authorize Company B to perform operations, such as operations and maintenance (O&M) and monitoring, on its ApsaraMQ for RabbitMQ resources.

  • Company A does not need to change permissions when employees of Company B join or leave. Company B can assign access permissions for Company A's resources to its own RAM users, such as employees or applications. This provides fine-grained control over access and operations on the resources.

  • Company A can revoke the authorization granted to Company B at any time, for example, if their contract is terminated.

For more information, see RAM cross-account authorization.

Procedure for sending and receiving messages (Java example)

Open source SDK message sending and receiving process

Note

ApsaraMQ for RabbitMQ is fully compatible with open source RabbitMQ. For SDKs in other languages, see SDKs for multiple languages or frameworks that support the open source RabbitMQ AMQP protocol.

Install Java dependency libraries

Add the following dependencies to the pom.xml file.

<dependency>            
    <groupId>com.rabbitmq</groupId>            
    <artifactId>amqp-client</artifactId>            
    <version>5.5.0</version> <!-- Supports all open source versions -->       
</dependency>        
<dependency>          
    <groupId>com.alibaba.mq-amqp</groupId>          
    <artifactId>mq-amqp-client</artifactId>         
    <version>1.0.5</version>       
</dependency>       
<dependency>          
    <groupId>com.aliyun</groupId>          
    <artifactId>alibabacloud-sts20150401</artifactId>          
    <version>1.0.4</version>       
</dependency>

Configure the AliyunCredentialsProvider class

  1. Create the AliyunCredentialsProvider.java class. Set the required parameters based on the comments in the code. For more information, see Parameters.

    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.aliyun.auth.credentials.Credential;
    import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
    import com.aliyun.sdk.service.sts20150401.AsyncClient;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import darabonba.core.client.ClientOverrideConfiguration;
    import org.apache.commons.lang3.StringUtils;
    
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
        /**
         * The default time-to-live (TTL) in milliseconds. Set this value as needed.
         */
        private final long STS_TIMEOUT_DEFAULT = 1800 * 1000;
        /**
         * The instance ID. Obtain it from the ApsaraMQ for RabbitMQ console.
         */
        private final String instanceId;
        /**
         * The AccessKey ID.
         */
        private String accessKeyId;
        /**
         * The AccessKey secret.
         */
        private String accessKeySecret;
        /**
         *  (Optional) The security token.
         */
        private String securityToken;
        /**
         * The STS expiration time. Record this time to update the STS token in advance.
         */
        private Long timeStampLimit;
    
        // An AccessKey of an Alibaba Cloud account has permissions to access all APIs. Use a RAM user for API access or routine O&M.
        // Do not store the AccessKey ID and AccessKey secret in your project code. This can cause an AccessKey breach and threaten the security of all resources in your account.
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
        public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException {
            this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT;
            // Call the AssumeRole operation to obtain identity information.
            StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
                    .accessKeyId(alibabaAccessKeyId)
                    .accessKeySecret(alibabaAccessKeySecret)
                    .build());
            AsyncClient client = AsyncClient.builder()
                    .region(region) // Set the region ID, for example, cn-hangzhou.
                    .credentialsProvider(provider)
                    .overrideConfiguration(
                            ClientOverrideConfiguration.create()
                                    // For information about the endpoint, see https://api.aliyun.com/product/Sts.
                                    .setEndpointOverride("sts." + region + ".aliyuncs.com")
                            //.setConnectTimeout(Duration.ofSeconds(30))
                    )
                    .build();
            AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
                    .roleArn(roleARN) // The Alibaba Cloud Resource Name (ARN) of the role, obtained from the console.
                    .roleSessionName("testRoleName") // The name of the current role session. You can customize this name.
                    .durationSeconds(STS_TIMEOUT_DEFAULT / 1000)
                    .build();
            CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest);
            // Synchronously get the return value of the API request
            AssumeRoleResponse resp = response.get();
            if (resp.getBody().getCredentials() != null) {
                System.out.println("[INFO] Update AK, SK, Token successfully.");
                this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId();
                this.securityToken = resp.getBody().getCredentials().getSecurityToken();
                this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret();
            }
            client.close();
        }
    
        // Check if the current token is about to expire.
        public boolean isNearlyExpired() {
            // Check 30 seconds in advance.
            return System.currentTimeMillis() > timeStampLimit - 30 * 1000L;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException e) {
                //todo
            } catch (NoSuchAlgorithmException e) {
                //todo
            }
            return null;
        }
    }

Table 1. Parameters

Parameter

Example

Description

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

ApsaraMQ for RabbitMQ instance endpoint.

Port

5672

The default port. Port 5672 is for non-encrypted connections, and port 5671 is for encrypted connections.

AccessKeyID

yourAccessKeyID

The AccessKey ID of an Alibaba Cloud account or a RAM user. Log on to the RAM console to create a RAM role. Grant the AliyunAMQPFullAccess permission to the role and get the role's Alibaba Cloud Resource Name (ARN). Call the AssumeRole operation to get a temporary identity to assume the role. The AssumeRole operation returns the AccessKeyID, AccessKeySecret, and SecurityToken of the RAM role. For more information about role ARNs, see RAM role overview.

AccessKeySecret

yourAccessKeySecret

The AccessKey secret of an Alibaba Cloud account or a RAM user.

region

cn-hangzhou

You can invoke the AssumeRole API in the corresponding region. For more information, see AssumeRole.

roleARN

acs:ram::125xxxxxxx223:role/xxx

The ARN of the RAM role. The format is acs:ram::<account_id>:role/<role_name>. For more information, see AssumeRole.

instanceId

amqp-cn-v0h1kb9nu***

The instance ID of ApsaraMQ for RabbitMQ. You can view the ID in the ApsaraMQ for RabbitMQ console on the Instance Details page. For more information about how to view the instance ID, see View instance details.

virtualHost

Test

Specifies the Vhost of the ApsaraMQ for RabbitMQ instance. You can view the Vhost on the Vhosts page in the ApsaraMQ for RabbitMQ console. For more information, see View Vhost connection details.

ExchangeName

ExchangeTest

The Exchange in ApsaraMQ for RabbitMQ. You can obtain it from the Exchanges page in the ApsaraMQ for RabbitMQ console.

RoutingKey

RoutingKeyTest

The routing key for the binding between an Exchange and a Queue in ApsaraMQ for RabbitMQ. You can view the bindings of an exchange on the Exchanges page in the ApsaraMQ for RabbitMQ console to obtain the routing key.

QueueName

QueueTest

An ApsaraMQ for RabbitMQ queue. This parameter is required only when you receive messages. You can go to the Exchanges page in the ApsaraMQ for RabbitMQ console to view the binding relationships of an exchange and obtain the queue that is bound to the exchange.

Producing messages

Create, compile, and run ProducerTest.java.

Important

Before you compile and run ProducerTest.java to send messages, configure the parameters listed in the Parameters table based on the comments in the code.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    // Configure information such as the AccessKey pair and ARN as environment variables. Storing this information as plaintext in your project code can cause data breaches.
    // The AccessKey ID of your Alibaba Cloud account.
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // The AccessKey secret of your Alibaba Cloud account.
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // The region where the Alibaba Cloud service resides.
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // The Alibaba Cloud role ARN. Obtain it from the console.
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //Set the endpoint of the instance.
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //Set the Vhost of the instance.
    private static final String virtualHost = "${VirtualHost}";
    //Set the exchange, queue, and binding.
    private static final String exchangeName = "${ExchangeName}";
    private static final String queueName = "${QueueName}";
    private static final String routingKey = "${RoutingKey}";
    //Set the exchange type.
    private static final String exchangeType = "${ExchangeType}";

    public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
        ConnectionFactory factory = new ConnectionFactory();
        // Set the endpoint. View the endpoint on the instance details page in the ApsaraMQ for RabbitMQ console.
        factory.setHost(hostName);
        // ${instanceId} is the instance ID. View the ID on the overview page in the ApsaraMQ for RabbitMQ console.
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        // ${instanceId} is the instance ID. View the ID on the instance details page in the ApsaraMQ for RabbitMQ console.
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //Set this parameter to true to enable automatic connection recovery. Set it to false to disable this feature.
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Set the Vhost name. Make sure that the Vhost is created in the ApsaraMQ for RabbitMQ console.
        factory.setVirtualHost(virtualHost);
        // The default port. Port 5672 is for non-encrypted connections, and port 5671 is for encrypted connections.
        factory.setPort(5672);
        // Set a reasonable timeout period based on your network environment.
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        // Start sending messages. A total of 3,600 messages are sent, with a 1-second pause after each message. The process lasts for 1 hour.
        for (int i = 0; i < 3600; i++) {
            try {
                if (aliyunCredentialsProvider.isNearlyExpired()) {
                    // The token may have expired. Re-authenticate.
                    System.out.println("[WARN] Token maybe expired, so try to update it.");
                    updateSTSProperties(aliyunCredentialsProvider);
                    factory.setCredentialsProvider(aliyunCredentialsProvider);
                    // After the configuration is updated, you must re-establish the connection.
                    connection = factory.newConnection();
                    channel = connection.createChannel();
                }
                // The ${ExchangeName} must exist in the ApsaraMQ for RabbitMQ console, and its type must be the same as the type specified in the console.
                // Enter a routing key for ${RoutingKey} as needed.
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish(exchangeName, routingKey, true, props,
                        ("Message body-"  + i).getBytes(StandardCharsets.UTF_8));
                System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
                Thread.sleep(1000L);
            } catch (Exception e) {
                System.out.println("[ERROR] Send fail, error: " + e.getMessage());
                Thread.sleep(5000L);
            }
        }
        connection.close();
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        // Configure the AccessKey pair as environment variables. Storing the AccessKey pair as plaintext in your project code can cause data breaches.
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}
Note

ApsaraMQ for RabbitMQ throttles the peak traffic of a single instance in transactions per second (TPS). For more information, see Instance Throttling Best Practices.

Subscribe to messages

Create, compile, and run ConsumerTest.java to subscribe to messages.

Important

Before you compile and run ConsumerTest.java to subscribe to messages, configure the parameters listed in the Parameters table based on the comments in the code.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    // Configure information such as the AccessKey pair and ARN as environment variables. Storing this information as plaintext in your project code can cause data breaches.
    // The AccessKey ID of your Alibaba Cloud account.
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // The AccessKey secret of your Alibaba Cloud account.
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // The region where the Alibaba Cloud service resides.
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // The Alibaba Cloud role ARN. Obtain it from the console.
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //Set the endpoint of the instance.
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //Set the Vhost of the instance.
    private static final String virtualHost = "${VirtualHost}";
    //Set the queue.
    private static final String queueName = "${QueueName}";

    public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        // Set the endpoint. View the endpoint on the instance details page in the ApsaraMQ for RabbitMQ console.
        factory.setHost(hostName);
        // ${instanceId} is the instance ID. View the ID on the overview page in the ApsaraMQ for RabbitMQ console.
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //Set this parameter to true to enable automatic connection recovery. Set it to false to disable this feature.
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Set the Vhost name. Make sure that the Vhost is created in the ApsaraMQ for RabbitMQ console.
        factory.setVirtualHost(virtualHost);
        // The default port. Port 5672 is for non-encrypted connections, and port 5671 is for encrypted connections.
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Create ${QueueName}. The queue must exist in the ApsaraMQ for RabbitMQ console.
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        consume(channel, queueName);
        System.out.println("Consumer started.");

        // Check in a loop whether the STS token is about to expire. If it is, update the connection and resume consumption.
        // For ease of understanding, a while loop is used here to check if the token is about to expire.
        // You can use a scheduled task to implement the check and update operations more elegantly.
        while (true) {
            // After each message is processed, you can check if the token is about to expire.
            // If it is about to expire, update the credential provider class.
            // This process requires you to re-create the connection to ensure business continuity.
            if (aliyunCredentialsProvider.isNearlyExpired()) {
                System.out.println("token maybe expired, so try to update it.");
                updateSTSProperties(aliyunCredentialsProvider);
                factory.setCredentialsProvider(aliyunCredentialsProvider);
                connection.close();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // Resume message consumption.
                consume(channel, queueName);
                System.out.println("Consumer started.");
            } else {
                // Check once per second.
                Thread.sleep(1000);
            }
        }
    }

    public static void consume(Channel channel, String queueName) throws IOException {

        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                try {
                    //Process the received message based on your business logic.
                    System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    System.out.println("Exception, cause:" + e.getMessage());
                }
            }
        });
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}

Query messages

To confirm that messages are successfully sent to ApsaraMQ for RabbitMQ, you can query the messages in the ApsaraMQ for RabbitMQ console. For more information, see Query messages.