This topic describes how to use an AMQP-capable JMS client to access Alibaba Cloud IoT Platform and receive messages subscribed.

Development environment

In this example, JDK 1.7 or later is used.

Download the SDK

AMQP SDKs are open-source SDKs. We recommend that you use the Apache Qpid JMS client for the Java development environment. You can visit Qpid JMS 0.47.0 to download the client and read its instructions.

Add the Maven dependency

<! -- amqp 1.0 qpid client -->
 <dependency>
   <groupId>org.apache.qpid</groupId>
   <artifactId>qpid-jms-client</artifactId>
   <version>0.47.0</version>
 </dependency>
 <! -- util for base64-->
 <dependency>
   <groupId>commons-codec</groupId>
  <artifactId>commons-codec</artifactId>
  <version>1.10</version>
</dependency>

Sample code

For information about the parameters in the following demo, see AMQP client access instructions.

import java.net.URI;
import java.util.Hashtable;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpJavaClientDemo {

    private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);

    public static void main(String[] args) throws Exception {
        //For information about parameters, see AMQP client access instructions.
        String accessKey = "$ {YourAccessKeyID }";
        String accessSecret = "${YourAccessKeySecret}";
        String consumerGroupId = "${YourConsumerGroupId}";
        long timeStamp = System.currentTimeMillis();
        //Signature method: hmacmd5, hmacsha1, or hmacsha256.
        String signMethod = "hmacsha1";
        //The value of the clientId parameter is displayed as the client ID on the consumer group status page for service subscription in the console.
        //We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address as the clientId value. This allows you to identify clients.
        String clientId = "$ {YourClientId }";

        //For information about how to obtain the value of UserName, see AMQP client access instructions.
        String userName = clientId + "| authMode = aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",consumerGroupId=" + consumerGroupId
            + "|";
        //For information about how to obtain the value of password, see AMQP client access instructions.
        String signContent = "authId =" + accessKey + "& timestamp =" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        //Construct the connection URL based on the rules described in Qpid JMS documentation.
        String connectionUrl = "failover:(amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671? amqp.idleTimeout=80000)"
            + "? failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // Create Connection
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // Create Session
        // Session. Client_acknowledgment: After you receive the message, manually call message.acknowledge().
        // Session. Auto_acknowledgment: The SDK is automatically acknowledged. (recommended)
        Session session = connection. createSession (false, Session. Auto_acknowledgment );
        connection.start();
        // Create Receiver Link
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    private static MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                byte[] body = message.getBody(byte[].class);
                String content = new String(bytes);
                String topic = message.getStringProperty("topic");
                String messageId = message.getStringProperty("messageId");
                logger.info("receive message"
                    + ", topic = " + topic
                    + ", messageId = " + messageId
                    + ", content = " + content);
                //If you select Session.CLIENT_ACKNOWLEDGE when creating a session, manual acknowledgment is required.
                //message.acknowledge();
                //Ensure that no time-consuming logic exists during message processing. If processing received messages takes a long period of time, initiate asynchronous requests.
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };

    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * The connection is successfully established.
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * The connection fails after the maximum number of retries.
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            logger.error("onConnectionFailure, {}", error.getMessage());
        }

        /**
         * The connection is interrupted.
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * The connection is interrupted and automatically restored.
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

    /**
     * To obtain the string to sign in the value of password, see AMQP client access instructions.
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        Byte [] rawHmac = mac. doFinal (toSignString. getBytes ());
        return Base64.encodeBase64String(rawHmac);
    }
}