All Products
Search
Document Center

ApsaraMQ for RabbitMQ:JMS Pub/Sub (Alibaba Cloud account or RAM user)

Last Updated:Sep 08, 2023

This topic describes how to connect a Java Message Service (JMS) client to ApsaraMQ for RabbitMQ to send and receive messages in publish-subscribe (Pub/Sub) messaging pattern in scenarios in which a pair of username and password is used for authentication. The username and password are generated by using the AccessKey pair of your Alibaba Cloud account or a Resource Access Management (RAM) user within the account.

Prerequisites

Background information

  • Each message can have multiple consumers. After a producer sends a message to a topic, all consumers that subscribe to the topic can consume the message. JMS_pub_sub_model

  • Timing dependencies exist between publishers and subscribers.

    • If a subscriber creates a non-persistent subscription, the subscriber must subscribe to a topic before a publisher sends a message to the topic. In addition, the subscriber must remain active to consume all messages sent to the topic. If a message is published when the subscriber is inactive, the subscriber cannot consume the message even after the subscriber becomes active. For information about the sample code, see Non-persistent subscriptions.

    • If a subscriber creates a persistent subscription, the subscriber must subscribe to a topic before a publisher sends a message to the topic. However, the subscriber does not need to remain active to consume all messages sent to the topic. If a message is published when the subscriber is inactive, the subscriber can still consume the message after the subscriber becomes active. For information about the sample code, see Persistent subscriptions.

  • If an open source RabbitMQ client needs to access Alibaba Cloud services, you must use the AccessKey ID and AccessKey secret of your Alibaba Cloud account or a RAM user within the account to generate a pair of username and password. In the code of open source RabbitMQ SDK, set the userName parameter to the generated username and set the passWord parameter to the generated password. ApsaraMQ for RabbitMQ authenticates the RabbitMQ client by using the pair of username and password.

    Important

    If you use an SDK to send or receive messages on your client, we recommend that you use persistent connections. This way, the client does not need to establish connections every time you use the client to send or receive messages. Frequent creation of connections consumes a large number of network and broker resources and may even trigger protection against SYN flood attacks on the broker. For more information, see Connection.

Messaging process

消息收发

Obtain an endpoint

You must obtain the endpoint that is used to access your instance in the ApsaraMQ for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the ApsaraMQ for RabbitMQ instance by using the endpoint.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, select Instances.

  2. In the top navigation bar of the Instances page, select a region. In the instance list, click the name of the instance that you want to manage.

  3. On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the Copy icon on the right side of the endpoint to copy the endpoint.

    Type

    Description

    Example

    Public endpoint

    You can access an instance over the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use a public endpoint for a subscription instance, you must select a public endpoint when you create the subscription instance.

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC endpoint

    You can access an instance over a virtual private cloud (VPC) to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints.

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

Add JMS dependencies

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

Generate a pair of username and password

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, select Instances.

  2. In the top navigation bar of the Instances page, select a region. In the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click Static Accounts.

  4. On the Static Accounts page, click Create Username/Password.

  5. In the Create Username/Password panel, configure the AccessKey ID and AccessKey Secret parameters. Then, click OK.

    Note

    You can obtain the values of the AccessKey ID and AccessKey Secret parameters in the RAM console. For more information, see Create an AccessKey pair.

    On the Static Accounts page, the created static username and password appears. The password is masked. Username and password

  6. In the Password column of the static username and password that you created, click Display to view the password that corresponds to the username.

Non-persistent subscriptions

Create, compile, and run Subscriber.java to subscribe to messages.

Important

Before you compile Subscriber.java and run it to subscribe to messages, you must configure the parameters described in Parameters based on the comments in the code.

Parameters

Parameter

Example

Description

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

The endpoint of the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can obtain the endpoint on the Instance Details page.

Port

5672

The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.

userName

MjoxODgwNzcwODY5MD****

The static username that is generated in the ApsaraMQ for RabbitMQ console. ApsaraMQ for RabbitMQ encodes the AccessKey pair of your Alibaba Cloud account or a RAM user within the account and the ID of the ApsaraMQ for RabbitMQ instance in Base64 to obtain a static username. In the ApsaraMQ for RabbitMQ console, you can obtain the static username on the Static Accounts page.

passWord

NDAxREVDQzI2MjA0OT****

The static password that is generated in the ApsaraMQ for RabbitMQ console. ApsaraMQ for RabbitMQ uses the HMAC-SHA1 algorithm to generate a signature based on the AccessKey secret of your Alibaba Cloud account or a RAM user within the account and the timestamp parameter, which indicates the current system time. Then, ApsaraMQ for RabbitMQ encodes the signature and the timestamp parameter in Base64 to obtain a static password. In the ApsaraMQ for RabbitMQ console, you can search for the created username and password based on the instance ID on the Static Accounts page.

virtualHost

Test

The vhost that you created in the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the vhost on the vhosts page. For information about how to view a vhost, see View the information about the connection that is established to a vhost.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Subscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("VhostName");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destTopic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createSubscriber(destTopic);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

Persistent subscriptions

Create, compile, and run DurableSubscriber.java to subscribe to messages.

Important

Before you compile DurableSubscriber.java and run it to subscribe to messages, you must configure the parameters described in Parameters based on the comments in the code.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class DurableSubscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";
    public static String CLIENT_ID = "client_id";
    public static String SUBSCRIBER_NAME = "subscriber_name";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }


    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.setClientID(CLIENT_ID);
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic Topic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

Publish messages

Create, compile, and run Publisher.java to publish messages.

Important

Before you compile Publisher.java and run it to publish messages, you must configure the parameters described in Parameters based on the comments in the code.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Publisher {

    public static String DESTINATION = "systemA.systemB.Price.aaa";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory factory = getRMQConnectionFactory();
        TopicConnection connection = factory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        TextMessage msg = session.createTextMessage("hello topic test1");
        Topic topic = session.createTopic(DESTINATION);
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.send(msg);
        System.out.println("Message sent");
        session.close();
        connection.close();
    }
}