This topic describes how a Java Message Service (JMS) client connects to Message Queue for RabbitMQ and sends and receives messages in publish/subscribe (Pub/Sub) mode when a Resource Access Management (RAM) role is used to grant permissions across Alibaba Cloud accounts.

Prerequisites

Background information

  • Each message can have multiple consumers. After a producer sends a message to a topic, all consumers that subscribe to the topic can consume the message.
  • Producers and consumers have timing dependencies.
    • If a consumer creates a nondurable subscription, the consumer must subscribe to a topic before a producer sends a message to the topic. In addition, the consumer must remain active to consume all the messages sent to the topic. If a message is sent when the consumer is inactive, the consumer cannot consume the message after it becomes active. For more information about the sample code, see Nondurable subscriptions.
    • If a consumer creates a durable subscription, the consumer must subscribe to a topic before a producer sends a message to the topic. However, the consumer does not need to remain active to consume all the messages sent to the topic. If a message is sent when the consumer is inactive, the consumer can consume the message after it becomes active. For more information about the sample code, see Durable subscriptions.
  • If you need to use the temporary Security Token Service (STS) token generated for a RAM role to access Message Queue for RabbitMQ, you must set the AccessKeyID, AccessKeySecret, and SecurityToken parameters in the AliyunCredentialsProvider class for permission verification.
    Notice If you use an SDK to send or receive messages on your client, we recommend that you use persistent connections. This way, the client does not need to establish connections every time you use the client to send or receive messages. Frequent creation of connections consumes a large number of network and broker resources and may even trigger protection against SYN flood attacks on the broker. For more information, see Connection.
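
The timing dependency between nondurable and durable subscriptions described above can be illustrated with a small in-memory model. This is only a sketch: the SubscriptionTimingSketch class and its methods are hypothetical names invented for illustration and are not part of JMS or of the Message Queue for RabbitMQ client. It assumes that a durable subscription keeps messages while the consumer is inactive and that a nondurable subscription drops them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of one topic; not a JMS or RabbitMQ API. */
class SubscriptionTimingSketch {

    /** One subscription with its own backlog of messages that were not consumed yet. */
    static final class Subscription {
        final boolean durable;
        boolean active = true;
        final Deque<String> backlog = new ArrayDeque<>();

        Subscription(boolean durable) {
            this.durable = durable;
        }

        /** Returns and clears every message that this subscription currently holds. */
        List<String> receiveAll() {
            List<String> received = new ArrayList<>(backlog);
            backlog.clear();
            return received;
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    /** Registers a subscription; messages published earlier are never delivered to it. */
    Subscription subscribe(String name, boolean durable) {
        return subscriptions.computeIfAbsent(name, n -> new Subscription(durable));
    }

    /** Delivers a message to every subscription that is allowed to keep it. */
    void publish(String message) {
        for (Subscription s : subscriptions.values()) {
            // Durable: retained even while inactive. Nondurable: kept only while active.
            if (s.durable || s.active) {
                s.backlog.add(message);
            }
        }
    }

    public static void main(String[] args) {
        SubscriptionTimingSketch topic = new SubscriptionTimingSketch();
        topic.publish("m0"); // sent before anyone subscribed, so nobody receives it

        Subscription nondurable = topic.subscribe("nondurable", false);
        Subscription durable = topic.subscribe("durable", true);
        topic.publish("m1"); // both consumers are active

        nondurable.active = false;
        durable.active = false;
        topic.publish("m2"); // sent while both consumers are inactive

        nondurable.active = true;
        durable.active = true;
        topic.publish("m3");

        System.out.println("nondurable: " + nondurable.receiveAll()); // prints "nondurable: [m1, m3]"
        System.out.println("durable:    " + durable.receiveAll());    // prints "durable:    [m1, m2, m3]"
    }
}
```

In both cases the message m0, which was sent before the subscription existed, is never delivered. Only the durable subscription receives m2, which was sent while the consumer was inactive.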

Messaging process

[Figure: JMS Pub/Sub messaging process]

Obtain an endpoint

You must obtain an endpoint of your instance in the Message Queue for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the Message Queue for RabbitMQ instance by using the endpoint.

  1. Log on to the Message Queue for RabbitMQ console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is located.
  3. On the Instances page, click the name of your instance.
  4. On the Endpoint Information tab of the Instance Details page, move the pointer to the type of the endpoint that you want to use. Then, click the copy icon on the right side of the endpoint to copy the endpoint.
    | Type | Description | Example |
    |------|-------------|---------|
    | Public endpoint | You can access an instance from the Internet to read and write data. By default, pay-as-you-go instances have public endpoints. To use a public endpoint for a subscription instance, you must select a public endpoint when you create the subscription instance. | XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com |
    | VPC endpoint | You can access an instance from a virtual private cloud (VPC) to read and write data. By default, both pay-as-you-go and subscription instances have VPC endpoints. | XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com |

Add JMS dependencies

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

Configure AliyunCredentialsProvider.java

Create a permission verification class AliyunCredentialsProvider.java and set the AccessKeyID, AccessKeySecret, and SecurityToken parameters based on the comments in the code. For more information, see Table 1.

Table 1. Parameters
| Parameter | Example | Description |
|-----------|---------|-------------|
| hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | The endpoint of the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can obtain the endpoint on the Instance Details page. |
| Port | 5672 | The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. |
| AccessKeyID | LTAI5tJQKnB9zVvQ**** | The AccessKey ID of your Alibaba Cloud account or a RAM user within the account. You can log on to the RAM console, create a RAM role, grant the role the AliyunAMQPFullAccess permission, obtain the Alibaba Cloud Resource Name (ARN) of the RAM role, and then call the AssumeRole operation to obtain a temporary identity to assume the role. A successful call to the AssumeRole operation returns values of the AccessKeyID, AccessKeySecret, and SecurityToken of the RAM role. For more information about the ARN of a RAM role, see RAM role overview. |
| AccessKeySecret | jw6MawfKOVBveRr84u**** | The AccessKey secret of your Alibaba Cloud account or a RAM user within the account. |
| SecurityToken | CAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************ | The STS token of the RAM role. |
| instanceId | amqp-cn-v0h1kb9nu*** | The ID of the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the instance ID on the Instance Details page. For more information about how to view the ID of an instance, see View instance details. |
| virtualHost | Test | The vhost that you created in the Message Queue for RabbitMQ instance. In the Message Queue for RabbitMQ console, you can view the vhost on the vhosts page. For more information about how to view a vhost, see View vhost details. |

import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * The AliyunCredentialsProvider class that is used to generate a dynamic username and password pair.
 */
public class AliyunCredentialsProvider implements CredentialsProvider {
    /**
     * Access Key ID.
     */
    private final String accessKeyId;
    /**
     * Access Key Secret.
     */
    private final String accessKeySecret;
    /**
     * Temporary security token (STS token). Optional.
     */
    private final String securityToken;
    /**
     * The ID of the Message Queue for RabbitMQ instance. You can obtain the instance ID in the Message Queue for RabbitMQ console. 
     */
    private final String instanceId;
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String instanceId) {
        this(accessKeyId, accessKeySecret, null, instanceId);
    }
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String securityToken, final String instanceId) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.securityToken = securityToken;
        this.instanceId = instanceId;
    }
    @Override
    public String getUsername() {
        if(StringUtils.isNotEmpty(securityToken)) {
            return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
        } else {
            return UserUtils.getUserName(accessKeyId, instanceId);
        }
    }

    @Override
    public String getPassword() {
        try {
            return UserUtils.getPassord(accessKeySecret);
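        } catch (InvalidKeyException | NoSuchAlgorithmException e) {
            // Fail fast: a null password would only surface later as an opaque authentication error.
            throw new IllegalStateException("Failed to generate the password from the AccessKey secret", e);
        }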
    }
}
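
The samples in this topic hard-code placeholders such as ${AccessKeyID} and ${SecurityToken}. As one possible alternative, the values from Table 1 could be loaded from environment variables so that credentials stay out of the source code. The following is a minimal sketch under that assumption. The RabbitMqSettingsSketch class and all environment variable names in it are hypothetical and are not defined by Message Queue for RabbitMQ. The port choice only follows the rule in Table 1 and does not configure TLS on the connection.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for the Table 1 values; not part of any SDK. */
final class RabbitMqSettingsSketch {
    final String hostName;
    final int port;
    final String accessKeyId;
    final String accessKeySecret;
    final String securityToken;
    final String instanceId;
    final String virtualHost;

    private RabbitMqSettingsSketch(Map<String, String> env) {
        // All variable names below are assumptions made for this sketch.
        this.hostName = required(env, "RABBITMQ_ENDPOINT");
        // Table 1: port 5672 for non-encrypted connections, port 5671 for encrypted connections.
        boolean encrypted = Boolean.parseBoolean(env.getOrDefault("RABBITMQ_USE_TLS", "false"));
        this.port = encrypted ? 5671 : 5672;
        this.accessKeyId = required(env, "ALIBABA_CLOUD_ACCESS_KEY_ID");
        this.accessKeySecret = required(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        this.securityToken = required(env, "ALIBABA_CLOUD_SECURITY_TOKEN");
        this.instanceId = required(env, "RABBITMQ_INSTANCE_ID");
        this.virtualHost = required(env, "RABBITMQ_VHOST");
    }

    /** Builds the settings from a map such as the one returned by System.getenv(). */
    static RabbitMqSettingsSketch fromEnvironment(Map<String, String> env) {
        return new RabbitMqSettingsSketch(env);
    }

    private static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Placeholder values so that the sketch runs anywhere; a real client would pass System.getenv().
        Map<String, String> env = new HashMap<>();
        env.put("RABBITMQ_ENDPOINT", "xxx.xxx.aliyuncs.com");
        env.put("ALIBABA_CLOUD_ACCESS_KEY_ID", "placeholder-id");
        env.put("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "placeholder-secret");
        env.put("ALIBABA_CLOUD_SECURITY_TOKEN", "placeholder-token");
        env.put("RABBITMQ_INSTANCE_ID", "placeholder-instance");
        env.put("RABBITMQ_VHOST", "Test");

        RabbitMqSettingsSketch settings = fromEnvironment(env);
        System.out.println("Endpoint " + settings.hostName + ":" + settings.port
                + ", vhost " + settings.virtualHost);
    }
}
```

With such a holder, the fields could be passed to the four-argument AliyunCredentialsProvider constructor and to setHost, setPort, and setVirtualHost in the samples that follow, in place of the literal placeholders.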

Nondurable subscriptions

Create, compile, and run the Subscriber.java class.

Notice Before you compile and run Subscriber.java to subscribe to messages, you must replace the placeholders in the code with the parameter values described in Table 1.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Subscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                // The generated password is intentionally not printed: it is a credential derived from the AccessKey secret.
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destTopic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createSubscriber(destTopic);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

Durable subscriptions

Create, compile, and run the DurableSubscriber.java class.

Notice Before you compile and run DurableSubscriber.java to subscribe to messages, you must replace the placeholders in the code with the parameter values described in Table 1.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class DurableSubscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";
    public static String CLIENT_ID = "client_id";
    public static String SUBSCRIBER_NAME = "subscriber_name";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                // The generated password is intentionally not printed: it is a credential derived from the AccessKey secret.
            }
        });
        return connectionFactory;
    }


    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.setClientID(CLIENT_ID);
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUBSCRIBER_NAME);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

Send messages

Create, compile, and run the Publisher.java class.

Notice Before you compile and run Publisher.java to send messages, you must replace the placeholders in the code with the parameter values described in Table 1.
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Publisher {

    public static String DESTINATION = "systemA.systemB.Price.aaa";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${SecurityToken}","${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                // The generated password is intentionally not printed: it is a credential derived from the AccessKey secret.
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory factory = getRMQConnectionFactory();
        TopicConnection connection = factory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        TextMessage msg = session.createTextMessage("hello topic test");
        Topic topic = session.createTopic(DESTINATION);
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.send(msg);
        System.out.println("Message sent");
        session.close();
        connection.close();
    }
}
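
The subscriber samples use the destination systemA.systemB.Price.* and the publisher sends to systemA.systemB.Price.aaa. This topic does not state the matching rules. Assuming that destinations are matched with AMQP topic-exchange wildcard semantics, where * stands for exactly one dot-separated word and # stands for zero or more words, the relationship between the two destinations can be sketched as follows. The TopicPatternSketch class is a hypothetical helper written for illustration and is not part of the client libraries.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical matcher that assumes AMQP topic-exchange wildcard semantics. */
final class TopicPatternSketch {

    /** Returns true if the dot-separated key is matched by the dot-separated pattern. */
    static boolean matches(String pattern, String key) {
        List<String> patternWords = Arrays.asList(pattern.split("\\."));
        List<String> keyWords = Arrays.asList(key.split("\\."));
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // The pattern is exhausted, so the key must be exhausted as well.
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            // The pattern still expects a word, but the key has none left.
            return false;
        }
        // '*' matches exactly one word; any other word must be equal to the key word.
        return (word.equals("*") || word.equals(k.get(j))) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String subscription = "systemA.systemB.Price.*";
        System.out.println(matches(subscription, "systemA.systemB.Price.aaa"));     // prints "true"
        System.out.println(matches(subscription, "systemA.systemB.Price.aaa.bbb")); // prints "false"
        System.out.println(matches(subscription, "systemA.systemB.Price"));         // prints "false"
    }
}
```

Under this assumption, the message that Publisher.java sends to systemA.systemB.Price.aaa reaches both Subscriber.java and DurableSubscriber.java, whereas a message sent to a destination with more or fewer words after Price would not.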