All Products
Search
Document Center

ApsaraMQ for RabbitMQ:JMS P2P (Alibaba Cloud account or RAM user)

Last Updated:Sep 08, 2023

This topic describes how to connect a Java Message Service (JMS) client to ApsaraMQ for RabbitMQ to send and receive messages in point-to-point (P2P) messaging pattern in scenarios in which a pair of username and password is used for authentication. The username and password are generated by using the AccessKey pair of your Alibaba Cloud account or a Resource Access Management (RAM) user within the account.

Prerequisites

Background information

  • JMS P2P

    The JMS P2P messaging model has the following features:

    • Each message has only one consumer. After a producer sends a message to a queue, a specified consumer registered for the queue can receive the message. JMS P2P model

    • Producers and consumers have no timing dependencies. Producers can send messages to queues regardless of whether consumers are active. Consumers can receive messages from queues regardless of whether producers are active.

  • If an open source RabbitMQ client needs to access Alibaba Cloud services, you must use the AccessKey ID and AccessKey secret of your Alibaba Cloud account or a RAM user within the account to generate a pair of username and password. In the code of open source RabbitMQ SDK, set the userName parameter to the generated username and set the passWord parameter to the generated password. ApsaraMQ for RabbitMQ authenticates the RabbitMQ client by using the pair of username and password.

    Important

    If you use an SDK to send or receive messages on your client, we recommend that you use persistent connections. This way, the client does not need to establish connections every time you use the client to send or receive messages. Frequent creation of connections consumes a large number of network and broker resources and may even trigger protection against SYN flood attacks on the broker. For more information, see Connection.

Messaging process

消息收发流程

Obtain an endpoint

You must obtain the endpoint that is used to access your instance in the ApsaraMQ for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the ApsaraMQ for RabbitMQ instance by using the endpoint.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, select Instances.

  2. In the top navigation bar of the Instances page, select a region. In the instance list, click the name of the instance that you want to manage.

  3. On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the Copy icon on the right side of the endpoint to copy the endpoint.

    Type

    Description

    Example

    Public endpoint

    You can access an instance over the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use a public endpoint for a subscription instance, you must select a public endpoint when you create the subscription instance.

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC endpoint

    You can access an instance over a virtual private cloud (VPC) to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints.

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

Add JMS dependencies

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

Generate a pair of username and password

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, select Instances.

  2. In the top navigation bar of the Instances page, select a region. In the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click Static Accounts.

  4. On the Static Accounts page, click Create Username/Password.

  5. In the Create Username/Password panel, configure the AccessKey ID and AccessKey Secret parameters. Then, click OK.

    Note

    You can obtain the values of the AccessKey ID and AccessKey Secret parameters in the RAM console. For more information, see Create an AccessKey pair.

    On the Static Accounts page, the created static username and password appears. The password is masked. Username and password

  6. In the Password column of the static username and password that you created, click Display to view the password that corresponds to the username.

Configure an object message

Create a class named Person.java that is used to configure an object message.

import java.io.Serializable;

public class Person implements Serializable{
    
    private static final long serialVersionUID = -5809782578272943****;
    String name = null;
    int age = 0;
    String address =null;

    public Person(String name, int age, String address){
        this.name = name;
        this.age = age;
        this.address = address;
    }

    public String getName(){
        return name;
    }

    public int getAge(){
        return age;
    }

    public String getAddress(){
        return address;
    }
}

Send messages

Create, compile, and run QueueSend.java to send messages.

Important

Before you compile QueueSend.java and run it to send messages, you must configure the parameters described in Parameters based on the comments in the code.

Parameters

Parameter

Example

Description

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

The endpoint of the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can obtain the endpoint on the Instance Details page.

Port

5672

The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.

userName

MjoxODgwNzcwODY5MD****

The static username that is generated in the ApsaraMQ for RabbitMQ console. ApsaraMQ for RabbitMQ encodes the AccessKey pair of your Alibaba Cloud account or a RAM user within the account and the ID of the ApsaraMQ for RabbitMQ instance in Base64 to obtain a static username. In the ApsaraMQ for RabbitMQ console, you can obtain the static username on the Static Accounts page.

passWord

NDAxREVDQzI2MjA0OT****

The static password that is generated in the ApsaraMQ for RabbitMQ console. ApsaraMQ for RabbitMQ uses the HMAC-SHA1 algorithm to generate a signature based on the AccessKey secret of your Alibaba Cloud account or a RAM user within the account and the timestamp parameter, which indicates the current system time. Then, ApsaraMQ for RabbitMQ encodes the signature and the timestamp parameter in Base64 to obtain a static password. In the ApsaraMQ for RabbitMQ console, you can search for the created username and password based on the instance ID on the Static Accounts page.

virtualHost

Test

The vhost that you created in the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the vhost on the vhosts page. For information about how to view a vhost, see View the information about the connection that is established to a vhost.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
import java.io.Serializable;

public class QueueSend {

    public void sendTextMsg(Session session, String MsgContent, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        Message msg = session.createTextMessage(MsgContent);
        msgProducer.send(msg);
        System.out.println("Text message sent");
    }

    public void sendMap(Session session, MapMessage map, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        msgProducer.send(map);
        System.out.println("Map message sent");
    }

    public void sendObj(Session session, Person obj, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        ObjectMessage objMsg = session.createObjectMessage((Serializable) obj);// Make sure that the Serializable interface is implemented by the object before the object is sent.
        MessageProducer msgPorducer = session.createProducer(queue);
        msgPorducer.send(objMsg);
        System.out.println("Object message sent");
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueSend qs = new QueueSend();
        qs.sendTextMsg(session, "Send a JMS text message", "queue.msgText");
        MapMessage mapMsg = session.createMapMessage();
        mapMsg.setString("name", "Li");
        mapMsg.setBoolean("IsHero", false);
        mapMsg.setInt("age", 23);
        qs.sendMap(session, mapMsg, "queue.msgMap");
        Person person=new Person("Li", 23, "Daxing-Beijing");// Send an object message.
        qs.sendObj(session, person, "queue.msgObj");
        session.close();
        connection.close();
    }
}

Subscribe to messages

Create, compile, and run QueueAccept.java to subscribe to messages.

Important

Before you compile QueueAccept.java and run it to subscribe to messages, you must configure the parameters described in Parameters based on the comments in the code.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class QueueAccept implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage text = (TextMessage) message;
            try {
                System.out.println("Content of the text message" + text.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof MapMessage) {
            MapMessage map = (MapMessage) message;
            try {
                System.out.println("Name:" + map.getString("name"));
                System.out.println("Hero:" + map.getBoolean("IsHero"));
                System.out.println("Age:" + map.getInt("age"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof ObjectMessage) {
            ObjectMessage objMsg = (ObjectMessage) message;
            try {
                Person person = (Person) objMsg.getObject();
                System.out.println("Username:" + person.getName() + ", Age:" + person.getAge() + ", Address:" + person.getAddress());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        try {
            message.acknowledge();
            System.out.println("Manually acknowledge the message");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        if (connectionFactory == null) {
            connectionFactory = new RMQConnectionFactory();
            connectionFactory.setHost("xxx.xxx.aliyuncs.com");
            connectionFactory.setUsername("${Username}");
            connectionFactory.setPassword("${Password}");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("${VhostName}");
            connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
                @Override
                public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                    cf.setAutomaticRecoveryEnabled(true);
                    cf.setNetworkRecoveryInterval(5000);
                    System.out.println(cf.getPassword());
                }
            });
        }
        if (connection == null) {
            connection = connectionFactory.createConnection();
            connection.start();
        }
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = new RMQDestination("queue.msgText", true, false);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new QueueAccept());
        Destination queue1 = new RMQDestination("queue.msgMap", true, false);
        MessageConsumer consumer1 = session.createConsumer(queue1);
        consumer1.setMessageListener(new QueueAccept());
        Destination queue2 = new RMQDestination("queue.msgObj", true, false);
        MessageConsumer consumer2 = session.createConsumer(queue2);
        consumer2.setMessageListener(new QueueAccept());
    }
}