This topic describes how a Java Message Service (JMS) client connects to Message Queue for RabbitMQ and send and receive messages in point-to-point (P2P) mode in the scenario where a pair of username and password is used for authentication. The username and password are generated by using the AccessKey pair of your Alibaba Cloud account or a Resource Access Management (RAM) user within the account.

Prerequisites

Background information

  • JMS P2P
    The JMS point-to-point (P2P) messaging model has the following features:
    • Each message has only one consumer. After a producer sends a message to a queue, a specified consumer registered for the queue can receive the message. JMS P2P model
    • Producers and consumers have no timing dependencies. Producers can send messages to queues regardless of whether consumers are active. Consumers can receive messages from queues regardless of whether producers are active.
  • If an open source RabbitMQ client needs to access Alibaba Cloud services, you must use the AccessKey ID and AccessKey secret of your Alibaba Cloud account or a Resource Access Management (RAM) user within the account to generate a pair of username and password. In the code of open source RabbitMQ SDK, set the userName parameter to the generated username and set the passWord parameter to the generated password. Message Queue for RabbitMQ authenticates the RabbitMQ client by using the pair of username and password.
    Notice If you use an SDK to send or receive messages on your client, we recommend that you use persistent connections. This way, the client does not need to establish connections every time you use the client to send or receive messages. Frequent creation of connections consumes a large number of network and broker resources and may even trigger protection against SYN flood attacks on the broker. For more information, see Connection.

Messaging process

Messaging process

Obtain an endpoint

You must obtain an endpoint of your instance in the Message Queue for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the Message Queue for RabbitMQ instance by using the endpoint.

  1. Log on to the Message Queue for RabbitMQ console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is located.
  3. On the Instances page, click the name of your instance.
  4. On the Endpoint Information tab of the Instance Details page, move the pointer to the type of the endpoint that you want to use. Then, click the copy icon on the right side of the endpoint to copy the endpoint.
    Type Description Example
    Public endpoint You can access an instance from the Internet to read and write data. By default, pay-as-you-go instances have public endpoints. To use a public endpoint for a subscription instance, you must select a public endpoint when you create the subscription instance. XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC endpoint You can access an instance from a virtual private cloud (VPC) to read and write data. By default, both pay-as-you-go and subscription instances have VPC endpoints. XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

Add JMS dependencies

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

Generate a pair of username and password

For more information, see Create a static username/password pair.

Configure an ObjectMessage object

Create a class named Person.java that is used to configure an ObjectMessage object.
import java.io.Serializable;

public class Person implements Serializable{
    
    private static final long serialVersionUID = -5809782578272943999L;
    String name = null;
    int age = 0;
    String address =null;

    public Person(String name, int age, String address){
        this.name = name;
        this.age = age;
        this.address = address;
    }

    public String getName(){
        return name;
    }

    public int getAge(){
        return age;
    }

    public String getAddress(){
        return address;
    }
}

Send messages

Create, compile, and run the QueueSend.java class.

Notice Before you compile QueueSend.java and run it to send messages, you must set the parameters described in Table 1 based on the comments in the code.
Table 1. Parameters
Parameter Example Description
hostName 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com The endpoint of the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can obtain the endpoint on the Instance Details page.
Port 5672 The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.
userName MjoxODgwNzcwODY5MD**** The static username that is generated in the Message Queue for RabbitMQ console. Message Queue for RabbitMQ encodes the AccessKey pair of your Alibaba Cloud account or a RAM user within the account and the ID of the Message Queue for RabbitMQ instance in Base64 to obtain a static username. In the Message Queue for RabbitMQ console, you can obtain the static username on the Static Accounts page.
passWord NDAxREVDQzI2MjA0OT**** The static password that is generated in the Message Queue for RabbitMQ console. Message Queue for RabbitMQ uses the HMAC-SHA1 algorithm to generate a signature based on the AccessKey secret of your Alibaba Cloud account or a RAM user within the account and the timestamp parameter, which indicates the current system time. Then, Message Queue for RabbitMQ encodes the signature and the timestamp parameter in Base64 to obtain a static password. In the Message Queue for RabbitMQ console, you can obtain the static password on the Static Accounts page.
virtualHost Test The vhost that you created in the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the vhost on the vhosts page. For more information about how to view a vhost, see View vhost details.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
import java.io.Serializable;

public class QueueSend {

    public void sendTextMsg(Session session, String MsgContent, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        Message msg = session.createTextMessage(MsgContent);
        msgProducer.send(msg);
        System.out.println("Text message sent");
    }

    public void sendMap(Session session, MapMessage map, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        msgProducer.send(map);
        System.out.println("Map message sent");
    }

    public void sendObj(Session session, Person obj, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        ObjectMessage objMsg = session.createObjectMessage((Serializable) obj);// Make sure that the Serializable interface is implemented by the object before the object is sent.
        MessageProducer msgPorducer = session.createProducer(queue);
        msgPorducer.send(objMsg);
        System.out.println("Object message sent");
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueSend qs = new QueueSend();
        qs.sendTextMsg(session, "Send a JMS text message", "queue.msgText");
        MapMessage mapMsg = session.createMapMessage();
        mapMsg.setString("name", "Li");
        mapMsg.setBoolean("IsHero", false);
        mapMsg.setInt("age", 23);
        qs.sendMap(session, mapMsg, "queue.msgMap");
        Person person = new Person("Li", 23, "Daxing, Beijing");// Send an object message.
        qs.sendObj(session, person, "queue.msgObj");
        session.close();
        connection.close();
    }
}

Subscribe to messages

Create, compile, and run the QueueAccept.java class.

Notice Before you compile QueueAccept.java and run it to subscribe to messages, you must set the parameters described in Table 1 based on the comments in the code.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class QueueAccept implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage text = (TextMessage) message;
            try {
                System.out.println("Content of the text message" + text.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof MapMessage) {
            MapMessage map = (MapMessage) message;
            try {
                System.out.println("Name:" + map.getString("name"));
                System.out.println("Hero:" + map.getBoolean("IsHero"));
                System.out.println("Age:" + map.getInt("age"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof ObjectMessage) {
            ObjectMessage objMsg = (ObjectMessage) message;
            try {
                Person person = (Person) objMsg.getObject();
                System.out.println("Username:" + person.getName() + ", Age:" + person.getAge() + ", Address:" + person.getAddress());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        try {
            message.acknowledge();
            System.out.println("Manually acknowledge the message");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        if (connectionFactory == null) {
            connectionFactory = new RMQConnectionFactory();
            connectionFactory.setHost("xxx.xxx.aliyuncs.com");
            connectionFactory.setUsername("${Username}");
            connectionFactory.setPassword("${Password}");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("${VhostName}");
            connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
                @Override
                public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                    cf.setAutomaticRecoveryEnabled(true);
                    cf.setNetworkRecoveryInterval(5000);
                    System.out.println(cf.getPassword());
                }
            });
        }
        if (connection == null) {
            connection = connectionFactory.createConnection();
            connection.start();
        }
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = new RMQDestination("queue.msgText", true, false);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new QueueAccept());
        Destination queue1 = new RMQDestination("queue.msgMap", true, false);
        MessageConsumer consumer1 = session.createConsumer(queue1);
        consumer1.setMessageListener(new QueueAccept());
        Destination queue2 = new RMQDestination("queue.msgObj", true, false);
        MessageConsumer consumer2 = session.createConsumer(queue2);
        consumer2.setMessageListener(new QueueAccept());
    }
}