本文介绍JMS客户端如何在RAM角色跨账号授权场景接入消息队列RabbitMQ版并实现P2P消息收发。
前提条件
背景信息
- JMS P2P
JMS P2P(点对点)消息收发模式具有以下特点:
- 一条消息被一个指定的订阅者消费。某个发布者向某个Queue(队列)发送消息后,某个指定的订阅者从该Queue接收消息。
- 发布者和订阅者之间不存在时间依赖性。发布者向Queue发送消息时不需要订阅者同时处于活跃状态,订阅者从Queue接收消息时同样不需发布者处于活跃状态。
- 一条消息被一个指定的订阅者消费。某个发布者向某个Queue(队列)发送消息后,某个指定的订阅者从该Queue接收消息。
- 当您需要通过RAM STS角色授权的方式访问消息队列RabbitMQ版服务时,需要通过阿里云提供的权限认证类(AliyunCredentialsProvider)设置 AccessKey ID、AccessKey Secret与SecurityToken进行权限认证才能访问。
重要 您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection。
收发消息流程

获取接入点
您需要在消息队列RabbitMQ版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入消息队列RabbitMQ版实例。
安装JMS依赖库
在pom.xml中添加以下依赖。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp.jms</groupId>
<artifactId>mq-amqp-jms-client</artifactId>
<version>1.11.2-1.0.0</version>
<exclusions>
<exclusion>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
配置权限认证类(AliyunCredentialsProvider)
创建权限认证类AliyunCredentialsProvider.java,根据代码提示信息,设置 AccessKeyID、AccessKeySecret与SecurityToken参数。具体信息,请参见参数列表。
参数 | 示例值 | 描述 |
---|---|---|
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | 消息队列RabbitMQ版实例接入点。您可以在消息队列RabbitMQ版控制台的实例详情页面获取。 |
Port | 5672 | 默认端口。非加密端口为5672,加密端口为5671。 |
AccessKeyID | LTAI5tJQKnB9zVvQ**** | 阿里云账号或RAM账号的AccessKey ID。您可以登录RAM访问控制台,创建RAM角色,并赋予角色AliyunAMQPFullAccess权限,获取角色的ARN,调用AssumeRole接口获取一个扮演该角色的临时身份。AssumeRole执行成功会返回RAM角色的 AccessKeyID、AccessKeySecret以及SecurityToken。角色ARN的概念,请参见RAM角色概览。 |
AccessKeySecret | jw6MawfKOVBveRr84u**** | 阿里云账号或RAM账号的AccessKey Secret。 |
SecurityToken | CAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************ | RAM角色的安全令牌(STS Token)。 |
instanceId | amqp-cn-v0h1kb9nu*** | 消息队列RabbitMQ版的实例ID。您可以在消息队列RabbitMQ版控制台的实例详情页面查看。如何查看实例ID,请参见查看实例详情。 |
virtualHost | Test | 消息队列RabbitMQ版实例的Vhost。您可以在消息队列RabbitMQ版控制台的Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情。 |
import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
* 阿里云UserName、Password生成类(动态变化)。
*/
public class AliyunCredentialsProvider implements CredentialsProvider {
/**
* Access Key ID.
*/
private final String accessKeyId;
/**
* Access Key Secret.
*/
private final String accessKeySecret;
/**
* security temp token. (optional)
*/
private final String securityToken;
/**
* 实例ID(从消息队列RabbitMQ版控制台获取)。
*/
private final String instanceId;
public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
final String instanceId) {
this(accessKeyId, accessKeySecret, null, instanceId);
}
public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
final String securityToken, final String instanceId) {
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.securityToken = securityToken;
this.instanceId = instanceId;
}
@Override
public String getUsername() {
if(StringUtils.isNotEmpty(securityToken)) {
return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
} else {
return UserUtils.getUserName(accessKeyId, instanceId);
}
}
@Override
public String getPassword() {
try {
return UserUtils.getPassord(accessKeySecret);
} catch (InvalidKeyException e) {
//todo
} catch (NoSuchAlgorithmException e) {
//todo
}
return null;
}
}
配置Object类型消息
创建Object类型消息类Person.java。
import java.io.Serializable;
public class Person implements Serializable{
private static final long serialVersionUID = -5809782578272943****;
String name = null;
int age = 0;
String address =null;
public Person(String name, int age, String address){
this.name = name;
this.age = age;
this.address = address;
}
public String getName(){
return name;
}
public int getAge(){
return age;
}
public String getAddress(){
return address;
}
}
发送消息
创建并编译运行QueueSend.java生产消息。
重要 编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
import java.io.Serializable;
public class QueueSend {
public void sendTextMsg(Session session, String MsgContent, String name) throws JMSException {
Destination queue = new RMQDestination(name, true, false);
MessageProducer msgProducer = session.createProducer(queue);
Message msg = session.createTextMessage(MsgContent);
msgProducer.send(msg);
System.out.println("文本类型消息已发送");
}
public void sendMap(Session session, MapMessage map, String name) throws JMSException {
Destination queue = new RMQDestination(name, true, false);
MessageProducer msgProducer = session.createProducer(queue);
msgProducer.send(map);
System.out.println("Map类型消息已发送");
}
public void sendObj(Session session, Person obj, String name) throws JMSException {
Destination queue = new RMQDestination(name, true, false);
MessageProducer msgProducer = session.createProducer(queue);
ObjectMessage objMsg = session.createObjectMessage((Serializable) obj);
MessageProducer msgPorducer = session.createProducer(queue);
msgPorducer.send(objMsg);
System.out.println("Object类型的消息已发送");
}
public static void main(String[] args) throws JMSException {
final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${InstanceID}");
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("${VhostName}");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setCredentialsProvider(provider);
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSend qs = new QueueSend();
qs.sendTextMsg(session, "发送JMS文本类型消息", "queue.msgText");
MapMessage mapMsg = session.createMapMessage();
mapMsg.setString("name", "李某");
mapMsg.setBoolean("IsHero", false);
mapMsg.setInt("age", 23);
qs.sendMap(session, mapMsg, "queue.msgMap");
Person person = new Person("李某", 23, "北京.大兴");
qs.sendObj(session, person, "queue.msgObj");
session.close();
connection.close();
}
}
订阅消息
创建并编译运行QueueAccept.java订阅消息。
重要 编译运行QueueAccept.java订阅消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
public class QueueAccept implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
try {
System.out.println("发送的文本消息内容为:" + text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
if (message instanceof MapMessage) {
MapMessage map = (MapMessage) message;
try {
System.out.println("姓名:" + map.getString("name"));
System.out.println("是否是英雄:" + map.getBoolean("IsHero"));
System.out.println("年龄:" + map.getInt("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) message;
try {
Person person = (Person) objMsg.getObject();
System.out.println("用户名:" + person.getName() + ",年龄:" + person.getAge() + ",地址:" + person.getAddress());
} catch (JMSException e) {
e.printStackTrace();
}
}
try {
message.acknowledge();
System.out.println("消息手动确认");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws JMSException {
RMQConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
if (connectionFactory == null) {
final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${InstanceID}");
connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("${VhostName}");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setCredentialsProvider(provider);
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
}
if (connection == null) {
connection = connectionFactory.createConnection();
connection.start();
}
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = new RMQDestination("queue.msgText", true, false);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new QueueAccept());
Destination queue1 = new RMQDestination("queue.msgMap", true, false);
MessageConsumer consumer1 = session.createConsumer(queue1);
consumer1.setMessageListener(new QueueAccept());
Destination queue2 = new RMQDestination("queue.msgObj", true, false);
MessageConsumer consumer2 = session.createConsumer(queue2);
consumer2.setMessageListener(new QueueAccept());
}
}