Open source RabbitMQ SDKs (encrypted data transmission)

Last Updated: Feb 01, 2024

ApsaraMQ for RabbitMQ supports Transport Layer Security (TLS) encryption for data in transit. This topic describes how to use an open source RabbitMQ SDK to send and receive messages over a TLS-encrypted connection. The examples in this topic use the SDK for Java. The process is similar for open source RabbitMQ SDKs for other programming languages or frameworks.

Prerequisites

  • An ApsaraMQ for RabbitMQ instance is created. For more information, see Manage instances.

  • A vhost is created. For more information, see Manage vhosts.

Background information

Open source RabbitMQ clients can connect to ApsaraMQ for RabbitMQ by using a username and password pair or an AccessKey pair. To encrypt the transmitted data, change the client port to 5671 and enable TLS. By default, TLS v1.2 is used.
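
Because the connection relies on TLS v1.2, it can help to confirm that the local Java runtime offers that protocol before you debug a failed handshake. The following is a minimal sketch that uses only the JDK. The class name TlsSupportCheck is hypothetical and is not part of any RabbitMQ SDK.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import javax.net.ssl.SSLContext;

// Hypothetical helper for illustration. It is not part of the RabbitMQ client.
class TlsSupportCheck {
    // Returns true if the JVM's default SSLContext lists the protocol as supported.
    static boolean supportsProtocol(String protocol) {
        try {
            String[] supported = SSLContext.getDefault()
                    .getSupportedSSLParameters()
                    .getProtocols();
            return Arrays.asList(supported).contains(protocol);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext is available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("TLSv1.2 supported: " + supportsProtocol("TLSv1.2"));
    }
}
```

If the check reports that TLSv1.2 is unavailable, the runtime or its security configuration is the likely cause rather than the client code.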

Messaging process (Java used as an example)

Figure: Process for sending and receiving messages by using an open source SDK

Obtain an endpoint

Before you send and receive messages, obtain the endpoint of the ApsaraMQ for RabbitMQ instance in the ApsaraMQ for RabbitMQ console, and specify that endpoint for both the publisher and the subscriber. The publisher and the subscriber use the endpoint to access the instance.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the copy icon next to the endpoint to copy the endpoint.

    | Type | Description | Example |
    | --- | --- | --- |
    | Public endpoint | You can access an instance over the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use the public endpoint of a subscription instance, you must select a public endpoint when you create the subscription instance. | XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com |
    | VPC endpoint | You can access an instance over a VPC to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints. | XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com |
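
The copied endpoint is only a host name. The port depends on whether the connection is encrypted. The following sketch, which uses only the JDK, pairs an endpoint with the matching port and builds an AMQP URI from it. The class AmqpEndpoint and its methods are hypothetical names introduced for illustration. The sketch assumes that your client accepts amqp:// and amqps:// URIs, as the RabbitMQ Java client does through ConnectionFactory.setUri.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration. It is not part of the RabbitMQ client.
class AmqpEndpoint {
    static final int PLAIN_PORT = 5672; // non-encrypted connections
    static final int TLS_PORT = 5671;   // TLS-encrypted connections

    static int portFor(boolean tls) {
        return tls ? TLS_PORT : PLAIN_PORT;
    }

    // Builds amqp(s)://host:port/vhost with the vhost percent-encoded.
    static String toUri(String host, String vhost, boolean tls) {
        String scheme = tls ? "amqps" : "amqp";
        return scheme + "://" + host + ":" + portFor(tls) + "/" + encodePathSegment(vhost);
    }

    // URLEncoder produces form encoding, so spaces ("+") are rewritten as %20.
    static String encodePathSegment(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toUri("XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com", "amqp_vhost", true));
    }
}
```

The examples later in this topic set the host, port, and vhost on the connection factory separately, which is equivalent. The URI form is only a convenience if you prefer a single configuration value.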

Install the Java dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
</dependency>

Generate a pair of username and password

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click Static Accounts.

  4. On the Static Accounts page, click Create Username/Password.

  5. In the Create Username/Password panel, configure the AccessKey ID and AccessKey Secret parameters. Then, click OK.

    Note

    You can obtain the values of the AccessKey ID and AccessKey Secret parameters in the RAM console. For more information, see Create an AccessKey pair.

    On the Static Accounts page, the created static username and password appear. The password is masked.

  6. In the Password column of the static username and password that you created, click Display to view the password that corresponds to the username.
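
The static username and password are credentials, so you may prefer not to hard-code them in source files. The following is a minimal sketch that reads them from environment variables. The class StaticCredentials and the variable names RABBITMQ_USERNAME and RABBITMQ_PASSWORD are assumptions made for this example. They are not defined by ApsaraMQ for RabbitMQ or by the SDK.

```java
import java.util.Map;

// Hypothetical holder for the static username and password.
class StaticCredentials {
    final String userName;
    final String passWord;

    StaticCredentials(String userName, String passWord) {
        this.userName = userName;
        this.passWord = passWord;
    }

    // Reads both values from the given environment map and fails fast if one is missing.
    static StaticCredentials fromEnv(Map<String, String> env) {
        String user = env.get("RABBITMQ_USERNAME"); // assumed variable name
        String pass = env.get("RABBITMQ_PASSWORD"); // assumed variable name
        if (user == null || user.isEmpty() || pass == null || pass.isEmpty()) {
            throw new IllegalStateException(
                    "Set RABBITMQ_USERNAME and RABBITMQ_PASSWORD before you start the client");
        }
        return new StaticCredentials(user, pass);
    }

    public static void main(String[] args) {
        try {
            StaticCredentials credentials = fromEnv(System.getenv());
            // Print only the username. Never log the password.
            System.out.println("Loaded credentials for user " + credentials.userName);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The loaded values can then be passed to the userName and passWord parameters that are used in the producer and consumer examples.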

Produce messages

Create, compile, and run ProducerTest.java to produce messages.

Important

Before you compile and run ProducerTest.java to produce messages, you must configure the parameters that are described in Table 1 as indicated by the code comments.

Table 1. Parameters

| Parameter | Example | Description |
| --- | --- | --- |
| hostName | XXX.net.mq.amqp.aliyuncs.com | The endpoint that is used to access the ApsaraMQ for RabbitMQ instance. For information about how to obtain the endpoint, see Step 2: Create resources. |
| Port | 5671 | The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. |
| userName | MjoxODgwNzcwODY5MD**** | The static username that is used for permission authentication when you connect the client to the ApsaraMQ for RabbitMQ broker. You must create the static username in the ApsaraMQ for RabbitMQ console in advance. For more information, see Create a pair of username and password. |
| passWord | NDAxREVDQzI2MjA0OT**** | The static password that is used for permission authentication when you connect the client to the ApsaraMQ for RabbitMQ broker. You must create the static password in the ApsaraMQ for RabbitMQ console in advance. For more information, see Create a pair of username and password. |
| virtualHost | amqp_vhost | The vhost that you created on the ApsaraMQ for RabbitMQ instance. You must create the vhost in the ApsaraMQ for RabbitMQ console in advance. For more information, see Create a vhost. |
| exchangeName | ExchangeTest | The exchange that you created on the ApsaraMQ for RabbitMQ instance. You must create the exchange in the ApsaraMQ for RabbitMQ console in advance. For more information, see Create an exchange. |
| queueName | QueueTest | The queue that you created on the ApsaraMQ for RabbitMQ instance. You must create the queue in the ApsaraMQ for RabbitMQ console in advance. For more information, see Create a queue. |
| bindingKey | BindingKeyTest | The binding key that is used to bind an ApsaraMQ for RabbitMQ exchange to the queue. You must create the binding in the ApsaraMQ for RabbitMQ console in advance. For more information, see Create a binding. |
| exchangeType | topic | The exchange type. ApsaraMQ for RabbitMQ supports the following types of exchanges: direct, topic, fanout, headers, x-jms-topic, x-delayed-message, and x-consistent-hash. For more information, see Exchange. |

Important

Make sure that the exchange type that you specify is the same as the exchange type that you selected when you created the exchange.
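
A mistyped exchange type is easier to diagnose on the client than from a failed declaration. The following sketch checks a configured value against the exchange types listed in Table 1 before the client connects. The class ExchangeTypes is a hypothetical helper and is not part of the SDK. The check cannot confirm that the type matches the exchange that you created in the console.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper for illustration. It is not part of the RabbitMQ client.
class ExchangeTypes {
    // The exchange types listed in Table 1.
    static final Set<String> SUPPORTED = Collections.unmodifiableSet(
            new LinkedHashSet<String>(Arrays.asList(
                    "direct", "topic", "fanout", "headers",
                    "x-jms-topic", "x-delayed-message", "x-consistent-hash")));

    static boolean isSupported(String exchangeType) {
        return exchangeType != null && SUPPORTED.contains(exchangeType);
    }

    // Returns the type unchanged if it is supported. Otherwise throws.
    static String requireSupported(String exchangeType) {
        if (!isSupported(exchangeType)) {
            throw new IllegalArgumentException(
                    "Unsupported exchange type: " + exchangeType + ". Expected one of " + SUPPORTED);
        }
        return exchangeType;
    }

    public static void main(String[] args) {
        System.out.println(requireSupported("topic"));
    }
}
```

The comparison is case-sensitive because the listed type names are lowercase.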

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        // The endpoint of the ApsaraMQ for RabbitMQ instance. 
        String hostName = "xxx.xxx.aliyuncs.com";
        // The static username and password of the ApsaraMQ for RabbitMQ instance. 
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        // The vhost of the ApsaraMQ for RabbitMQ instance. 
        String virtualHost = "${VirtualHost}";

        // In production environments, we recommend that you create the connection in advance and reuse it when needed. This prevents frequent establishment and closing of connections and improves system performance and stability. 
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        // The binding between the exchange and the queue. 
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String bindingKey = "${BindingKey}";
        // The exchange type. 
        String exchangeType = "${ExchangeType}";

        // Declare the exchange, the queue, and the binding. The declarations in this example are for demonstration only. 
        // In production environments, we recommend that you create the exchange and the queue in the ApsaraMQ for RabbitMQ console in advance instead of declaring them in the code. Declaring them in the code can trigger throttling for the corresponding API operations. 
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, bindingKey);
        // Send 10 messages, each with a unique message ID. 
        for (int i = 0; i < 10; i++) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, bindingKey, true, props,
                    ("Sample message body-" + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
        }
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        // Specify whether to enable automatic connection recovery. If you set this parameter to true, automatic connection recovery is enabled. If you set this parameter to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // The port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
        factory.setPort(5671);
        // Enable TLS for the connection. 
        factory.useSslProtocol();
        // The timeout periods, in milliseconds. Configure these parameters based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}
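
In the RabbitMQ Java client, the no-argument useSslProtocol() method encrypts the connection but uses a trust manager that accepts any server certificate. If you also want the client to verify the server certificate, one option is to build an SSLContext from the JVM's default trust store and pass it to the factory. The following is a sketch under the assumption that the certificate presented by your endpoint chains to a certificate authority in that trust store. The class VerifyingTlsContext is a hypothetical name.

```java
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper for illustration. It is not part of the RabbitMQ client.
class VerifyingTlsContext {
    // Creates a TLS v1.2 context that validates server certificates
    // against the JVM's default trust store.
    static SSLContext create() {
        try {
            TrustManagerFactory trustManagers =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            trustManagers.init((KeyStore) null); // null selects the default trust store
            SSLContext context = SSLContext.getInstance("TLSv1.2");
            context.init(null, trustManagers.getTrustManagers(), null);
            return context;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Unable to initialize the TLS context", e);
        }
    }

    public static void main(String[] args) {
        SSLContext context = create();
        System.out.println("Created context for protocol " + context.getProtocol());
        // Assumed usage in createConnection, in place of factory.useSslProtocol():
        //   factory.useSslProtocol(VerifyingTlsContext.create());
        //   factory.enableHostnameVerification();
    }
}
```

If the handshake fails after this change, the default trust store probably does not contain the issuing certificate authority. Test the change in a non-production environment first.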

Subscribe to messages

Create, compile, and run ConsumerTest.java to subscribe to messages.

Important

Before you compile and run ConsumerTest.java to subscribe to messages, you must configure the parameters that are described in Table 1 as indicated by the code comments.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        // The endpoint of the ApsaraMQ for RabbitMQ instance. 
        String hostName = "xxx.xxx.aliyuncs.com";
        // The static username and password of the ApsaraMQ for RabbitMQ instance. 
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        // The vhost of the ApsaraMQ for RabbitMQ instance. 
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        // The queue to consume messages from. 
        String queueName = "${QueueName}";
        // Declare the ${QueueName} queue. Make sure that the queue already exists in the ApsaraMQ for RabbitMQ console. 
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        // Consume messages. Automatic acknowledgment is disabled (autoAck is false), so each message is acknowledged manually in handleDelivery. 
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                // Process the received message based on the message consumption logic. 
                System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                // Acknowledge only this message (multiple is false). 
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        // Specify whether to enable automatic connection recovery. If you set this parameter to true, automatic connection recovery is enabled. If you set this parameter to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // The port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5671);
        // Enable TLS for the connection. 
        factory.useSslProtocol();
        // The timeout periods, in milliseconds. Configure these parameters based on the network environment. 
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}