After a device is connected to IoT Platform, it submits data to IoT Platform, which can forward the data to your server over the Advanced Message Queuing Protocol (AMQP). This article describes how to configure an AMQP server-side subscription so that your server can receive data from the street lamp by using an AMQP client.

Background information

The following figure shows the procedure of obtaining device messages by configuring an AMQP server-side subscription.

Procedure

  1. Log on to the IoT Platform console.
  2. Configure a consumer group to consume messages. Your server can obtain the messages by listening to the consumer group.
    1. In the left-side navigation pane, choose Rules > Server-side Subscription. Then, click the Consumer Groups tab.
    2. Click Create Consumer Group.
    3. In the Create Consumer Group dialog box, set the consumer group name to StreetLampConsumerGroup and click OK.
  3. Configure the server-side subscription for the product to which the street lamp device belongs. This allows your server to subscribe to various types of messages under the product.
    1. In the left-side navigation pane, choose Rules > Server-side Subscription.
    2. On the Subscriptions tab of the Server-side Subscription page, click Create Subscription.
    3. In the Create Subscription dialog box, set the parameters and click OK.
      Parameter          Description
      Product            Select StreetLamp.
      Subscription Type  Select AMQP.
      Consumer Group     Select StreetLampConsumerGroup that is created in the previous step.
      Message Type       Select Device Upstream Notification.
  4. Connect the AMQP client to IoT Platform.
    In this example, an Apache Qpid JMS client is used to connect to IoT Platform. To download the client and view the instructions, see Qpid JMS 0.47.0.
    1. Add the following Maven dependency:
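Assuming the Qpid JMS 0.47.0 client named above and the commons-codec library that the sample code imports for Base64 encoding, the dependencies would look roughly as follows. The commons-codec version is an assumption; use whichever version your project already manages. An SLF4J logging binding is also needed if you want to see the log output of the sample.

```xml
<!-- AMQP 1.0 client: Apache Qpid JMS -->
<dependency>
  <groupId>org.apache.qpid</groupId>
  <artifactId>qpid-jms-client</artifactId>
  <version>0.47.0</version>
</dependency>
<!-- Base64 utility used by doSign(); version is an assumption -->
<dependency>
  <groupId>commons-codec</groupId>
  <artifactId>commons-codec</artifactId>
  <version>1.10</version>
</dependency>
```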
    2. Configure the JMS client to connect with IoT Platform and receive messages.
      import java.net.URI;
      import java.util.Hashtable;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;
      import javax.crypto.Mac;
      import javax.crypto.spec.SecretKeySpec;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.Destination;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageListener;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import javax.naming.Context;
      import javax.naming.InitialContext;
      import org.apache.commons.codec.binary.Base64;
      import org.apache.qpid.jms.JmsConnection;
      import org.apache.qpid.jms.JmsConnectionListener;
      import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class AmqpJavaClientDemo {
      
          private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
      
          // Asynchronous thread pool for business processing. You can modify the thread pool parameters based on your business requirements. You can also use other asynchronous methods to process the received messages.
          private final static ExecutorService executorService = new ThreadPoolExecutor(
              Runtime.getRuntime().availableProcessors(),
              Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
              new LinkedBlockingQueue<>(50000));
      
          public static void main(String[] args) throws Exception {
              // For more information about the parameters, see the "Connect an AMQP client to IoT Platform" topic.
              String accessKey = "${YourAccessKey}";
              String accessSecret = "${YourAccessSecret}";
              String consumerGroupId = "${YourConsumerGroupId}";
              // iotInstanceId: If you are using a purchased instance, you must specify the instance ID. If you are using a public instance, you can enter an empty string "".
              String iotInstanceId = "${YourIotInstanceId}"; 
              long timeStamp = System.currentTimeMillis();
              // Signature method: hmacmd5, hmacsha1, or hmacsha256.
              String signMethod = "hmacsha1";
              // The value of the clientId parameter is displayed in the Client ID column on the Consumer Group Status tab of an AMQP consumer group in the console.
              // We recommend that you set clientId to a unique identifier, such as the UUID, MAC address, or IP address. 
              String clientId = "${YourClientId}";
      
              // For more information about how to configure the userName, see the "Connect an AMQP client to IoT Platform" topic.
              String userName = clientId + "|authMode=aksign"
                  + ",signMethod=" + signMethod
                  + ",timestamp=" + timeStamp
                  + ",authId=" + accessKey
                  + ",iotInstanceId=" + iotInstanceId
                  + ",consumerGroupId=" + consumerGroupId
                  + "|";
              // For more information about how to configure the password, see the "Connect an AMQP client to IoT Platform" topic.
              String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
              String password = doSign(signContent,accessSecret, signMethod);
              // For more information about how to configure the host(endpoint), see the "Connect an AMQP client to IoT Platform" topic.
              String connectionUrl = "failover:(amqps://${YourHost}:5671?amqp.idleTimeout=80000)"
                  + "?failover.reconnectDelay=30";
      
              Hashtable<String, String> hashtable = new Hashtable<>();
              hashtable.put("connectionfactory.SBCF",connectionUrl);
              hashtable.put("queue.QUEUE", "default");
              hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
              Context context = new InitialContext(hashtable);
              ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
              Destination queue = (Destination)context.lookup("QUEUE");
              // Create Connection
              Connection connection = cf.createConnection(userName, password);
              ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
              // Create Session
              // Session.CLIENT_ACKNOWLEDGE: After a message is received, manually call the message.acknowledge() method.
              // Session.AUTO_ACKNOWLEDGE: The SDK automatically sends an ACK packet (recommended).
              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
              connection.start();
              // Create Receiver Link
              MessageConsumer consumer = session.createConsumer(queue);
              consumer.setMessageListener(messageListener);
          }
      
          private static MessageListener messageListener = new MessageListener() {
              @Override
              public void onMessage(Message message) {
                  try {
                      //1. Ensure that an ACK packet is sent after a message is received.
                      // We recommend that you select Session.AUTO_ACKNOWLEDGE when you create a session. Then an ACK packet is automatically sent.
                      // You can also select Session.CLIENT_ACKNOWLEDGE when you create a session. Then, you must call the message.acknowledge() method to send an ACK packet.
                      // message.acknowledge();
                      //2. We recommend that you process received messages asynchronously. Do not implement a time-consuming logic in the onMessage() method.
                      // If a time-consuming logic is implemented in this method, the thread may be blocked. This may affect the callback of the SDK after a message is received.
                      executorService.submit(() -> processMessage(message));
                  } catch (Exception e) {
                      logger.error("submit task occurs exception ", e);
                  }
              }
          };
      
          /**
           * Implement the business logic after messages are received.
           */
          private static void processMessage(Message message) {
              try {
                  byte[] body = message.getBody(byte[].class);
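                  // Decode with an explicit charset so the result does not depend on the platform default.
                  String content = new String(body, java.nio.charset.StandardCharsets.UTF_8);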
                  String topic = message.getStringProperty("topic");
                  String messageId = message.getStringProperty("messageId");
                  logger.info("receive message"
                      + ", topic = " + topic
                      + ", messageId = " + messageId
                      + ", content = " + content);
              } catch (Exception e) {
                  logger.error("processMessage occurs error ", e);
              }
          }
      
          private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
              /**
               * A connection is established.
               */
              @Override
              public void onConnectionEstablished(URI remoteURI) {
                  logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
              }
      
              /**
               * The connection fails after the retry attempts reach the maximum limit.
               */
              @Override
              public void onConnectionFailure(Throwable error) {
                  logger.error("onConnectionFailure, {}", error.getMessage());
              }
      
              /**
               * The connection is interrupted.
               */
              @Override
              public void onConnectionInterrupted(URI remoteURI) {
                  logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
              }
      
              /**
               * The connection is interrupted and then automatically restored.
               */
              @Override
              public void onConnectionRestored(URI remoteURI) {
                  logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
              }
      
              @Override
              public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
      
              @Override
              public void onSessionClosed(Session session, Throwable cause) {}
      
              @Override
              public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
      
              @Override
              public void onProducerClosed(MessageProducer producer, Throwable cause) {}
          };
      
          /**
           * For more information about the signature algorithm of a password, see the "Connect an AMQP client to IoT Platform" topic.
           */
          private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
              SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
              Mac mac = Mac.getInstance(signMethod);
              mac.init(signingKey);
              byte[] rawHmac = mac.doFinal(toSignString.getBytes());
              return Base64.encodeBase64String(rawHmac);
          }
      }

      You can specify the parameters in the preceding code based on the parameter descriptions in the following table.

      Parameter Example Description
      accessKey LTAI4GFGQvKuqHJhFa******
      accessSecret iMS8ZhCDdfJbCMeA005sieKe******
      consumerGroupId VWhGZ2QnP7kxWpeSSjt****** The ID of the consumer group that is created in Step 2. To view the ID, go to the Public Instance page of the IoT Platform console, choose Rules > Server-side Subscription, and then click the Consumer Groups tab. Find the consumer group and obtain the ID.
      iotInstanceId "" The ID of the instance. The public instance is used in this example. Specify iotInstanceId = "".
      clientId 12345 The client ID. You must use a unique identifier, such as the UUID, MAC address, or IP address of the client. The client ID must be 1 to 64 characters in length.

      After you specify this parameter, it will be displayed on the Consumer Group Status page of Server-side Subscription in the console.

      connectionUrl 198426864******.iot-amqp.cn-shanghai.aliyuncs.com The endpoint that the AMQP client uses to connect with IoT Platform. The public instance is used in this example. Format: ${uid}.iot-amqp.${regionId}.aliyuncs.com.

      Replace ${uid} with your Alibaba Cloud account ID. You can click your profile picture in the console to obtain your account ID. Replace ${regionId} with the ID of the region where your public instance resides. The China (Shanghai) region is used in this example. Therefore, you must specify the following value for the connectionUrl parameter:

      connectionUrl = "failover:(amqps://198426864******.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
      + "?failover.reconnectDelay=30"
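To show how the values in the table fit together, the following sketch assembles the userName, password, and connectionUrl strings without opening a connection. It is an illustration only: the class AmqpCredentialSketch and its method names are hypothetical, java.util.Base64 is used in place of commons-codec, and the credentials in main are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper: builds the login strings used by the sample client.
final class AmqpCredentialSketch {

    // userName layout copied from the sample code.
    static String userName(String clientId, String signMethod, long timestamp,
                           String accessKey, String iotInstanceId, String consumerGroupId) {
        return clientId + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timestamp
            + ",authId=" + accessKey
            + ",iotInstanceId=" + iotInstanceId
            + ",consumerGroupId=" + consumerGroupId
            + "|";
    }

    // password = Base64(HMAC(accessSecret, "authId=...&timestamp=...")).
    static String password(String accessKey, long timestamp, String accessSecret,
                           String signMethod) throws Exception {
        String signContent = "authId=" + accessKey + "&timestamp=" + timestamp;
        return sign(signContent, accessSecret, signMethod);
    }

    // Computes the HMAC of content with the given secret and Base64-encodes it.
    static String sign(String content, String secret, String signMethod) throws Exception {
        Mac mac = Mac.getInstance(signMethod);
        mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), signMethod));
        byte[] rawHmac = mac.doFinal(content.getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(rawHmac);
    }

    // The URL must not contain spaces after the "?" separators.
    static String connectionUrl(String host) {
        return "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)"
            + "?failover.reconnectDelay=30";
    }

    public static void main(String[] args) throws Exception {
        long ts = System.currentTimeMillis();
        // Placeholder values; replace them with your own settings.
        System.out.println(userName("12345", "hmacsha1", ts, "${YourAccessKey}", "", "${YourConsumerGroupId}"));
        System.out.println(password("${YourAccessKey}", ts, "${YourAccessSecret}", "hmacsha1"));
        System.out.println(connectionUrl("${YourHost}"));
    }
}
```

The same timestamp value must be used in both userName and the signed content, which is why the sketch passes it in explicitly rather than reading the clock in each method.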
    3. After you run the sample code, the following log data is returned. The data indicates that the AMQP client is connected to IoT Platform and can receive messages.
      [pool-1-thread-4] INFO AmqpJavaClientDemo - receive message, topic = /a10FL******/device1/thing/event/property/post, messageId = 1343530036824******, content = {"deviceType":"CustomCategory","iotId":"nTT9nDODtdWDfVp5******","requestId":"795","checkFailedData":{},"productKey":"a10FL******","gmtCreate":1609157497013,"deviceName":"device1","items":{"LightCurrent":{"value":1.5,"time":1609157497016}}}
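The topic in the log line begins with the product key and device name, which match the productKey and deviceName fields in the content. A minimal sketch, assuming that layout holds for the messages you subscribe to, of splitting the topic so that processMessage can route by device. The class TopicSketch and its methods are hypothetical and not part of any SDK.

```java
// Hypothetical helper: extracts the product key and device name from a topic string.
final class TopicSketch {

    // Returns {productKey, deviceName}, or null if the topic does not
    // start with "/" followed by at least two non-empty segments.
    static String[] productAndDevice(String topic) {
        if (topic == null || !topic.startsWith("/")) {
            return null;
        }
        String[] parts = topic.split("/"); // parts[0] is empty because of the leading "/"
        if (parts.length < 3 || parts[1].isEmpty() || parts[2].isEmpty()) {
            return null;
        }
        return new String[] {parts[1], parts[2]};
    }

    // True for property reports such as the one in the sample log line.
    static boolean isPropertyPost(String topic) {
        return topic != null && topic.endsWith("/thing/event/property/post");
    }

    public static void main(String[] args) {
        String topic = "/a10FL******/device1/thing/event/property/post";
        String[] ids = productAndDevice(topic);
        System.out.println("productKey=" + ids[0] + ", deviceName=" + ids[1]
            + ", propertyPost=" + isPropertyPost(topic));
    }
}
```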
For more information about how to configure a server-side subscription to receive device messages, see the following articles: