After a device is connected to IoT Platform, the device directly reports data to IoT Platform. Then, the data is forwarded to your server over AMQP. This topic describes how to configure the service subscription function..

Background information

Service subscription

Procedure

  1. Log on to the IoT Platform console.
  2. In the left-side navigation pane, choose Rules > Service Subscription.
  3. On the Subscriptions tab of the Service Subscription page, click Create Subscription.
  4. In the Create Subscription dialog box that appears, set the following parameters and click OK.
  5. Develop an AMQP client to receive messages.

    AMQP SDKs are open-source SDKs. We recommend that you use the Apache Qpid JMS client for the Java development environment. You can visit Qpid JMS 0.47.0 to download the client and read its instructions.

    Add the Maven dependency

    <! -- amqp 1.0 qpid client -->
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>0.47.0</version>
     </dependency>
     <! -- util for base64-->
     <dependency>
       <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.10</version>
    </dependency>

    Sample code:

    For information about the parameters in the following demo, see AMQP client access instructions.

    import java.net.URI;
    import java.util.Hashtable;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.qpid.jms.JmsConnection;
    import org.apache.qpid.jms.JmsConnectionListener;
    import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class AmqpJavaClientDemo {
    
        private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
    
        public static void main(String[] args) throws Exception {
            //For information about parameters, see AMQP client access instructions.
            String accessKey = "$ {YourAccessKeyID }";
            String accessSecret = "${YourAccessKeySecret}";
            String consumerGroupId = "${YourConsumerGroupId}";
            long timeStamp = System.currentTimeMillis();
            //Signature method: hmacmd5, hmacsha1, or hmacsha256.
            String signMethod = "hmacsha1";
            //The value of the clientId parameter is displayed as the client ID on the consumer group status page for service subscription in the console.
            //We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address as the clientId value. This allows you to identify clients.
            String clientId = "$ {YourClientId }";
    
            //For information about how to obtain the value of UserName, see AMQP client access instructions.
            String userName = clientId + "| authMode = aksign"
                + ",signMethod=" + signMethod
                + ",timestamp=" + timeStamp
                + ",authId=" + accessKey
                + ",consumerGroupId=" + consumerGroupId
                + "|";
            //For information about how to obtain the value of password, see AMQP client access instructions.
            String signContent = "authId =" + accessKey + "& timestamp =" + timeStamp;
            String password = doSign(signContent,accessSecret, signMethod);
            //Construct the connection URL based on the rules described in Qpid JMS documentation.
            String connectionUrl = "failover:(amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671? amqp.idleTimeout=80000)"
                + "? failover.reconnectDelay=30";
    
            Hashtable<String, String> hashtable = new Hashtable<>();
            hashtable.put("connectionfactory.SBCF",connectionUrl);
            hashtable.put("queue.QUEUE", "default");
            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
            Destination queue = (Destination)context.lookup("QUEUE");
            // Create Connection
            Connection connection = cf.createConnection(userName, password);
            ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
            // Create Session
            // Session. Client_acknowledgment: After you receive the message, manually call message.acknowledge().
            // Session. Auto_acknowledgment: The SDK is automatically acknowledged. (recommended)
            Session session = connection. createSession (false, Session. Auto_acknowledgment );
            connection.start();
            // Create Receiver Link
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(messageListener);
        }
    
        private static MessageListener messageListener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    byte[] body = message.getBody(byte[].class);
                    String content = new String(bytes);
                    String topic = message.getStringProperty("topic");
                    String messageId = message.getStringProperty("messageId");
                    logger.info("receive message"
                        + ", topic = " + topic
                        + ", messageId = " + messageId
                        + ", content = " + content);
                    //If you select Session.CLIENT_ACKNOWLEDGE when creating a session, manual acknowledgment is required.
                    //message.acknowledge();
                    //Ensure that no time-consuming logic exists during message processing. If processing received messages takes a long period of time, initiate asynchronous requests.
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
    
        private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
            /**
             * The connection is successfully established.
             */
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
            }
    
            /**
             * The connection fails after the maximum number of retries.
             */
            @Override
            public void onConnectionFailure(Throwable error) {
                logger.error("onConnectionFailure, {}", error.getMessage());
            }
    
            /**
             * The connection is interrupted.
             */
            @Override
            public void onConnectionInterrupted(URI remoteURI) {
                logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
            }
    
            /**
             * The connection is interrupted and automatically restored.
             */
            @Override
            public void onConnectionRestored(URI remoteURI) {
                logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
            }
    
            @Override
            public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
    
            @Override
            public void onSessionClosed(Session session, Throwable cause) {}
    
            @Override
            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
    
            @Override
            public void onProducerClosed(MessageProducer producer, Throwable cause) {}
        };
    
        /**
         * To obtain the string to sign in the value of password, see AMQP client access instructions.
         */
        private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
            SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
            Mac mac = Mac.getInstance(signMethod);
            mac.init(signingKey);
            Byte [] rawHmac = mac. doFinal (toSignString. getBytes ());
            return Base64.encodeBase64String(rawHmac);
        }
    }