This topic describes how a Java Message Service (JMS) client connects to Message Queue for RabbitMQ and send and receive messages in point-to-point (P2P) mode in the scenario where a Resource Access Management (RAM) role is used to grant permissions across Alibaba Cloud accounts.
Prerequisites
- Message Queue for RabbitMQ is activated. For more information, see Activate Message Queue for RabbitMQ.
- Create an instance
- Create a vhost
Background information
- JMS P2P
The JMS point-to-point (P2P) messaging model has the following features:
- Each message has only one consumer. After a producer sends a message to a queue, a
specified consumer registered for the queue can receive the message.
- Producers and consumers have no timing dependencies. Producers can send messages to queues regardless of whether consumers are active. Consumers can receive messages from queues regardless of whether producers are active.
- Each message has only one consumer. After a producer sends a message to a queue, a
specified consumer registered for the queue can receive the message.
- If you need to use the temporary Security Token Service (STS) token generated for
a RAM role to access Message Queue for RabbitMQ, you must set the AccessKeyID, AccessKeySecret, and SecurityToken parameters in the AliyunCredentialsProvider class for permission verification.
Notice If you use an SDK to send or receive messages on your client, we recommend that you use persistent connections. This way, the client does not need to establish connections every time you use the client to send or receive messages. Frequent creation of connections consumes a large number of network and broker resources and may even trigger protection against SYN flood attacks on the broker. For more information, see Connection.
Messaging process

Obtain an endpoint
You must obtain an endpoint of your instance in the Message Queue for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the Message Queue for RabbitMQ instance by using the endpoint.
Add JMS dependencies
Add the following dependencies to the pom.xml file:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp.jms</groupId>
<artifactId>mq-amqp-jms-client</artifactId>
<version>1.11.2-1.0.0</version>
<exclusions>
<exclusion>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
Configure AliyunCredentialsProvider.java
Create a permission verification class AliyunCredentialsProvider.java and set the AccessKeyID, AccessKeySecret, and SecurityToken parameters based on the comments in the code. For more information, see Table 1.
Parameter | Example | Description |
---|---|---|
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | The endpoint of the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can obtain the endpoint on the Instance Details page. |
Port | 5672 | The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. |
AccessKeyID | LTAI5tJQKnB9zVvQ**** | The AccessKey ID of your Alibaba Cloud account or a RAM user within the account. You can log on to the RAM console, create a RAM role, grant the role the AliyunAMQPFullAccess permission, obtain the Alibaba Cloud Resource Name (ARN) of the RAM role, and then call the AssumeRole operation to obtain a temporary identity to assume the role. A successful call to the AssumeRole operation returns values of the AccessKeyID, AccessKeySecret, and SecurityToken of the RAM role. For more information about the ARN of a RAM role, see RAM role overview. |
AccessKeySecret | jw6MawfKOVBveRr84u**** | The AccessKey secret of your Alibaba Cloud account or a RAM user within the account. |
SecurityToken | CAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************ | The STS token of the RAM role. |
instanceId | amqp-cn-v0h1kb9nu*** | The ID of the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the instance ID on the Instance Details page. For more information about how to view the ID of an instance, see View instance details. |
virtualHost | Test | The vhost that you created in the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the vhost on the vhosts page. For more information about how to view a vhost, see View vhost details. |
import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
* The AliyunCredentialsProvider class that is used to generate a pair of dynamic username and password.
*/
public class AliyunCredentialsProvider implements CredentialsProvider {
/**
* Access Key ID.
*/
private final String accessKeyId;
/**
* Access Key Secret.
*/
private final String accessKeySecret;
/**
* security temp token. (optional)
*/
private final String securityToken;
/**
* The ID of the Message Queue for RabbitMQ instance. You can obtain the instance ID in the Message Queue for RabbitMQ console.
*/
private final String instanceId;
public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
final String instanceId) {
this(accessKeyId, accessKeySecret, null, instanceId);
}
public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
final String securityToken, final String instanceId) {
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.securityToken = securityToken;
this.instanceId = instanceId;
}
@Override
public String getUsername() {
if(StringUtils.isNotEmpty(securityToken)) {
return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
} else {
return UserUtils.getUserName(accessKeyId, instanceId);
}
}
@Override
public String getPassword() {
try {
return UserUtils.getPassord(accessKeySecret);
} catch (InvalidKeyException e) {
//todo
} catch (NoSuchAlgorithmException e) {
//todo
}
return null;
}
}
Configure an ObjectMessage object
import java.io.Serializable;
public class Person implements Serializable{
private static final long serialVersionUID = -5809782578272943999L;
String name = null;
int age = 0;
String address =null;
public Person(String name, int age, String address){
this.name = name;
this.age = age;
this.address = address;
}
public String getName(){
return name;
}
public int getAge(){
return age;
}
public String getAddress(){
return address;
}
}
Send messages
Create, compile, and run the QueueSend.java class to send messages.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
import java.io.Serializable;
public class QueueSend {
public void sendTextMsg(Session session, String MsgContent, String name) throws JMSException {
Destination queue = new RMQDestination(name, true, false);
MessageProducer msgProducer = session.createProducer(queue);
Message msg = session.createTextMessage(MsgContent);
msgProducer.send(msg);
System.out.println("Text message sent");
}
public void sendMap(Session session, MapMessage map, String name) throws JMSException {
Destination queue = new RMQDestination(name, true, false);
MessageProducer msgProducer = session.createProducer(queue);
msgProducer.send(map);
System.out.println("Map message sent");
}
public void sendObj(Session session, Person obj, String name) throws JMSException {
Destination queue = new RMQDestination(name, true, false);
MessageProducer msgProducer = session.createProducer(queue);
ObjectMessage objMsg = session.createObjectMessage((Serializable) obj);
MessageProducer msgPorducer = session.createProducer(queue);
msgPorducer.send(objMsg);
System.out.println("Object message sent");
}
public static void main(String[] args) throws JMSException {
final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${InstanceID}");
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("${VhostName}");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setCredentialsProvider(provider);
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSend qs = new QueueSend();
qs.sendTextMsg(session, "Send a JMS text message", "queue.msgText");
MapMessage mapMsg = session.createMapMessage();
mapMsg.setString("name", "Li");
mapMsg.setBoolean("IsHero", false);
mapMsg.setInt("age", 23);
qs.sendMap(session, mapMsg, "queue.msgMap");
Person person = new Person("Li", 23, "Daxing, Beijing");
qs.sendObj(session, person, "queue.msgObj");
session.close();
connection.close();
}
}
Subscribe to messages
Create, compile, and run the QueueAccept.java class to subscribe to messages.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
public class QueueAccept implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
try {
System.out.println("Content of the text message" + text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
if (message instanceof MapMessage) {
MapMessage map = (MapMessage) message;
try {
System.out.println("Name:" + map.getString("name"));
System.out.println("Hero:" + map.getBoolean("IsHero"));
System.out.println("Age:" + map.getInt("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) message;
try {
Person person = (Person) objMsg.getObject();
System.out.println("Username:" + person.getName() + ", Age:" + person.getAge() + ", Address:" + person.getAddress());
} catch (JMSException e) {
e.printStackTrace();
}
}
try {
message.acknowledge();
System.out.println("Manually acknowledge the message");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws JMSException {
RMQConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
if (connectionFactory == null) {
final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${InstanceID}");
connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("${VhostName}");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setCredentialsProvider(provider);
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
}
if (connection == null) {
connection = connectionFactory.createConnection();
connection.start();
}
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = new RMQDestination("queue.msgText", true, false);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new QueueAccept());
Destination queue1 = new RMQDestination("queue.msgMap", true, false);
MessageConsumer consumer1 = session.createConsumer(queue1);
consumer1.setMessageListener(new QueueAccept());
Destination queue2 = new RMQDestination("queue.msgObj", true, false);
MessageConsumer consumer2 = session.createConsumer(queue2);
consumer2.setMessageListener(new QueueAccept());
}
}