All Products
Search
Document Center

ApsaraMQ for RabbitMQ:JMS Pub/Sub (cross-account authorization by using a RAM role)

Last Updated:Sep 08, 2023

This topic describes how to connect a Java Message Service (JMS) client to ApsaraMQ for RabbitMQ to send and receive messages in publish-subscribe (Pub/Sub) messaging pattern in scenarios in which a RAM role is used to grant permissions across Alibaba Cloud accounts.

Prerequisites

Background information

  • Each message can have multiple consumers. After a producer sends a message to a topic, all consumers that subscribe to the topic can consume the message. JMS_pub_sub_model

  • Timing dependencies exist between publishers and subscribers.

    • If a subscriber creates a non-persistent subscription, the subscriber must subscribe to a topic before a publisher sends a message to the topic. In addition, the subscriber must remain active to consume all messages that are sent to the topic. If a message is published when the subscriber is inactive, the subscriber cannot consume the message even after the subscriber becomes active. For information about the sample code, see Non-persistent subscriptions.

    • If a subscriber creates a persistent subscription, the subscriber must subscribe to a topic before a publisher sends a message to the topic. However, the subscriber does not need to remain active to consume all messages that are sent to the topic. If a message is published when the subscriber is inactive, the subscriber can still consume the message after the subscriber becomes active. For information about the sample code, see Persistent subscriptions.

  • If you need to use the temporary Security Token Service (STS) token generated for a RAM role to access ApsaraMQ for RabbitMQ, you must set the AccessKey ID, AccessKey Secret, and SecurityToken parameters in the AliyunCredentialsProvider class for permission verification.

    Important

    If you use an SDK to send or receive messages on your client, we recommend that you use persistent connections. This way, the client does not need to establish connections every time you use the client to send or receive messages. Frequent creation of connections consumes a large number of network and broker resources and may even trigger protection against SYN flood attacks on the broker. For more information, see Connection.

Messaging process

JMS Pub Sub

Obtain an endpoint

You must obtain the endpoint that is used to access your instance in the ApsaraMQ for RabbitMQ console. Before you send and receive messages, you must specify the endpoint in the code of the producer and consumer. Then, the producer and consumer can access the ApsaraMQ for RabbitMQ instance by using the endpoint.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, select Instances.

  2. In the top navigation bar of the Instances page, select a region. In the instance list, click the name of the instance that you want to manage.

  3. On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the Copy icon on the right side of the endpoint to copy the endpoint.

    Type

    Description

    Example

    Public endpoint

    You can access an instance over the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use a public endpoint for a subscription instance, you must select a public endpoint when you create the subscription instance.

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC endpoint

    You can access an instance over a virtual private cloud (VPC) to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints.

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

Add JMS dependencies

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

Configure the AliyunCredentialsProvider.java class

  1. Create a .properties configuration file and configure the AccessKeyID, AccessKeySecret and SecurityToken parameters in the file. For information about how to configure the parameters, see Parameters.

    # Access Key ID.
    accessKeyId=${accessKeyId}
    # Access Key Secret.
    accessKeySecret=${accessKeySecret}
    # security temp token. (optional)
    securityToken=${securityToken}
  2. Create the AliyunCredentialsProvider.java class for authentication and configure the parameters based on code comments. For more information, see Parameters.

    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;
    
    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import org.apache.commons.lang3.StringUtils;
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
    
        /**
         * Access Key ID.
         */
        private static final String accessKeyId;
        /**
         * Access Key Secret.
         */
        private static final String accessKeySecret;
        /**
         * security temp token. (optional)
         */
        private static final String securityToken;
        /**
         * The ID of the ApsaraMQ for RabbitMQ instance. You can obtain the instance ID in the ApsaraMQ for RabbitMQ console. 
         */
        private final String instanceId;
    
        // The AccessKey pair of your Alibaba Cloud account can be used to access all API operations. We recommend that you use the AccessKey pair of a RAM user to access API operations and perform routine O&M. 
        // We strongly recommend that you do not save your AccessKey pair in your project code. Otherwise, your AccessKey pair may be leaked and all resources that are contained in your Alibaba Cloud account are exposed to potential security risks. 
        // In this example, the AccessKey pair is saved in the configuration file. 
        static {
            // Replace ${ConfigPath} with the path of the .properties file that you created. 
            String path = "${ConfigPath}";
            Properties properties = new Properties();
    
            try {
                properties.load(new FileReader(path));
            } catch (IOException e) {
                // Automatically handle exceptions that occur during the reading of the configuration file. 
            }
    
            accessKeyId = properties.getProperty("accessKeyId");
            accessKeySecret = properties.getProperty("accessKeySecret");
            securityToken = properties.getProperty("securityToken");
    
        }
    
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException e) {
                //todo
            } catch (NoSuchAlgorithmException e) {
                //todo
            }
            return null;
        }
    }

Parameters

Parameter

Example

Description

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

The endpoint of the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can obtain the endpoint on the Instance Details page.

Port

5672

The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.

AccessKeyID

LTAI5tJQKnB9zVvQ****

The AccessKey ID of your Alibaba Cloud account or a RAM user within the Alibaba Cloud account. You can log on to the RAM console, create a RAM role, grant the role the AliyunAMQPFullAccess permission, obtain the Alibaba Cloud Resource Name (ARN) of the RAM role, and then call the AssumeRole operation to obtain a temporary identity to assume the role. A successful call to the AssumeRole operation returns values of the AccessKeyID, AccessKeySecret, and SecurityToken of the RAM role. For more information about the ARN of a RAM role, see RAM role overview.

AccessKeySecret

jw6MawfKOVBveRr84u****

The AccessKey secret of your Alibaba Cloud account or a RAM user within the Alibaba Cloud account.

SecurityToken

CAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************

The STS token of the RAM role.

instanceId

amqp-cn-v0h1kb9nu***

The ID of the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the instance ID on the Instance Details page. For information about how to view the ID of an instance, see View the details of an instance.

virtualHost

Test

The vhost that you created in the ApsaraMQ for RabbitMQ instance. In the ApsaraMQ for RabbitMQ console, you can view the vhost on the vhosts page. For information about how to view a vhost, see View the information about the connection that is established to a vhost.

Non-persistent subscriptions

Create, compile, and run Subscriber.java to subscribe to messages.

Important

Before you compile Subscriber.java and run it to subscribe to messages, you must configure the parameters that are described in Parameters based on the comments in the code.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Subscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destTopic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createSubscriber(destTopic);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

Persistent subscriptions

Create, compile, and run DurableSubscriber.java to subscribe to messages.

Important

Before you compile DurableSubscriber.java and run it to subscribe to messages, you must configure the parameters that are described in Parameters based on the comments in the code.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class DurableSubscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";
    public static String CLIENT_ID = "client_id";
    public static String SUBSCRIBER_NAME = "subscriber_name";

    public static RMQConnectionFactory getRMQConnectionFactory() {
    final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }


    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.setClientID(CLIENT_ID);
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic Topic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

Produce messages

Create, compile, and run Publisher.java to produce messages.

Important

Before you compile Publisher.java and run it to send messages, you must configure the parameters that are described in Parameters based on the comments in the code.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Publisher {

    public static String DESTINATION = "systemA.systemB.Price.aaa";

    public static RMQConnectionFactory getRMQConnectionFactory() {
               final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory factory = getRMQConnectionFactory();
        TopicConnection connection = factory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        TextMessage msg = session.createTextMessage("hello topic test");
        Topic topic = session.createTopic(DESTINATION);
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.send(msg);
        System.out.println("Message sent.");
        session.close();
        connection.close();
    }
}