This topic shows you how to use a JMS client to connect to Message Queue for RabbitMQ and implement the point-to-point (P2P) messaging model.

Background information

  • JMS P2P
    The JMS P2P messaging model has the following features:
    • Each message has only one consumer. After a producer sends a message to a queue, a specified consumer registered for the queue can receive the message.JMS P2P model
    • Producers and consumers have no timing dependencies. Producers can send messages to queues regardless of whether consumers are active. Consumers can receive messages from queues regardless of whether producers are active.
  • Username and password
    When you access Message Queue for RabbitMQ from a client, Message Queue for RabbitMQ authenticates your permissions based on your username and password. Message Queue for RabbitMQ allows you to generate usernames and passwords for your client by using the following methods:
    • Dynamic username and password: Use a permission authentication class provided by Alibaba Cloud to generate dynamic usernames and passwords.
    • Static username and password (recommended): Generate static usernames and passwords in the Message Queue for RabbitMQ console. This method is the same as that of open source RabbitMQ.
    Notice When you call an SDK to send or receive messages on a client, we recommend that you use persistent connections. This way, you do not need to create connections every time you send or receive messages. Frequent creation of connections consumes a large number of network and server resources and may even trigger protection against SYN flood attacks. For more information, see Connection.

Messaging process

JMS P2P

Obtain endpoints

You must obtain the endpoint of your instance in the Message Queue for RabbitMQ console. When you send or receive messages, you must configure the endpoint on the producer or consumer client. Otherwise, you cannot access the Message Queue for RabbitMQ instance.

  1. Log on to the Message Queue for RabbitMQ console.
  2. In the top navigation bar, select the region where your instance resides.
  3. In the left-side navigation pane, click Instances.
  4. On the Instances page, select your instance. In the Basic Information section, find the endpoint of the required network type and click the endpoint to copy it.
    Type Description Example
    Public endpoint You can access an instance from the Internet to read and write data. By default, pay-as-you-go instances have public endpoints. To use a public endpoint for a subscription instance, you must configure a public endpoint when you create the subscription instance. XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC endpoint You can access an instance from a VPC to read and write data. By default, both pay-as-you-go and subscription instances have VPC endpoints. XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

Add JMS dependencies

Add the following dependencies to the pom.xml file:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

Generate a username and its password

Create the AliyunCredentialsProvider.java file that is used to generate a dynamic username and its password.

import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * The AliyunCredentialsProvider class that is used to generate a dynamic username and its password. 
 */
public class AliyunCredentialsProvider implements CredentialsProvider {
    /**
     * Access Key ID.
     */
    private final String accessKeyId;
    /**
     * Access Key Secret.
     */
    private final String accessKeySecret;
    /**
     * security temp token. (optional)
     */
    private final String securityToken;
    /**
     * The ID of the instance. You can obtain the instance ID from the Message Queue for RabbitMQ console. 
     */
    private final String instanceId;
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String instanceId) {
        this(accessKeyId, accessKeySecret, null, instanceId);
    }
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String securityToken, final String instanceId) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.securityToken = securityToken;
        this.instanceId = instanceId;
    }
    @Override
    public String getUsername() {
        if(StringUtils.isNotEmpty(securityToken)) {
            return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
        } else {
            return UserUtils.getUserName(accessKeyId, instanceId);
        }
    }

    @Override
    public String getPassword() {
        try {
            return UserUtils.getPassord(accessKeySecret);
        } catch (InvalidKeyException e) {
            //todo
        } catch (NoSuchAlgorithmException e) {
            //todo
        }
        return null;
    }
}
For information about how to create a static username and its password, see Create a static username and its password.

Configure an ObjectMessage object

Create the Person.java file that is used to configure an ObjectMessage object.
import java.io.Serializable;

public class Person implements Serializable{
    
    private static final long serialVersionUID = -5809782578272943999L;
    String name = null;
    int age = 0;
    String address =null;

    public Person(String name, int age, String address){
        this.name = name;
        this.age = age;
        this.address = address;
    }

    public String getName(){
        return name;
    }

    public int getAge(){
        return age;
    }

    public String getAddress(){
        return address;
    }
}

Send a message

Create, compile, and run the QueueSend.java file.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
import java.io.Serializable;

public class QueueSend {

    public void sendTextMsg(Session session, String MsgContent, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        Message msg = session.createTextMessage(MsgContent);
        msgProducer.send(msg);
        System.out.println("A text message is sent.");
    }

    public void sendMap(Session session, MapMessage map, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        msgProducer.send(map);
        System.out.println("A map message is sent.");
    }

    public void sendObj(Session session, Person obj, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        ObjectMessage objMsg = session.createObjectMessage((Serializable) obj);
        MessageProducer msgPorducer = session.createProducer(queue);
        msgPorducer.send(objMsg);
        System.out.println("An object message is sent.");
    }

    public static void main(String[] args) throws JMSException {
        final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueSend qs = new QueueSend();
        qs.sendTextMsg(session, "Send a JMS text message.", "queue.msgText");
        MapMessage mapMsg = session.createMapMessage();
        mapMsg.setString("name", "Li");
        mapMsg.setBoolean("IsHero", false);
        mapMsg.setInt("age", 23);
        qs.sendMap(session, mapMsg, "queue.msgMap");
        Person person = new Person("Li", 23,  "Beijing. Daxing");
        qs.sendObj(session, person, "queue.msgObj");
        session.close();
        connection.close();
    }
}
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
import java.io.Serializable;

public class QueueSend {

    public void sendTextMsg(Session session, String MsgContent, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        Message msg = session.createTextMessage(MsgContent);
        msgProducer.send(msg);
        System.out.println("A text message is sent.");
    }

    public void sendMap(Session session, MapMessage map, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        msgProducer.send(map);
        System.out.println("A map message is sent.");
    }

    public void sendObj(Session session, Person obj, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        ObjectMessage objMsg = session.createObjectMessage((Serializable) obj);// Make sure that the Serializable interface is implemented.
        MessageProducer msgPorducer = session.createProducer(queue);
        msgPorducer.send(objMsg);
        System.out.println("An object message is sent.");
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueSend qs = new QueueSend();
        qs.sendTextMsg(session, "Send a JMS text message.", "queue.msgText");
        MapMessage mapMsg = session.createMapMessage();
        mapMsg.setString("name", "Li");
        mapMsg.setBoolean("IsHero", false);
        mapMsg.setInt("age", 23);
        qs.sendMap(session, mapMsg, "queue.msgMap");
        Person person = new Person("Li", 23,  "Beijing. Daxing");// Send an object message.
        qs.sendObj(session, person, "queue.msgObj");
        session.close();
        connection.close();
    }
}

Receive a message

Create, compile, and run the QueueAccept.java file.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class QueueAccept implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage text = (TextMessage) message;
            try {
                System.out.println("Text message: " + text.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof MapMessage) {
            MapMessage map = (MapMessage) message;
            try {
                System.out.println("Name: " + map.getString("name"));
                System.out.println("Hero: " + map.getBoolean("IsHero"));
                System.out.println("Age: " + map.getInt("age"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof ObjectMessage) {
            ObjectMessage objMsg = (ObjectMessage) message;
            try {
                Person person = (Person) objMsg.getObject();
                System.out.println("Username: " + person.getName() + ", Age: " + person.getAge() + ", Address: " + person.getAddress());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        try {
            message.acknowledge();
            System.out.println("The message is manually acknowledged.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        if (connectionFactory == null) {
            final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${InstanceID}");
            connectionFactory = new RMQConnectionFactory();
            connectionFactory.setHost("xxx.xxx.aliyuncs.com");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("${VhostName}");
            connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
                @Override
                public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                    cf.setCredentialsProvider(provider);
                    cf.setAutomaticRecoveryEnabled(true);
                    cf.setNetworkRecoveryInterval(5000);
                    System.out.println(cf.getPassword());
                }
            });
        }
        if (connection == null) {
            connection = connectionFactory.createConnection();
            connection.start();
        }
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = new RMQDestination("queue.msgText", true, false);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new QueueAccept());
        Destination queue1 = new RMQDestination("queue.msgMap", true, false);
        MessageConsumer consumer1 = session.createConsumer(queue1);
        consumer1.setMessageListener(new QueueAccept());
        Destination queue2 = new RMQDestination("queue.msgObj", true, false);
        MessageConsumer consumer2 = session.createConsumer(queue2);
        consumer2.setMessageListener(new QueueAccept());
    }
}
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class QueueAccept implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage text = (TextMessage) message;
            try {
                System.out.println("Text message: " + text.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof MapMessage) {
            MapMessage map = (MapMessage) message;
            try {
                System.out.println("Name: " + map.getString("name"));
                System.out.println("Hero: " + map.getBoolean("IsHero"));
                System.out.println("Age: " + map.getInt("age"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof ObjectMessage) {
            ObjectMessage objMsg = (ObjectMessage) message;
            try {
                Person person = (Person) objMsg.getObject();
                System.out.println("Username: " + person.getName() + ", Age: " + person.getAge() + ", Address: " + person.getAddress());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        try {
            message.acknowledge();
            System.out.println("The message is manually acknowledged.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        if (connectionFactory == null) {
            connectionFactory = new RMQConnectionFactory();
            connectionFactory.setHost("xxx.xxx.aliyuncs.com");
            connectionFactory.setUsername("${Username}");
            connectionFactory.setPassword("${Password}");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("${VhostName}");
            connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
                @Override
                public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                    cf.setAutomaticRecoveryEnabled(true);
                    cf.setNetworkRecoveryInterval(5000);
                    System.out.println(cf.getPassword());
                }
            });
        }
        if (connection == null) {
            connection = connectionFactory.createConnection();
            connection.start();
        }
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = new RMQDestination("queue.msgText", true, false);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new QueueAccept());
        Destination queue1 = new RMQDestination("queue.msgMap", true, false);
        MessageConsumer consumer1 = session.createConsumer(queue1);
        consumer1.setMessageListener(new QueueAccept());
        Destination queue2 = new RMQDestination("queue.msgObj", true, false);
        MessageConsumer consumer2 = session.createConsumer(queue2);
        consumer2.setMessageListener(new QueueAccept());
    }
}