This topic describes how a Java Message Service (JMS) client connects to Message Queue for RabbitMQ and send and receive messages in publish/subscribe (Pub/Sub) mode in the scenario where a pair of username and password is used for authentication. The username and password are generated by using the AccessKey pair of your Alibaba Cloud account or a Resource Access Management (RAM) user within the account.
Prerequisites
- Message Queue for RabbitMQ is activated. For more information, see Activate Message Queue for RabbitMQ.
- Create an instance
- Create a vhost
Background information
- Each message can have multiple consumers. After a producer sends a message to a topic,
all consumers that subscribe to the topic can consume the message.
- Producers and consumers have timing dependencies.
- If a consumer creates a nondurable subscription, the consumer must subscribe to a topic before a producer sends a message to the topic. In addition, the consumer must remain active to consume all the messages sent to the topic. If a message is sent when the consumer is inactive, the consumer cannot consume the message after it becomes active. For more information about the sample code, see Nondurable subscriptions.
- If a consumer creates a durable subscription, the consumer must subscribe to a topic before a producer sends a message to the topic. However, the consumer does not need to remain active to consume all the messages sent to the topic. If a message is sent when the consumer is inactive, the consumer can consume the message after it becomes active. For more information about the sample code, see Durable subscriptions.
- If an open source RabbitMQ client needs to access Alibaba Cloud services, you must
use the AccessKey ID and AccessKey secret of your Alibaba Cloud account or a Resource
Access Management (RAM) user within the account to generate a pair of username and
password. In the code of open source RabbitMQ SDK, set the userName parameter to the generated username and set the passWord parameter to the generated password. Message Queue for RabbitMQ authenticates the RabbitMQ client by using the pair of username and password.
Notice If you use an SDK to send or receive messages on your client, we recommend that you use persistent connections. This way, the client does not need to establish connections every time you use the client to send or receive messages. Frequent creation of connections consumes a large number of network and broker resources and may even trigger protection against SYN flood attacks on the broker. For more information, see Connection.
Messaging process

Obtain an endpoint
You must obtain an endpoint of your instance in the Message Queue for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the Message Queue for RabbitMQ instance by using the endpoint.
Add JMS dependencies
Add the following dependencies to the pom.xml file:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp.jms</groupId>
<artifactId>mq-amqp-jms-client</artifactId>
<version>1.11.2-1.0.0</version>
<exclusions>
<exclusion>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
Generate a pair of username and password
For more information, see Create a static username/password pair.
Nondurable subscriptions
Create, compile, and run the Subscriber.java class.
Parameter | Example | Description |
---|---|---|
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | The endpoint of the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can obtain the endpoint on the Instance Details page. |
Port | 5672 | The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. |
userName | MjoxODgwNzcwODY5MD**** | The static username that is generated in the Message Queue for RabbitMQ console. Message Queue for RabbitMQ encodes the AccessKey pair of your Alibaba Cloud account or a RAM user within the account and the ID of the Message Queue for RabbitMQ instance in Base64 to obtain a static username. In the Message Queue for RabbitMQ console, you can obtain the static username on the Static Accounts page. |
passWord | NDAxREVDQzI2MjA0OT**** | The static password that is generated in the Message Queue for RabbitMQ console. Message Queue for RabbitMQ uses the HMAC-SHA1 algorithm to generate a signature based on the AccessKey secret of your Alibaba Cloud account or a RAM user within the account and the timestamp parameter, which indicates the current system time. Then, Message Queue for RabbitMQ encodes the signature and the timestamp parameter in Base64 to obtain a static password. In the Message Queue for RabbitMQ console, you can obtain the static password on the Static Accounts page. |
virtualHost | Test | The vhost that you created in the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the vhost on the vhosts page. For more information about how to view a vhost, see View vhost details. |
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
public class Subscriber {
public static String DESTINATION = "systemA.systemB.Price.*";
public static RMQConnectionFactory getRMQConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setUsername("${Username}");
connectionFactory.setPassword("${Password}");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("VhostName");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
return connectionFactory;
}
public static void main(String[] args) throws Exception {
RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
TopicConnection connection = connectionFactory.createTopicConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destTopic = session.createTopic(DESTINATION);
TopicSubscriber subscriber = session.createSubscriber(destTopic);
connection.start();
while (true) {
Message msg = subscriber.receive(1000);
if (msg == null) {
System.out.println("No new message, sleeping 5 secs");
Thread.sleep(5 * 1000);
continue;
}
if (msg instanceof TextMessage) {
String body = ((TextMessage) msg).getText();
System.out.println(body);
} else {
System.out.println("Unexpected message type: " + msg.getClass());
}
}
}
}
Durable subscriptions
Create, compile, and run the DurableSubscriber.java class.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
public class DurableSubscriber {
public static String DESTINATION = "systemA.systemB.Price.*";
public static String CLIENT_ID = "client_id";
public static String SUBSCRIBER_NAME = "subscriber_name";
public static RMQConnectionFactory getRMQConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setUsername("${Username}");
connectionFactory.setPassword("${Password}");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("${VhostName}");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
return connectionFactory;
}
public static void main(String[] args) throws Exception {
RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
TopicConnection connection = connectionFactory.createTopicConnection();
connection.setClientID(CLIENT_ID);
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic Topic = session.createTopic(DESTINATION);
TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
connection.start();
while (true) {
Message msg = subscriber.receive(1000);
if (msg == null) {
System.out.println("No new message, sleeping 5 secs");
Thread.sleep(5 * 1000);
continue;
}
if (msg instanceof TextMessage) {
String body = ((TextMessage) msg).getText();
System.out.println(body);
} else {
System.out.println("Unexpected message type: " + msg.getClass());
}
}
}
}
Send messages
Create, compile, and run the Publisher.java class.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
public class Publisher {
public static String DESTINATION = "systemA.systemB.Price.aaa";
public static RMQConnectionFactory getRMQConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setUsername("${Username}");
connectionFactory.setPassword("${Password}");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("${VhostName}");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
return connectionFactory;
}
public static void main(String[] args) throws JMSException {
RMQConnectionFactory factory = getRMQConnectionFactory();
TopicConnection connection = factory.createTopicConnection();
TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
TextMessage msg = session.createTextMessage("hello topic test1");
Topic topic = session.createTopic(DESTINATION);
TopicPublisher publisher = session.createPublisher(topic);
publisher.send(msg);
System.out.println("Message sent");
session.close();
connection.close();
}
}