全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:主帳號和子帳號授權情境

更新時間:Jun 06, 2025

阿里雲的雲訊息佇列 RabbitMQ 版支援AMQP 0-9-1協議,相容開源的RabbitMQ用戶端,您可以使用開源的用戶端SDK接入雲訊息佇列 RabbitMQ 版服務端進行訊息收發。

前提條件

背景資訊

藉助存取控制RAM的RAM使用者,您可以實現阿里雲帳號(主帳號)和RAM使用者(子帳號)許可權分割,按需為RAM使用者賦予不同的許可權,並避免因暴露阿里雲帳號密鑰而造成安全風險。

收發訊息流程程(以Java語言為例)

開源SDK收發流程

說明

雲訊息佇列 RabbitMQ 版與開源RabbitMQ完全相容。更多語言SDK,請參見開源RabbitMQ AMQP協議支援的多語言或架構SDK

擷取存取點

您需要在雲訊息佇列 RabbitMQ 版控制台擷取執行個體的存取點。在收發訊息時,您需要為發布端和訂閱端配置該存取點,通過存取點接入雲訊息佇列 RabbitMQ 版執行個體。

  1. 登入雲訊息佇列 RabbitMQ 版控制台,然後在左側導覽列選擇实例列表

  2. 实例列表頁面的頂部功能表列選擇地區,然後在執行個體列表中,單擊目標執行個體名稱。

  3. 实例详情頁面的接入点信息頁簽,將滑鼠指標移動到目標類型的存取點,單擊該存取點右側的複製表徵圖,複製該存取點。

    類型

    說明

    樣本值

    公網存取點

    公網環境可讀寫。隨用隨付執行個體預設支援,預付費執行個體需在購買時選擇才支援。

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC存取點

    VPC環境可讀寫。隨用隨付執行個體和預付費執行個體預設都支援。

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安裝Java依賴庫

pom.xml添加以下依賴。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支援開源所有版本 -->
</dependency>

產生使用者名稱密碼

開源RabbitMQ用戶端接入雲上服務時,需要先通過AccessKey IDAccessKey Secret產生使用者名稱和密碼,將使用者名稱和密碼設定到開源用戶端SDK的userNamepassWord參數中。雲訊息佇列 RabbitMQ 版會通過使用者名稱和密碼進行許可權認證。

  1. 登入雲訊息佇列 RabbitMQ 版控制台,然後在左側導覽列選擇实例列表

  2. 实例列表頁面的頂部功能表列選擇地區,然後在執行個體列表中,單擊目標執行個體名稱。

  3. 在左側導覽列,單擊用户和权限管理

  4. 用户和权限管理頁面,單擊创建用户名密码

  5. 创建用户名密码面板,輸入AccessKey IDAccessKey Secret,然後單擊確定

    說明

    AccessKey IDAccessKey Secret需要在阿里雲RAM控制台擷取,具體擷取方式,請參見建立AccessKey

    用户和权限管理頁面,顯示建立的靜態使用者名稱與密碼,密碼處於隱藏狀態。使用者名稱密碼

  6. 在建立的靜態使用者名稱密碼的密码列,單擊显示密码,可查看使用者名稱的密碼。

建立用戶端串連

建立串連管理工廠ConnectionFactory.java,用於啟動開源用戶端和雲訊息佇列 RabbitMQ 版服務端的串連。


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class ConnectionFactory {
    private final String hostName;
    private final int port;
    private final String userName;
    private final String password;
    private final String virtualHost;
    private final boolean enableSSL;

    public ConnectionFactory(String hostName, int port, String userName,
                            String password, String virtualHost, boolean enableSSL) {
        this.hostName = hostName;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.virtualHost = virtualHost;
        this.enableSSL = enableSSL;
    }

    public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        //create a new con
        Connection con = createCon();

        //create a new channel
        return con.createChannel();
    }

    private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();

        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(password);

        //設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 預設連接埠。
        factory.setPort(port);

        if (enableSSL) {
            setSSL(factory);
        }

        // 基於網路環境合理設定逾時時間。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        return factory.newConnection();
    }

    private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init((KeyStore) null);
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
        factory.useSslProtocol(sslContext);
    }

    public void closeCon(Channel channel) {
        if (channel != null && channel.getConnection() != null) {
            try {
                channel.getConnection().close();
            } catch (Throwable t) {
            }
        }
    }
}

建立生產者發送訊息

建立並編譯運行Producer.java

重要

編譯運行Producer.java生產訊息之前,您需要根據代碼提示資訊配置參數列表中所列舉的參數。

表 1. 參數列表

參數

樣本值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

雲訊息佇列 RabbitMQ 版執行個體的存取點。擷取方式,請參見擷取存取點

port

5672

  • 非加密連接埠:5672。預設為非加密連接埠。

  • 加密連接埠:5671。

userName

MjoxODgwNzcwODY5MD****

雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱,用於許可權驗證。

擷取方式,請參見產生使用者名稱密碼

passWord

NDAxREVDQzI2MjA0OT****

雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱密碼,用於許可權驗證。

擷取方式,請參見產生使用者名稱密碼

virtualHost

vhost_test

雲訊息佇列 RabbitMQ 版執行個體的Vhost,需要提前建立。

具體操作,請參見前提條件

exchangeName

ExchangeTest

雲訊息佇列 RabbitMQ 版的Exchange。

routingKey

RoutingKeyTest

雲訊息佇列 RabbitMQ 版Exchange與Queue綁定關係的Routing Key。

queueName

QueueTest

雲訊息佇列 RabbitMQ 版的Queue。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

public class Producer {
    //設定為雲訊息佇列 RabbitMQ 版執行個體的存取點。
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    //設定為雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱。
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    //設定為雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱密碼。
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    //設定為雲訊息佇列 RabbitMQ 版執行個體的Vhost名稱。
    public static final String virtualHost = "vhost_test";

    //如果使用5671連接埠,需要enableSSL設定為true。
    public static final int port = 5672;
    public static final boolean enableSSL = false;

    private Channel channel;
    private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
    private final ConnectionFactory factory;
    private final String exchangeName;
    private final String queueName;
    private final String routingKey;

    public Producer(ConnectionFactory factory, String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        this.factory = factory;
        this.outstandingConfirms = new ConcurrentSkipListMap<>();
        this.channel = factory.createChannel();
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
    }

    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        //構建串連工廠。
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);

        //初始化生產者。
        Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "RoutingKeyTest");

        //declare。
        producer.declare();

        producer.initChannel();

        //發送訊息。
        producer.doSend("hello,amqp");
    }

    private void initChannel() throws IOException {
        channel.confirmSelect();

        ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
            if (multiple) {
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);

                for (Long tag : confirmed.keySet()) {
                    String msgId = confirmed.get(tag);
                    System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
                }

                confirmed.clear();
            } else {
                String msgId = outstandingConfirms.remove(deliveryTag);
                System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
            }
        };
        channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
            String msgId = outstandingConfirms.get(deliveryTag);
            System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
            // send msg failed, re-publish
        });


        channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
    }

    private void declare() throws IOException {
        channel.exchangeDeclare(exchangeName, "direct", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
    }
    

    private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        try {
            String msgId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();

            channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));

            outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
        } catch (AlreadyClosedException e) {
            //need reconnect if channel is closed.
            String message = e.getMessage();

            System.out.println(message);

            if (channelClosedByServer(message)) {
                factory.closeCon(channel);
                channel = factory.createChannel();
                this.initChannel();
                doSend(content);
            } else {
                throw e;
            }
        }
    }

    private boolean channelClosedByServer(String errorMsg) {
        if (errorMsg != null
            && errorMsg.contains("channel.close")
            && errorMsg.contains("reply-code=541")
            && errorMsg.contains("reply-text=InternalError")) {
            return true;
        } else {
            return false;
        }
    }
}
說明

雲訊息佇列 RabbitMQ 版會對單一實例的TPS流量峰值進行限流,更多資訊,請參見執行個體限流最佳實務

建立消費者訂閱訊息

建立並編譯運行Consumer.java

重要

編譯運行Consumer.java訂閱訊息之前,您需要根據代碼提示資訊配置參數列表中所列舉的參數。


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

public class Consumer {
    //設定為雲訊息佇列 RabbitMQ 版執行個體的存取點。
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    //設定為雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱。
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    //設定為雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱密碼。
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    //設定為雲訊息佇列 RabbitMQ 版執行個體的Vhost名稱。
    public static final String virtualHost = "vhost_test";
    
    //如果使用5671連接埠,需要enableSSL設定為true。
    public static final int port = 5672;
    public static final boolean enableSSL = false;

    private final Channel channel;
    private final String queue;

    public Consumer(Channel channel, String queue) {
        this.channel = channel;
        this.queue = queue;
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
        Channel channel = factory.createChannel();
        channel.basicQos(50);
         
        //設定為雲訊息佇列 RabbitMQ 版執行個體的Queue名稱。需要和生產者中設定的Queue名稱一致。
        Consumer consumer = new Consumer(channel, "queue-1");

        consumer.consume();
    }

    public void consume() throws IOException, InterruptedException {
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {

                //業務處理。
                System.out.println("receive: msgId=" + properties.getMessageId());

                //消費者需要在有效時間內提交ack,否則訊息會重新推送,最多推送16次。
                //若推送16次還未成功,則訊息被丟棄或者進入死信Exchange。
                //專業版執行個體的有效時間為1分鐘,企業版和Serverless執行個體為5分鐘,鉑金版執行個體為30分鐘。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                channel.getConnection().close();
            } catch (IOException e) {
                System.out.println("close connection error." + e);
            }
            latch.countDown();
        }));
        latch.await();
    }
}
說明

雲訊息佇列 RabbitMQ 版與開源RabbitMQ完全相容。更多參數說明,請參見開源RabbitMQ用戶端文檔

結果驗證

您可以在雲訊息佇列 RabbitMQ 版控制台通過訊息查詢或軌跡查詢驗證訊息的收髮狀態和訊息軌跡。具體操作,請參見