All Products
Search
Document Center

ApsaraMQ for RabbitMQ:Step 3: Use an SDK to send and receive messages

Last Updated:Jan 21, 2025

This topic describes how to use an open source SDK to connect to an ApsaraMQ for RabbitMQ broker to send and receive messages. In this topic, the SDK for Java is used.

Before you start

Obtain the endpoint of an instance

Before you send and receive messages, you must specify the endpoint that a producer or a consumer can use to access the ApsaraMQ for RabbitMQ instance.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the 复制 icon next to the endpoint to copy the endpoint.

    Type

    Description

    Example

    Public endpoint

    You can access an instance from the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use the public endpoint of a subscription instance, you must enable Internet access when you create the subscription instance.

    XXX.net.mq.amqp.aliyuncs.com

    VPC endpoint

    You can access an instance in a virtual private cloud (VPC) to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints.

    XXX.vpc.mq.amqp.aliyuncs.com

Install Java dependency

  1. Create a Java project in IntelliJ IDEA.

  2. Add the following dependency to the pom.xml file to import the Java dependency library:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
    </dependency>

Generate a pair of username and password

If you want to connect an open source RabbitMQ client to Alibaba Cloud services, you must use the AccessKey ID and AccessKey secret of your Alibaba Cloud account or a RAM user within the account to generate a pair of username and password and then set the userName parameter to the generated username and the passWord parameter to the generated password in the code of the open source RabbitMQ SDK. ApsaraMQ for RabbitMQ authenticates the open source RabbitMQ client by using the pair of username and password.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click Static Accounts.

  4. On the Static Accounts page, click Create Username/Password.

  5. In the Create Username/Password panel, configure the AccessKey ID and AccessKey Secret parameters and click OK.

    Note

    You can obtain the values of the AccessKey ID and AccessKey Secret parameters in the RAM console. For more information, see Create an AccessKey pair.

    On the Static Accounts page, the created pair of static username and password appears. The password is masked.用户名密码

  6. In the Password column of the created pair of static username and password, click Display to view the password.

Create a client connection

Create the ConnectionFactory.java connection factory to start the connection between the open source RabbitMQ client and the ApsaraMQ for RabbitMQ broker.


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class ConnectionFactory {
    private final String hostName;
    private final int port;
    private final String userName;
    private final String password;
    private final String virtualHost;
    private final boolean enableSSL;

    public ConnectionFactory(String hostName, int port, String userName,
                            String password, String virtualHost, boolean enableSSL) {
        this.hostName = hostName;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.virtualHost = virtualHost;
        this.enableSSL = enableSSL;
    }

    public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        //create a new con
        Connection con = createCon();

        //create a new channel
        return con.createChannel();
    }

    private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();

        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(password);

        // Specifies whether to enable automatic connection recovery. If you set this parameter to true, automatic connection recovery is enabled. If you set this parameter to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // The default port. 
        factory.setPort(port);

        if (enableSSL) {
            setSSL(factory);
        }

        // The timeout period. Configure this parameter based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        return factory.newConnection();
    }

    private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init((KeyStore) null);
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
        factory.useSslProtocol(sslContext);
    }

    public void closeCon(Channel channel) {
        if (channel != null && channel.getConnection() != null) {
            try {
                channel.getConnection().close();
            } catch (Throwable t) {
            }
        }
    }

Produce messages

In the Java project that you created, create a producer program named Producer.java, configure the relevant parameters based on the description in SDK parameters, and then run the program.

Sample code:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

public class Producer {
    // The endpoint of the ApsaraMQ for RabbitMQ instance. 
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    // The static username of the ApsaraMQ for RabbitMQ instance. 
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    // The static password of the ApsaraMQ for RabbitMQ instance. 
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    // The name of the vhost of the ApsaraMQ for RabbitMQ instance. 
    public static final String virtualHost = "vhost_test";

    // If you want to use port 5671, you must set the enableSSL parameter to true. 
    public static final int port = 5672;
    public static final boolean enableSSL = false;

    private Channel channel;
    private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
    private final ConnectionFactory factory;
    private final String exchangeName;
    private final String queueName;
    private final String routingKey;

    public Producer(ConnectionFactory factory, String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        this.factory = factory;
        this.outstandingConfirms = new ConcurrentSkipListMap<>();
        this.channel = factory.createChannel();
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
    }

    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        // Create a connection factory. 
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);

        // Initialize the producer. 
        Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "RoutingKeyTest");

        // Declare the producer. 
        producer.declare();

        producer.initChannel();

        // Send messages. 
        producer.doSend("hello,amqp");
    }

    private void initChannel() throws IOException {
        channel.confirmSelect();

        ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
            if (multiple) {
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);

                for (Long tag : confirmed.keySet()) {
                    String msgId = confirmed.get(tag);
                    System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
                }

                confirmed.clear();
            } else {
                String msgId = outstandingConfirms.remove(deliveryTag);
                System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
            }
        };
        channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
            String msgId = outstandingConfirms.get(deliveryTag);
            System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
            // send msg failed, re-publish
        });


        channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
    }

    private void declare() throws IOException {
        channel.exchangeDeclare(exchangeName, "direct", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
    }
    

    private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        try {
            String msgId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();

            channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));

            outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
        } catch (AlreadyClosedException e) {
            //need reconnect if channel is closed.
            String message = e.getMessage();

            System.out.println(message);

            if (channelClosedByServer(message)) {
                factory.closeCon(channel);
                channel = factory.createChannel();
                this.initChannel();
                doSend(content);
            } else {
                throw e;
            }
        }
    }

    private boolean channelClosedByServer(String errorMsg) {
        if (errorMsg != null
            && errorMsg.contains("channel.close")
            && errorMsg.contains("reply-code=541")
            && errorMsg.contains("reply-text=InternalError")) {
            return true;
        } else {
            return false;
        }
    }
}
Note

Throttling can be triggered for an ApsaraMQ for RabbitMQ instance based on the peak transactions per second (TPS) of the instance. For more information, see Best practices for instance throttling.

Subscribe to messages

In the Java project that you created, create a consumer program named Consumer.java, configure the relevant parameters based on the description in SDK parameters, and then run the program.

Sample code:


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // The endpoint of the ApsaraMQ for RabbitMQ instance. 
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    // The static username of the ApsaraMQ for RabbitMQ instance. 
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    // The static password of the ApsaraMQ for RabbitMQ instance. 
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    // The name of the vhost of the ApsaraMQ for RabbitMQ instance. 
    public static final String virtualHost = "vhost_test";
    
    // If you want to use port 5671, you must set the enableSSL parameter to true. 
    public static final int port = 5672;
    public static final boolean enableSSL = false;

    private final Channel channel;
    private final String queue;

    public Consumer(Channel channel, String queue) {
        this.channel = channel;
        this.queue = queue;
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
        Channel channel = factory.createChannel();
        channel.basicQos(50);
         
        // The name of the queue on the ApsaraMQ for RabbitMQ instance. The queue name that you specify for this parameter must be consistent with the queue name that you specified when you created the producer. 
        Consumer consumer = new Consumer(channel, "queue-1");

        consumer.consume();
    }

    public void consume() throws IOException, InterruptedException {
        channel.basicConsume(queue, false, "yourConsumerTag", new DefaultConsumer(channel) {
            @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {

                // Process the message. 
                System.out.println("receive: msgId=" + properties.getMessageId());

                // The consumer must commit acknowledgment (ACK) within the validity period. Otherwise, the message is pushed again. The message can be pushed up to 16 times. 
                // If the message still fails to be pushed after 16 times, it is discarded or sent to the dead-letter exchange. 
                // The validity period is 1 minute for Professional Edition instances, 5 minutes for Enterprise and Serverless Edition instances, and 30 minutes for Enterprise Platinum Edition instances. 
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                channel.getConnection().close();
            } catch (IOException e) {
                System.out.println("close connection error." + e);
            }
            latch.countDown();
        }));
        latch.await();
    }
}

SDK parameters

Parameter

Example

Description

hostName

XXX.net.mq.amqp.aliyuncs.com

The endpoint of the ApsaraMQ for RabbitMQ instance. For information about how to obtain the endpoint, see Obtain the endpoint of an instance.

Port

5672

The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.

userName

MjoxODgwNzcwODY5MD****

The static username that is used for permission authentication when you connect the client to the ApsaraMQ for RabbitMQ broker.

You must create the static username in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Generate a pair of username and password.

passWord

NDAxREVDQzI2MjA0OT****

The static password that is used for permission authentication when you connect the client to the ApsaraMQ for RabbitMQ broker.

You must create the static password in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Generate a pair of username and password.

virtualHost

amqp_vhost

The vhost that you created on the ApsaraMQ for RabbitMQ instance. You must create the vhost in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

exchangeName

ExchangeTest

The exchange that you created on the ApsaraMQ for RabbitMQ instance.

You must create the exchange in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

queueName

QueueTest

The queue that you created on the ApsaraMQ for RabbitMQ instance.

You must create the queue in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

routingKey

RoutingKeyTest

The routing key that is used to bind the exchange to the queue in ApsaraMQ for RabbitMQ.

You must create the binding in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

exchangeType

topic

The exchange type. ApsaraMQ for RabbitMQ supports the following types of exchanges. For more information, see Exchange.

  • direct

  • topic

  • fanout

  • headers

  • x-delayed-message

  • x-consistent-hash

Important

Make sure that the exchange type that you specify is the same as the exchange type that you selected when you created the exchange.

References