This article describes how to connect an AMQP JMS client to Alibaba Cloud IoT Platform and receive messages from IoT Platform.

Development environment

JDK 1.7 or later is used in this example.

Download the SDK for Java

AMQP SDKs are open-source SDKs. If you use Java, we recommend that you use the Apache Qpid JMS client. To download the client and view the instructions, see Qpid JMS 0.47.0.

Add the Maven dependency

<! -- amqp 1.0 qpid client -->
 <dependency>
   <groupId>org.apache.qpid</groupId>
   <artifactId>qpid-jms-client</artifactId>
   <version>0.47.0</version>
 </dependency>
 <! -- util for base64-->
 <dependency>
   <groupId>commons-codec</groupId>
  <artifactId>commons-codec</artifactId>
  <version>1.10</version>
</dependency>

Sample code

For more information about the parameters in the following sample code, see Connect an AMQP client to IoT Platform.

import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpJavaClientDemo {

    private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);

    // The asynchronous thread pool for business processing. You can modify the thread pool parameters based on your business requirements. You can also use other asynchronous methods to process the received messages.
    private final static ExecutorService executorService = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(50000));

    public static void main(String[] args) throws Exception {
        // For more information about the parameters, see Connect an AMQP client to IoT Platform.
        String accessKey = "${YourAccessKey}";
        String accessSecret = "${YourAccessSecret}";
        String consumerGroupId = "${YourConsumerGroupId}";
        // iotInstanceId: If you use a purchased instance, you must specify the instance ID. If you use a public instance, you can enter an empty string ("").
        String iotInstanceId = "${YourIotInstanceId}"; 
        long timeStamp = System.currentTimeMillis();
        // The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256.
        String signMethod = "hmacsha1";
        // The value of the clientId parameter is displayed in the console as the client ID on the consumer group status page for service subscription.
        // We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address as the clientId value. This allows you to identify clients.
        String clientId = "${YourClientId}";

        // The structure of the userName parameter. For more information, see Connect an AMQP client to IoT Platform.
        String userName = clientId + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",iotInstanceId=" + iotInstanceId
            + ",consumerGroupId=" + consumerGroupId
            + "|";
        // The structure of the signature and the password parameters. For more information, see Connect an AMQP client to IoT Platform.
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        // The endpoint. For more information, see Connect an AMQP client to IoT Platform.
        String connectionUrl = "failover:(amqps://${YourHost}:5671? amqp.idleTimeout=80000)"
            + "? failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // Creates a connection.
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // Creates a session.
        // Session.CLIENT_ACKNOWLEDGE: After a message is received, you must call the message.acknowledge() method to send an ACK packet.
        // Session.AUTO_ACKNOWLEDGE: Recommended. After a message is received, the SDK sends an ACK packet.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // Creates a receiver link.
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    private static MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                //1. Make sure that an ACK packet is sent after a message is received.
                // We recommend that you select Session.AUTO_ACKNOWLEDGE when you create a session. Then, an ACK packet is automatically sent.
                // You can also select Session.CLIENT_ACKNOWLEDGE when you create a session. Then, you must call the message.acknowledge() method to send an ACK packet.
                // message.acknowledge();
                //2. We recommend that you asynchronously process the received messages. Do not implement a time-consuming logic in the onMessage() method.
                // If a time-consuming logic is implemented in this method, the thread may be blocked. This may affect the callback of the SDK after a message is received.
                executorService.submit(() -> processMessage(message));
            } catch (Exception e) {
                logger.error("submit task occurs exception ", e);
            }
        }
    };

    /**
     * Implement the business logic after a message is received.
     */
    private static void processMessage(Message message) {
        try {
            byte[] body = message.getBody(byte[].class);
            String content = new String(body);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
            logger.info("receive message"
                + ", topic = " + topic
                + ", messageId = " + messageId
                + ", content = " + content);
        } catch (Exception e) {
            logger.error("processMessage occurs error ", e);
        }
    }

    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * The connection is established.
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * The connection fails after the maximum number of retries.
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            logger.error("onConnectionFailure, {}", error.getMessage());
        }

        /**
         * The connection is interrupted.
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * The connection is interrupted and automatically restored.
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

    /**
     # The structure of the signature and password parameters. For more information, see Connect an AMQP client to IoT Platform.
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Base64.encodeBase64String(rawHmac);
    }
}