本文以Java SDK為例,說明如何將開源SDK用戶端接入雲訊息佇列 RabbitMQ 版服務端,並完成訊息收發。
前提條件
擷取執行個體存取點
在收發訊息時,您需要為發布端和訂閱端配置該存取點,用戶端通過存取點接入雲訊息佇列 RabbitMQ 版執行個體。
登入雲訊息佇列 RabbitMQ 版控制台,然後在左側導覽列選擇实例列表。
在实例列表頁面的頂部功能表列選擇地區,然後在執行個體列表中,單擊目標執行個體名稱。
在实例详情頁面的接入点信息頁簽,將滑鼠指標移動到目標類型的存取點,單擊該存取點右側的
表徵圖,複製該存取點。類型
說明
樣本值
公網存取點
公網環境可讀寫。隨用隨付執行個體預設支援,預付費執行個體需在購買時選擇才支援。
XXX.net.mq.amqp.aliyuncs.com
VPC存取點
VPC環境可讀寫。隨用隨付執行個體和預付費執行個體預設都支援。
XXX.vpc.mq.amqp.aliyuncs.com
安裝Java依賴庫
在IDEA中建立一個Java工程。
在pom.xml檔案中添加以下依賴引入Java依賴庫。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支援開源所有版本 --> </dependency>
建立使用者名稱密碼
開源RabbitMQ用戶端接入雲上服務時,建立使用者名稱密碼,將使用者名稱和密碼設定到開源用戶端SDK的userName和passWord參數中。雲訊息佇列 RabbitMQ 版會通過使用者名稱和密碼進行許可權認證。
根據執行個體的身份許可權管理員模式,選擇以下一種方式建立使用者名稱和密碼:
開源身分識別驗證和許可權管理方式
登入雲訊息佇列 RabbitMQ 版控制台,然後在左側導覽列選擇实例列表。
在实例列表頁面的頂部功能表列選擇地區,然後在執行個體列表中,單擊目標執行個體名稱。
在左側導覽列,單擊用户和权限管理。
在用户和权限管理頁面,單擊创建用户名密码。
在创建用户名密码面板,輸入用户名、密码和确认密码等資訊,單擊確定。
說明建立使用者名稱密碼完成後,還需要對該使用者進行授權,具體操作請參見許可權管理。
阿里雲存取控制(RAM)
登入雲訊息佇列 RabbitMQ 版控制台,然後在左側導覽列選擇实例列表。
在实例列表頁面的頂部功能表列選擇地區,然後在執行個體列表中,單擊目標執行個體名稱。
在左側導覽列,單擊用户和权限管理。
在用户和权限管理頁面,單擊创建用户名密码。
在创建用户名密码面板,輸入AccessKey ID和AccessKey Secret,然後單擊確定。
說明AccessKey ID和AccessKey Secret需要在阿里雲RAM控制台擷取,具體擷取方式,請參見建立AccessKey。
用户和权限管理頁面,顯示建立的靜態使用者名稱與密碼,密碼處於隱藏狀態。

在建立的靜態使用者名稱密碼的密码列,單擊显示密码,可查看使用者名稱的密碼。
建立用戶端串連
建立串連管理工廠ConnectionFactory.java,用於啟動開源用戶端和雲訊息佇列 RabbitMQ 版服務端的串連。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
public class ConnectionFactory {
private final String hostName;
private final int port;
private final String userName;
private final String password;
private final String virtualHost;
private final boolean enableSSL;
public ConnectionFactory(String hostName, int port, String userName,
String password, String virtualHost, boolean enableSSL) {
this.hostName = hostName;
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.enableSSL = enableSSL;
}
public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
//create a new con
Connection con = createCon();
//create a new channel
return con.createChannel();
}
private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(password);
//設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// 預設連接埠。
factory.setPort(port);
if (enableSSL) {
setSSL(factory);
}
// 基於網路環境合理設定逾時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
return factory.newConnection();
}
private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init((KeyStore) null);
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
factory.useSslProtocol(sslContext);
}
public void closeCon(Channel channel) {
if (channel != null && channel.getConnection() != null) {
try {
channel.getConnection().close();
} catch (Throwable t) {
}
}
}
}生產訊息
在已建立的Java工程中,建立訊息發送程式Producer.java,按照SDK參數填寫說明配置相關參數並運行。有關發送訊息的注意事項,請參見生產訊息時需要注意什嗎?。
範例程式碼如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class Producer {
//設定為雲訊息佇列 RabbitMQ 版執行個體的存取點。
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
//設定為雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱。
public static final String userName = "MjoxODgwNzcwODY5MD****";
//設定為雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱密碼。
public static final String password = "NDAxREVDQzI2MjA0OT****";
//設定為雲訊息佇列 RabbitMQ 版執行個體的Vhost名稱。
public static final String virtualHost = "vhost_test";
//如果使用5671連接埠,需要enableSSL設定為true。
public static final int port = 5672;
public static final boolean enableSSL = false;
private Channel channel;
private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
private final ConnectionFactory factory;
private final String exchangeName;
private final String queueName;
private final String routingKey;
public Producer(ConnectionFactory factory, String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
this.factory = factory;
this.outstandingConfirms = new ConcurrentSkipListMap<>();
this.channel = factory.createChannel();
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKey = routingKey;
}
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
//構建串連工廠。
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
//初始化生產者。
Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "RoutingKeyTest");
//declare。
producer.declare();
producer.initChannel();
//發送訊息。
producer.doSend("hello,amqp");
}
private void initChannel() throws IOException {
channel.confirmSelect();
ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
for (Long tag : confirmed.keySet()) {
String msgId = confirmed.get(tag);
System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
}
confirmed.clear();
} else {
String msgId = outstandingConfirms.remove(deliveryTag);
System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
String msgId = outstandingConfirms.get(deliveryTag);
System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
// send msg failed, re-publish
});
channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
}
private void declare() throws IOException {
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
}
private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
try {
String msgId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
} catch (AlreadyClosedException e) {
//need reconnect if channel is closed.
String message = e.getMessage();
System.out.println(message);
if (channelClosedByServer(message)) {
factory.closeCon(channel);
channel = factory.createChannel();
this.initChannel();
doSend(content);
} else {
throw e;
}
}
}
private boolean channelClosedByServer(String errorMsg) {
if (errorMsg != null
&& errorMsg.contains("channel.close")
&& errorMsg.contains("reply-code=541")
&& errorMsg.contains("reply-text=InternalError")) {
return true;
} else {
return false;
}
}
}雲訊息佇列 RabbitMQ 版會對單一實例的TPS流量峰值進行限流,更多限流資訊,請參見執行個體限流最佳實務。
訂閱訊息
在已建立的Java工程中,建立訊息訂閱程式Consumer.java,按照SDK參數填寫說明配置相關參數並運行。
範例程式碼如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
public class Consumer {
//設定為雲訊息佇列 RabbitMQ 版執行個體的存取點。
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
//設定為雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱。
public static final String userName = "MjoxODgwNzcwODY5MD****";
//設定為雲訊息佇列 RabbitMQ 版執行個體的靜態使用者名稱密碼。
public static final String password = "NDAxREVDQzI2MjA0OT****";
//設定為雲訊息佇列 RabbitMQ 版執行個體的Vhost名稱。
public static final String virtualHost = "vhost_test";
//如果使用5671連接埠,需要enableSSL設定為true。
public static final int port = 5672;
public static final boolean enableSSL = false;
private final Channel channel;
private final String queue;
public Consumer(Channel channel, String queue) {
this.channel = channel;
this.queue = queue;
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
Channel channel = factory.createChannel();
channel.basicQos(50);
//設定為雲訊息佇列 RabbitMQ 版執行個體的Queue名稱。需要和生產者中設定的Queue名稱一致。
Consumer consumer = new Consumer(channel, "queue-1");
consumer.consume();
}
public void consume() throws IOException, InterruptedException {
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
//業務處理。
System.out.println("receive: msgId=" + properties.getMessageId());
//消費者需要在有效時間內提交ack,否則訊息會重新推送,最多推送16次。
//若推送16次還未成功,則訊息被丟棄或者進入死信Exchange。
//專業版執行個體的有效時間為1分鐘,企業版和Serverless執行個體為5分鐘,鉑金版執行個體為30分鐘。
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
channel.getConnection().close();
} catch (IOException e) {
System.out.println("close connection error." + e);
}
latch.countDown();
}));
latch.await();
}
}SDK參數填寫說明
參數 | 樣本值 | 描述 |
hostName | XXX.net.mq.amqp.aliyuncs.com | 雲訊息佇列 RabbitMQ 版執行個體存取點。擷取方式,請參見擷取執行個體存取點。 |
Port | 5672 | 預設連接埠。非加密連接埠為5672,加密連接埠為5671。 |
userName | MjoxODgwNzcwODY5MD**** | 用戶端接入雲訊息佇列 RabbitMQ 版服務端用於許可權認證的靜態使用者名稱。 需要提前在雲訊息佇列 RabbitMQ 版控制台建立。 具體操作,請參見建立使用者名稱密碼。 |
passWord | NDAxREVDQzI2MjA0OT**** | 用戶端接入雲訊息佇列 RabbitMQ 版服務端用於許可權認證的靜態使用者密碼。 需要提前在雲訊息佇列 RabbitMQ 版控制台建立。 具體操作,請參見建立使用者名稱密碼。 |
virtualHost | amqp_vhost | 雲訊息佇列 RabbitMQ 版執行個體的Vhost。需要提前在雲訊息佇列 RabbitMQ 版控制台建立。 具體操作,請參見步驟二:建立資源。 |
exchangeName | ExchangeTest | 雲訊息佇列 RabbitMQ 版的Exchange。 需要提前在雲訊息佇列 RabbitMQ 版控制台建立。 具體操作,請參見步驟二:建立資源。 |
queueName | QueueTest | 雲訊息佇列 RabbitMQ 版的Queue。 需要提前在雲訊息佇列 RabbitMQ 版控制台建立。 具體操作,請參見步驟二:建立資源。 |
routingKey | RoutingKeyTest | 雲訊息佇列 RabbitMQ 版Exchange與Queue綁定的Routing Key。 需要提前在雲訊息佇列 RabbitMQ 版控制台建立綁定關係。 具體操作,請參見步驟二:建立資源。 |
exchangeType | topic | Exchange的類型。雲訊息佇列 RabbitMQ 版支援的類型如下,更多資訊,請參見Exchange。
重要 請確保填寫的Exchange類型和您建立Exchange時選擇的類型一致。 |
相關文檔
雲訊息佇列 RabbitMQ 版與開源RabbitMQ完全相容,支援多語言SDK。更多語言SDK,請參見開源RabbitMQ AMQP協議支援的多語言或架構SDK,更多參數說明,請參見開源RabbitMQ用戶端文檔。
用戶端運行時若返回異常報錯,您可以參考錯誤碼說明查看異常原因和解決方案。
您可以在雲訊息佇列 RabbitMQ 版控制台通過訊息查詢或軌跡查詢驗證訊息的收髮狀態和訊息軌跡。具體操作,請參見查詢訊息和訊息軌跡。