ApsaraMQ for RabbitMQ: Open source RabbitMQ SDKs (cross-account authorization by using a RAM role)

Last Updated: Mar 15, 2024

This topic describes how to use an open source RabbitMQ SDK to send and receive messages when a Resource Access Management (RAM) role is used to grant permissions across Alibaba Cloud accounts. The examples in this topic use the SDK for Java. The process is similar for open source RabbitMQ SDKs for other programming languages or frameworks.

Prerequisites

  • An ApsaraMQ for RabbitMQ instance is created. For more information, see Manage instances.

  • A vhost is created. For more information, see Manage vhosts.

Background information

If you want to use the temporary Security Token Service (STS) token generated for a RAM role to access ApsaraMQ for RabbitMQ, you must configure the AccessKeyID, AccessKeySecret, and SecurityToken parameters in the AliyunCredentialsProvider class for authentication.

You can use a RAM role to grant permissions across Alibaba Cloud accounts. This way, an enterprise can access the ApsaraMQ for RabbitMQ instance of another enterprise.

  • Enterprise A wants to focus on its business systems and act only as the owner of ApsaraMQ for RabbitMQ resources. Enterprise A can authorize Enterprise B to maintain, monitor, and manage ApsaraMQ for RabbitMQ resources.

  • If an employee joins or leaves Enterprise B, Enterprise A does not need to make modifications to the granted permissions. Enterprise B can grant its RAM users fine-grained permissions on cloud resources of Enterprise A. The RAM user credentials can be assigned to employees or applications.

  • If the agreement between Enterprise A and Enterprise B ends, Enterprise A can revoke the permissions from Enterprise B.

For more information, see Grant permissions across Alibaba Cloud accounts.

Messaging process (Java used as an example)

Figure: Messaging process with an open source SDK

Note

ApsaraMQ for RabbitMQ is fully compatible with open source RabbitMQ. For information about open source RabbitMQ SDKs for other programming languages, see Programming languages and frameworks supported by open source RabbitMQ SDKs over AMQP.

Install Java dependencies

Add the following dependencies to the pom.xml file:

<dependency>            
    <groupId>com.rabbitmq</groupId>            
    <artifactId>amqp-client</artifactId>            
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->       
</dependency>        
<dependency>          
    <groupId>com.alibaba.mq-amqp</groupId>          
    <artifactId>mq-amqp-client</artifactId>         
    <version>1.0.5</version>       
</dependency>       
<dependency>          
    <groupId>com.aliyun</groupId>          
    <artifactId>alibabacloud-sts20150401</artifactId>          
    <version>1.0.4</version>       
</dependency>

Configure the AliyunCredentialsProvider.java class

  1. Create the AliyunCredentialsProvider.java class for authentication and configure the parameters based on the code comments. For more information, see Parameters.

    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.aliyun.auth.credentials.Credential;
    import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
    import com.aliyun.sdk.service.sts20150401.AsyncClient;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import darabonba.core.client.ClientOverrideConfiguration;
    import org.apache.commons.lang3.StringUtils;
    
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
        /**
         * The default expiration period. Unit: milliseconds. You can configure this parameter based on your business requirements. 
         */
        private final long STS_TIMEOUT_DEFAULT = 1800 * 1000;
        /**
         * The ID of the ApsaraMQ for RabbitMQ instance. You can obtain the instance ID in the ApsaraMQ for RabbitMQ console. 
         */
        private final String instanceId;
        /**
         * The AccessKey ID. 
         */
        private String accessKeyId;
        /**
         * The AccessKey secret. 
         */
        private String accessKeySecret;
        /**
         * (Optional) The temporary security token. 
         */
        private String securityToken;
        /**
         * The expiration time of the STS token. This parameter allows you to update the STS token in advance. 
         */
        private Long timeStampLimit;
    
        // The AccessKey pair of your Alibaba Cloud account can be used to access all API operations. We recommend that you use the AccessKey pair of a RAM user to access API operations and perform routine O&M. 
        // We strongly recommend that you do not save your AccessKey pair in your project code. Otherwise, your AccessKey pair may be leaked and all resources that are contained in your Alibaba Cloud account are exposed to potential security risks. 
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
        public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException {
            this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT;
            // Call the AssumeRole operation to obtain the identity credentials. 
            StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
                    .accessKeyId(alibabaAccessKeyId)
                    .accessKeySecret(alibabaAccessKeySecret)
                    .build());
            AsyncClient client = AsyncClient.builder()
                    .region(region) // The region ID. Example: cn-hangzhou. 
                    .credentialsProvider(provider)
                    .overrideConfiguration(
                            ClientOverrideConfiguration.create()
                                    // For information about endpoints, see https://api.aliyun.com/product/Sts. 
                                    .setEndpointOverride("sts." + region + ".aliyuncs.com")
                            //.setConnectTimeout(Duration.ofSeconds(30))
                    )
                    .build();
            AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
                    .roleArn(roleARN) // The Alibaba Cloud Resource Name (ARN) of the RAM role that you obtain in the ApsaraMQ for RabbitMQ console. 
                    .roleSessionName("testRoleName") // The session name of the RAM role. You can specify a custom session name. 
                    .durationSeconds(STS_TIMEOUT_DEFAULT / 1000)
                    .build();
            CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest);
            // Synchronously get the return value of the API request
            AssumeRoleResponse resp = response.get();
            if (resp.getBody().getCredentials() != null) {
                System.out.println("[INFO] Update AK, SK, Token successfully.");
                this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId();
                this.securityToken = resp.getBody().getCredentials().getSecurityToken();
                this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret();
            }
            client.close();
        }
    
        // Check whether the token is about to expire. 
        public boolean isNearlyExpired() {
            // Determine whether the token is about to expire 30 seconds in advance. 
            return System.currentTimeMillis() > timeStampLimit - 30 * 1000L;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
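            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException | NoSuchAlgorithmException e) {
                // Fail loudly instead of returning a null password, which would only surface later as an opaque authentication error. 
                throw new IllegalStateException("Failed to compute the password from the AccessKey secret", e);
            }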
        }
    }
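
The refresh-ahead check in isNearlyExpired() can be reasoned about in isolation. The following sketch is not part of the SDK: the class name TokenExpiryTracker, its constructor parameters, and the injectable clock are hypothetical and use only the JDK. It applies the same arithmetic as the class above: the deadline is the issue time plus the token lifetime, and the token counts as nearly expired once the current time passes the deadline minus a safety margin. Unlike the class above, it also treats "no token issued yet" as expired, which is an assumption of this sketch.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration; not part of amqp-client or mq-amqp-client. */
final class TokenExpiryTracker {
    private static final long NOT_ISSUED = Long.MIN_VALUE;

    private final long lifetimeMillis;
    private final long marginMillis;
    private final LongSupplier clock;
    private long deadlineMillis = NOT_ISSUED;

    TokenExpiryTracker(long lifetimeMillis, long marginMillis, LongSupplier clock) {
        if (marginMillis >= lifetimeMillis) {
            throw new IllegalArgumentException("margin must be shorter than the token lifetime");
        }
        this.lifetimeMillis = lifetimeMillis;
        this.marginMillis = marginMillis;
        this.clock = clock;
    }

    /** Call this right after new credentials are obtained. */
    void markIssued() {
        deadlineMillis = clock.getAsLong() + lifetimeMillis;
    }

    /** True when no token was issued yet, or when the deadline is closer than the margin. */
    boolean isNearlyExpired() {
        if (deadlineMillis == NOT_ISSUED) {
            return true;
        }
        return clock.getAsLong() > deadlineMillis - marginMillis;
    }
}
```

In production code the clock would typically be System::currentTimeMillis; passing it in makes the boundary easy to check without waiting 30 minutes.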

Table 1. Parameters

| Parameter | Example | Description |
|---|---|---|
| hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | The endpoint of the ApsaraMQ for RabbitMQ instance. |
| Port | 5672 | The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. |
| AccessKeyID | LTAI5tJQKnB9zVvQ**** | The AccessKey ID of your Alibaba Cloud account or a RAM user within the Alibaba Cloud account. You can log on to the RAM console, create a RAM role, grant the role the AliyunAMQPFullAccess permission, obtain the ARN of the RAM role, and then call the AssumeRole operation to obtain a temporary identity to assume the role. A successful call to the AssumeRole operation returns the AccessKeyID, AccessKeySecret, and SecurityToken parameters of the RAM role. For information about the ARN of a RAM role, see RAM role overview. |
| AccessKeySecret | jw6MawfKOVBveRr84u**** | The AccessKey secret of your Alibaba Cloud account or a RAM user within the Alibaba Cloud account. |
| region | cn-hangzhou | Call the AssumeRole operation in the corresponding region. For more information, see AssumeRole. |
| roleARN | acs:ram::125xxxxxxx223:role/xxx | The ARN of the RAM role. The ARN is in the acs:ram::<account-id>:role/<role-name> format. For more information, see AssumeRole. |
| instanceId | amqp-cn-v0h1kb9nu*** | The ID of the ApsaraMQ for RabbitMQ instance. You can view the instance ID on the Instance Details page in the ApsaraMQ for RabbitMQ console. For information about how to view the ID of an instance, see View instance details. |
| virtualHost | Test | The vhost that you created in the ApsaraMQ for RabbitMQ instance. You can view the vhost on the vhosts page in the ApsaraMQ for RabbitMQ console. For more information, see View the statistics of a vhost. |
| ExchangeName | ExchangeTest | The exchange that you created in the ApsaraMQ for RabbitMQ instance. You can view the exchange on the Exchanges page in the ApsaraMQ for RabbitMQ console. |
| BindingKey | BindingKeyTest | The binding key that is used to bind the exchange to a queue in ApsaraMQ for RabbitMQ. You can view the binding of the exchange and obtain the binding key on the Exchanges page in the ApsaraMQ for RabbitMQ console. |
| QueueName | QueueTest | The queue that you created in the ApsaraMQ for RabbitMQ instance. This parameter is required only when you subscribe to messages. You can view the binding of the exchange and obtain the queue to which the exchange is bound on the Exchanges page in the ApsaraMQ for RabbitMQ console. |
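
Two of the values in the Parameters table follow fixed patterns that are easy to mistype: roleARN uses the acs:ram::<account-id>:role/<role-name> format, and AliyunCredentialsProvider builds the STS endpoint as "sts." + region + ".aliyuncs.com". The following sketch checks both before any network call is made. The class RoleArnCheck and its methods are hypothetical. The pattern also assumes that the account ID consists only of digits and that the role name contains no slash or whitespace; this topic does not state those rules, so loosen the pattern if your ARNs differ.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical pre-flight check for illustration; not part of any Alibaba Cloud SDK. */
final class RoleArnCheck {
    // Assumption: numeric account ID, role name without '/' or whitespace.
    private static final Pattern ROLE_ARN =
            Pattern.compile("^acs:ram::(\\d+):role/([^/\\s]+)$");

    /** Returns {accountId, roleName}, or throws if the ARN does not match the documented format. */
    static String[] parse(String roleArn) {
        Matcher m = ROLE_ARN.matcher(roleArn == null ? "" : roleArn.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Expected acs:ram::<account-id>:role/<role-name>, got: " + roleArn);
        }
        return new String[] {m.group(1), m.group(2)};
    }

    /** Builds the STS endpoint with the same concatenation that AliyunCredentialsProvider uses. */
    static String stsEndpoint(String region) {
        if (region == null || region.trim().isEmpty()) {
            throw new IllegalArgumentException("region must not be empty, for example cn-hangzhou");
        }
        return "sts." + region.trim() + ".aliyuncs.com";
    }
}
```

A masked placeholder such as the example value in the table is rejected by this check, which helps catch sample values that were never replaced.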

Produce messages

Create, compile, and run ProducerTest.java to produce messages.

Important

Before you compile and run ProducerTest.java to produce messages, you must configure the parameters that are described in the Parameters table.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    // We recommend that you configure the AccessKey pair and ARN in environment variables. If you save the AccessKey pair and ARN in the project code, data may be leaked. 
    // The AccessKey ID of your Alibaba Cloud account. 
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // The AccessKey secret of your Alibaba Cloud account. 
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // The region where the Alibaba Cloud service resides. 
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // The ARN of the RAM role. You can obtain the ARN in the ApsaraMQ for RabbitMQ console. 
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    // The endpoint of the ApsaraMQ for RabbitMQ instance. 
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    // The vhost of the ApsaraMQ for RabbitMQ instance. 
    private static final String virtualHost = "${VirtualHost}";
    // The binding between the exchange and the queue. 
    private static final String exchangeName = "${ExchangeName}";
    private static final String queueName = "${QueueName}";
    private static final String bindingKey = "${BindingKey}";
    // The exchange type. 
    private static final String exchangeType = "${ExchangeType}";

    public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
        ConnectionFactory factory = new ConnectionFactory();
        // The endpoint. You can view the endpoint of an instance on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
        factory.setHost(hostName);
        // Replace ${instanceId} with the ID of the ApsaraMQ for RabbitMQ instance. You can view the instance ID on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        // Use the STS-based credentials provider for authentication. 
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        // Specify whether to enable automatic connection recovery. If you set this parameter to true, automatic connection recovery is enabled. If you set this parameter to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // The vhost name. Make sure that the vhost is created in the ApsaraMQ for RabbitMQ console. 
        factory.setVirtualHost(virtualHost);
        // The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5672);
        // Specify a timeout period based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, bindingKey);
        // Start to send messages. In this example, 3,600 messages are sent and the interval between two consecutive messages is 1 second. The message sending lasts for 1 hour. 
        for (int i = 0; i < 3600; i++) {
            try {
                if (aliyunCredentialsProvider.isNearlyExpired()) {
                    // The STS token is about to expire. Request new credentials. 
                    System.out.println("[WARN] Token may be about to expire, so try to update it.");
                    updateSTSProperties(aliyunCredentialsProvider);
                    factory.setCredentialsProvider(aliyunCredentialsProvider);
                    // Release the old connection before reconnecting so that connections do not accumulate. abort() closes the connection and discards any exception raised during the close. 
                    connection.abort();
                    // After the credentials are updated, reconnection is required. 
                    connection = factory.newConnection();
                    channel = connection.createChannel();
                }
                // Set ${ExchangeName} to an existing exchange in the ApsaraMQ for RabbitMQ console. Make sure that the exchange type is consistent with the exchange type in the console. 
                // Set ${BindingKey} to the corresponding binding key based on your business requirements. 
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish(exchangeName, bindingKey, true, props,
                        ("Send the message body-"  + i).getBytes(StandardCharsets.UTF_8));
                System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
                Thread.sleep(1000L);
            } catch (Exception e) {
                System.out.println("[ERROR] Send fail, error: " + e.getMessage());
                Thread.sleep(5000L);
            }
        }
        connection.close();
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        // We recommend that you configure the AccessKey pair in environment variables. If you save the AccessKey pair in the project code, data may be leaked. 
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}
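
ProducerTest reads the AccessKey pair, region, and role ARN with System.getenv, which returns null for a variable that is not set. The failure then surfaces later, inside the STS call, with a less obvious message. A minimal sketch of a fail-fast check follows. The class EnvConfig and the method requireAll are hypothetical; only the variable names are taken from the sample above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical startup check for illustration; not part of the sample classes. */
final class EnvConfig {
    // The environment variable names used by ProducerTest and ConsumerTest.
    static final String[] REQUIRED = {
            "ALIBABA_CLOUD_ACCESS_KEY_ID",
            "ALIBABA_CLOUD_ACCESS_KEY_SECRET",
            "ALIBABA_CLOUD_REGION",
            "ALIBABA_CLOUD_ROLE_ARN"
    };

    /** Returns the trimmed values, or throws one exception that lists every missing variable. */
    static Map<String, String> requireAll(Map<String, String> env) {
        Map<String, String> values = new LinkedHashMap<>();
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            } else {
                values.put(name, value.trim());
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing environment variables: " + missing);
        }
        return values;
    }
}
```

At the start of main, a call such as EnvConfig.requireAll(System.getenv()) would stop the program before any connection is attempted. Taking the map as a parameter keeps the check testable without changing the real environment.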

Subscribe to messages

Create, compile, and run ConsumerTest.java to subscribe to messages.

Important

Before you compile and run ConsumerTest.java to subscribe to messages, you must configure the parameters that are described in the Parameters table.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    // We recommend that you configure the AccessKey pair and ARN in environment variables. If you save the AccessKey pair and ARN in the project code, data may be leaked. 
    // The AccessKey ID of your Alibaba Cloud account. 
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // The AccessKey secret of your Alibaba Cloud account. 
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // The region where the Alibaba Cloud service resides. 
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // The ARN of the RAM role. You can obtain the ARN in the ApsaraMQ for RabbitMQ console. 
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    // The endpoint of the ApsaraMQ for RabbitMQ instance. 
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    // The vhost of the ApsaraMQ for RabbitMQ instance. 
    private static final String virtualHost = "${VirtualHost}";
    // The queue on the ApsaraMQ for RabbitMQ instance. 
    private static final String queueName = "${QueueName}";

    public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        // The endpoint. You can view the endpoint of an instance on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
        factory.setHost(hostName);
        // Replace ${instanceId} with the ID of the ApsaraMQ for RabbitMQ instance. You can view the instance ID on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        // Specify whether to enable automatic connection recovery. If you set this parameter to true, automatic connection recovery is enabled. If you set this parameter to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // The vhost name. Make sure that the vhost is created in the ApsaraMQ for RabbitMQ console. 
        factory.setVirtualHost(virtualHost);
        // The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Set ${QueueName} to an existing queue in the ApsaraMQ for RabbitMQ console. 
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        consume(channel, queueName);
        System.out.println("Consumer started.");

        // Periodically check whether the STS token is about to expire. If it is, refresh the credentials, re-create the connection, and resume consumption. 
        // In this example, a while loop performs the check. 
        // You can also use scheduled tasks to implement scheduled checks and updates. 
        while (true) {
            // The loop checks about once every second whether the STS token is about to expire. 
            // If the STS token is about to expire, the credentials are refreshed. 
            // After the refresh, the connection must be re-created so that consumption continues with the new credentials. 
            if (aliyunCredentialsProvider.isNearlyExpired()) {
                System.out.println("[WARN] Token may be about to expire, so try to update it.");
                updateSTSProperties(aliyunCredentialsProvider);
                factory.setCredentialsProvider(aliyunCredentialsProvider);
                connection.close();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // Consume the messages again. 
                consume(channel, queueName);
                System.out.println("Consumer started.");
            } else {
                // Check once every second. 
                Thread.sleep(1000);
            }
        }
    }

    public static void consume(Channel channel, String queueName) throws IOException {

        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                try {
                    // Process the received messages based on the message consumption logic. 
                    System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    System.out.println("Exception, cause:" + e.getMessage());
                }
            }
        });
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}
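
The comments in ConsumerTest note that a scheduled task can replace the while loop. A minimal sketch that uses the JDK's ScheduledExecutorService follows. The interface RefreshableSession and the class TokenRefreshScheduler are hypothetical stand-ins: in a real consumer, isNearlyExpired() would delegate to AliyunCredentialsProvider, and refresh() would update the STS properties, re-create the connection and channel, and call consume again.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical abstraction over "credentials + connection"; not part of any SDK. */
interface RefreshableSession {
    boolean isNearlyExpired();

    void refresh() throws Exception;
}

/** Hypothetical scheduler for illustration. */
final class TokenRefreshScheduler {
    /** Runs one check. Returns true only if a refresh was needed and succeeded. */
    static boolean checkOnce(RefreshableSession session) {
        try {
            if (!session.isNearlyExpired()) {
                return false;
            }
            session.refresh();
            return true;
        } catch (Exception e) {
            // An exception that escapes a scheduled task cancels all later runs,
            // so log it here and let the next tick retry.
            System.out.println("[ERROR] Refresh failed, will retry: " + e.getMessage());
            return false;
        }
    }

    /** Starts a background check every periodSeconds. Call shutdown() on the result when done. */
    static ScheduledExecutorService start(RefreshableSession session, long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> checkOnce(session), periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

A single-threaded scheduler with a fixed delay keeps refreshes from overlapping. Because the refresh then runs on a different thread from the code that uses the channel, a real implementation would need to guard the shared connection and channel references, which this sketch leaves out.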

Query messages

If you want to check whether a message is sent to ApsaraMQ for RabbitMQ, you can query the message in the ApsaraMQ for RabbitMQ console. For more information, see Query messages.