This article describes how to connect an AMQP JMS client to Alibaba Cloud IoT Platform and receive messages from IoT Platform.

Development environment

JDK 1.8 or later is required for this example, because the sample code uses lambda expressions.

Download the SDK

AMQP SDKs are open-source SDKs. If you use the Java programming language, we recommend that you use the Apache Qpid JMS client. To download the client and view its instructions, visit Qpid JMS 0.47.0.

Add Maven dependency

<!-- amqp 1.0 qpid client -->
 <dependency>
   <groupId>org.apache.qpid</groupId>
   <artifactId>qpid-jms-client</artifactId>
   <version>0.47.0</version>
 </dependency>
 <!-- util for base64 -->
 <dependency>
   <groupId>commons-codec</groupId>
  <artifactId>commons-codec</artifactId>
  <version>1.10</version>
</dependency>

Sample code

For more information about the parameters in the following sample code, see Connect an AMQP client to IoT Platform.

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Demo that connects an Apache Qpid JMS (AMQP 1.0) client to IoT Platform and logs every
 * message it receives.
 *
 * <p>The placeholders of the form <code>${...}</code> in {@link #main(String[])} must be replaced
 * with real values before the demo is run.
 */
public class AmqpJavaClientDemo {

    private static final Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);

    // Asynchronous thread pool for business processing. You can modify the thread pool parameters based on your
    // business requirements. You can also use other asynchronous methods to process the received messages.
    private static final ExecutorService executorService = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(50000));

    /**
     * Builds the credentials and the connection URL, opens the connection, and registers the
     * message listener on the {@code default} queue.
     *
     * @param args unused
     * @throws Exception if signing, the JNDI lookup, or any JMS call fails
     */
    public static void main(String[] args) throws Exception {
        // For more information about the parameters, see the "Connect an AMQP client to IoT Platform" topic.
        String accessKey = "${YourAccessKey}";
        String accessSecret = "${YourAccessSecret}";
        String consumerGroupId = "${YourConsumerGroupId}";
        // iotInstanceId: If you are using a purchased instance, you must specify the instance ID.
        // If you are using a public instance, you can enter an empty string "".
        String iotInstanceId = "${iotInstanceId}";
        long timeStamp = System.currentTimeMillis();
        // Signature method: hmacmd5, hmacsha1, or hmacsha256.
        String signMethod = "hmacsha1";
        // The value of the clientId parameter is displayed in the Client ID column on the Consumer Group Status tab
        // of an AMQP consumer group in the console.
        // We recommend that you set clientId to a unique identifier, such as the UUID, MAC address, or IP address.
        String clientId = "${YourClientId}";

        // For more information about how to configure the UserName, see the "Connect an AMQP client to IoT Platform" topic.
        String userName = clientId + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",iotInstanceId=" + iotInstanceId
            + ",consumerGroupId=" + consumerGroupId
            + "|";
        // For more information about how to configure the password, see the "Connect an AMQP client to IoT Platform" topic.
        // The timestamp signed here must be the same value that is sent in userName.
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent, accessSecret, signMethod);
        // Build the connection URL as required by Qpid JMS.
        // The URL must not contain whitespace: a space after '?' is an illegal URI character and
        // would also corrupt the option key.
        String connectionUrl = "failover:(amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671?amqp.idleTimeout=80000)"
            + "?failover.reconnectDelay=30";

        // Hashtable is used because javax.naming.InitialContext takes a Hashtable environment.
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        Destination queue = (Destination) context.lookup("QUEUE");
        // Create Connection
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // Create Session
        // Session.CLIENT_ACKNOWLEDGE: After a message is received, manually call the message.acknowledge() method.
        // Session.AUTO_ACKNOWLEDGE: The SDK automatically sends an ACK packet (recommended).
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // Create Receiver Link
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    /**
     * Listener that hands every received message to {@link #executorService} so that the
     * callback itself returns quickly.
     */
    private static final MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                // 1. Ensure that an ACK packet is sent after a message is received.
                // We recommend that you select Session.AUTO_ACKNOWLEDGE when you create a session.
                // Then an ACK packet is automatically sent.
                // You can also select Session.CLIENT_ACKNOWLEDGE when you create a session.
                // Then, you must call the message.acknowledge() method to send an ACK packet.
                // 2. We recommend that you process received messages asynchronously.
                // Do not implement a time-consuming logic in the onMessage() method.
                // If a time-consuming logic is implemented in this method, the thread may be blocked.
                // This may affect the callback of the SDK after a message is received.
                executorService.submit(() -> processMessage(message));
            } catch (Exception e) {
                // Reached, for example, when the pool rejects the task because its queue is full.
                logger.error("submit task occurs exception ", e);
            }
        }
    };

    /**
     * Implement the business logic after messages are received.
     *
     * <p>This demo decodes the body as UTF-8 text and logs it together with the {@code topic}
     * and {@code messageId} properties. Any failure is logged and not rethrown.
     *
     * @param message the received JMS message
     */
    private static void processMessage(Message message) {
        try {
            byte[] body = message.getBody(byte[].class);
            // getBody returns null for a message without a body; log an empty payload instead of failing.
            // An explicit charset keeps decoding independent of the platform default.
            String content = body == null ? "" : new String(body, StandardCharsets.UTF_8);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
            logger.info("receive message"
                + ", topic = " + topic
                + ", messageId = " + messageId
                + ", content = " + content);
        } catch (Exception e) {
            logger.error("processMessage occurs error ", e);
        }
    }

    /**
     * Connection listener that logs connection state changes. The remaining callbacks are
     * intentionally left empty.
     */
    private static final JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * A connection is established.
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * The connection fails after the retry attempts reach the maximum limit.
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            // Pass the throwable as the last argument so that the stack trace is logged too.
            logger.error("onConnectionFailure, {}", error.getMessage(), error);
        }

        /**
         * The connection is interrupted.
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * The connection is interrupted and then automatically restored.
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

    /**
     * Computes the connection password: the Base64-encoded HMAC of {@code toSignString} keyed
     * with {@code secret}.
     *
     * <p>For more information about the signature algorithm of a password, see the
     * "Connect an AMQP client to IoT Platform" topic.
     *
     * @param toSignString the content to sign
     * @param secret the secret used as the HMAC key
     * @param signMethod the algorithm name passed to {@link Mac#getInstance(String)}
     * @return the Base64-encoded signature
     * @throws Exception if the algorithm is unavailable or the key is invalid
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        // Use an explicit charset so that the signature does not depend on the platform default encoding.
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes(StandardCharsets.UTF_8));
        return Base64.encodeBase64String(rawHmac);
    }
}