You can define custom topic categories in IoT Platform. Then, a device can send messages to a custom topic, and your server can receive the messages using an AMQP SDK. Your server can also call the API operation Pub to send commands to the device.

Scenario

In this example, an electronic thermometer periodically exchanges data with a server. The thermometer sends the current temperature to the server, and the server sends the precision setting command to the thermometer.

Prepare the development environment

In this example, both the devices and the server use Java SDKs, so you need to prepare the Java development environment first. You can download Java tools at Java official website and install the Java development environment.

Add the following Maven dependencies to import the device SDK (Link Kit Java SDK) and IoT SDK:

<dependencies>
 <dependency>
     <groupId>com.aliyun.alink.linksdk</groupId>
     <artifactId>iot-linkkit-java</artifactId>
     <version>1.2.0.1</version>
     <scope>compile</scope>
 </dependency>
 < dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-core</artifactId>
      <version>3.7.1</version>
  </dependency>
  <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-iot</artifactId>
      <version>6.9.0</version>
  </dependency>
  <dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>iot-client-message</artifactId>
    <version>1.1.2</version>
</dependency>
</dependencies>

Create a product and a device

First, you need to create a product, define custom topic categories, define the TSL model, configure service subscription, and create a device in the IoT Platform console.

  1. Log on to the IoT Platform console.
  2. In the left-side navigation pane, choose Devices > Products.
  3. Click Create Product to create a thermometer product.

    For more information, see Create a product.

  4. After the product is created, find the product and click View.
  5. On the Topic Categories tab of the Product Details page, add custom topic categories.

    For more information, see Create a topic category.

    In this example, add the following two topic categories:

    • /${productKey}/${deviceName}/user/devmsg: used by devices to publish messages. Set Device Operation Authorizations to Publish for this topic category.
    • /${productKey}/${deviceName}/user/cloudmsg: used by devices to receive subscribed messages. Set Device Operation Authorizations to Subscribe for this topic category.
  6. On the Service Subscription page, set the type of messages to be pushed to the AMQP SDK to Device Upstream Notification.
  7. In the left-side navigation pane, choose Devices and add a thermometer device under the thermometer product that has been created. For more information, see Create a device.

The server receives messages from the device

The following figure shows how the device sends a message to the server.

  • Configure the AMQP SDK, which will be installed in the server, to receive messages from IoT Platform.
    • Add the following dependency to install Qpid JMS 0.47.0.
      <!-- amqp 1.0 qpid client -->
       <dependency>
         <groupId>org.apache.qpid</groupId>
         <artifactId>qpid-jms-client</artifactId>
         <version>0.47.0</version>
       </dependency>
       <!-- util for base64-->
       <dependency>
         <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
      </dependency>
    • Connect the AMQP SDK to IoT Platform.

      For information about the parameters in the following demo, see AMQP client access instructions.

      import java.net.URI;
      import java.util.Hashtable;
      import javax.crypto.Mac;
      import javax.crypto.spec.SecretKeySpec;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.Destination;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageListener;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import javax.naming.Context;
      import javax.naming.InitialContext;
      import org.apache.commons.codec.binary.Base64;
      import org.apache.qpid.jms.JmsConnection;
      import org.apache.qpid.jms.JmsConnectionListener;
      import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class AmqpJavaClientDemo {
      
          private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
      
          public static void main(String[] args) throws Exception {
              //For information about parameters, see AMQP client access instructions.
              String accessKey = "$ {YourAccessKeyID }";
              String accessSecret = "${YourAccessKeySecret}";
              String consumerGroupId = "${YourConsumerGroupId}";
              long timeStamp = System.currentTimeMillis();
              //Signature method: hmacmd5, hmacsha1, or hmacsha256.
              String signMethod = "hmacsha1";
              //The value of the clientId parameter is displayed as the client ID on the consumer group status page for service subscription in the console.
              //We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address as the clientId value. This allows you to identify clients.
              String clientId = "$ {YourClientId }";
      
              //For information about how to obtain the value of UserName, see AMQP client access instructions.
              String userName = clientId + "| authMode = aksign"
                  + ",signMethod=" + signMethod
                  + ",timestamp=" + timeStamp
                  + ",authId=" + accessKey
                  + ",consumerGroupId=" + consumerGroupId
                  + "|";
              //For information about how to obtain the value of password, see AMQP client access instructions.
              String signContent = "authId =" + accessKey + "& timestamp =" + timeStamp;
              String password = doSign(signContent,accessSecret, signMethod);
              //Construct the connection URL based on the rules described in Qpid JMS documentation.
              String connectionUrl = "failover:(amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671? amqp.idleTimeout=80000)"
                  + "? failover.reconnectDelay=30";
      
              Hashtable<String, String> hashtable = new Hashtable<>();
              hashtable.put("connectionfactory.SBCF",connectionUrl);
              hashtable.put("queue.QUEUE", "default");
              hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
              Context context = new InitialContext(hashtable);
              ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
              Destination queue = (Destination)context.lookup("QUEUE");
              // Create Connection
              Connection connection = cf.createConnection(userName, password);
              ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
              // Create Session
              // Session. Client_acknowledgment: After you receive the message, manually call message.acknowledge().
              // Session. Auto_acknowledgment: The SDK is automatically acknowledged. (recommended)
              Session session = connection. createSession (false, Session. Auto_acknowledgment );
              connection.start();
              // Create Receiver Link
              MessageConsumer consumer = session.createConsumer(queue);
              consumer.setMessageListener(messageListener);
          }
      
          private static MessageListener messageListener = new MessageListener() {
              @Override
              public void onMessage(Message message) {
                  try {
                      byte[] body = message.getBody(byte[].class);
                      String content = new String(bytes);
                      String topic = message.getStringProperty("topic");
                      String messageId = message.getStringProperty("messageId");
                      logger.info("receive message"
                          + ", topic = " + topic
                          + ", messageId = " + messageId
                          + ", content = " + content);
                      //If you select Session.CLIENT_ACKNOWLEDGE when creating a session, manual acknowledgment is required.
                      //message.acknowledge();
                      //Ensure that no time-consuming logic exists during message processing. If processing received messages takes a long period of time, initiate asynchronous requests.
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          };
      
          private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
              /**
               * The connection is successfully established.
               */
              @Override
              public void onConnectionEstablished(URI remoteURI) {
                  logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
              }
      
              /**
               * The connection fails after the maximum number of retries.
               */
              @Override
              public void onConnectionFailure(Throwable error) {
                  logger.error("onConnectionFailure, {}", error.getMessage());
              }
      
              /**
               * The connection is interrupted.
               */
              @Override
              public void onConnectionInterrupted(URI remoteURI) {
                  logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
              }
      
              /**
               * The connection is interrupted and automatically restored.
               */
              @Override
              public void onConnectionRestored(URI remoteURI) {
                  logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
              }
      
              @Override
              public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
      
              @Override
              public void onSessionClosed(Session session, Throwable cause) {}
      
              @Override
              public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
      
              @Override
              public void onProducerClosed(MessageProducer producer, Throwable cause) {}
          };
      
          /**
           * To obtain the string to sign in the value of password, see AMQP client access instructions.
           */
          private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
              SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
              Mac mac = Mac.getInstance(signMethod);
              mac.init(signingKey);
              Byte [] rawHmac = mac. doFinal (toSignString. getBytes ());
              return Base64.encodeBase64String(rawHmac);
          }
      }
  • Configure the device SDK to send a message.
    • Configure device authentication information.
      final String productKey = "XXXXXX";
      final String deviceName = "XXXXXX";
      final String deviceSecret = "XXXXXXXXX";
      final String region = "XXXXXX";
    • Set connection initialization parameters, including MQTT connection information, device information, and initial device status.
      LinkKitInitParams params = new LinkKitInitParams();
      // Configure MQTT connection information. Link Kit uses MQTT as the underlying protocol. 
      IoTMqttClientConfig config = new IoTMqttClientConfig();
      config.productKey = productKey;
      config.deviceName = deviceName;
      config.deviceSecret = deviceSecret;
      config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";
      // Configure device information.
      DeviceInfo deviceInfo = new DeviceInfo();
      deviceInfo.productKey = productKey;
      deviceInfo.deviceName = deviceName;
      deviceInfo.deviceSecret = deviceSecret;
      // Register the initial device status.
      Map<String, ValueWrapper> propertyValues = new HashMap<String, ValueWrapper>();
      
      params.mqttClientConfig = config;
      params.deviceInfo = deviceInfo;
      params.propertyValues = propertyValues;
    • Initialize the connection.
      // Initialize the connection and set the callback that is called when the initialization is successful.
      LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
           @Override
           public void onError(AError aError) {
               System.out.println("Init error:" + aError);
           }
      
           // Set the callback that is called when the initialization is successful.
           @Override
           public void onInitDone(InitResult initResult) {
               System.out.println("Init done:" + initResult);
           }
       });
    • Send a message from the device.

      After connecting to IoT Platform, the device sends a message to the specified topic. Replace the content of the onInitDone callback like the following example:

      @Override
       public void onInitDone(InitResult initResult) {
           // Set the topic to which the message is published and the message content.
           MqttPublishRequest request = new MqttPublishRequest();
           request.topic = "/" + productKey + "/" + deviceName + "/user/devmsg";
           request.qos = 0;
           request.payloadObj = "{\"temperature\":35.0, \"time\":\"sometime\"}";
           // Publish the message and set the callback that is called when the message is published.
           LinkKit.getInstance().publish(request, new IConnectSendListener() {
               @Override
               public void onResponse(ARequest aRequest, AResponse aResponse) {
                   System.out.println("onResponse:" + aResponse.getData());
               }
      
               @Override
               public void onFailure(ARequest aRequest, AError aError) {
                   System.out.println("onFailure:" + aError.getCode() + aError.getMsg());
               }
           });
       }

      The server receives the following message:

      Message
      {payload={"temperature":35.0, "time":"sometime"},
      topic='/a1uzcH0****/device1/user/devmsg',
      messageId='1131755639450642944',
      qos=0,
      generateTime=1558666546105}

The server sends messages to the device

The following figure shows how the server sends a message to the device.

  • Configure the device SDK to subscribe to a topic.

    For more information about how to configure device authentication information, set connection initialization parameters, and initialize the connection, see the device SDK configuration in the previous section.

    The device needs to subscribe to a specific topic to receive messages sent by the server.

    The following example demonstrates how to configure the device SDK to subscribe to a topic:

    // Set the callback that is called when the initialization is successful.
    @Override
    public void onInitDone(InitResult initResult) {
        // Set the topic to which the device subscribes.
        MqttSubscribeRequest request = new MqttSubscribeRequest();
        request.topic = "/" + productKey + "/" + deviceName + "/user/cloudmsg";
        request.isSubscribe = true;
        // Send a subscription request and set the callbacks that are called when the subscription succeeds and fails, respectively.
        LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {
            @Override
            public void onSuccess() {
                System.out.println("");
            }
    
            @Override
            public void onFailure(AError aError) {
    
            }
        });
    
        // Set the listener for listening to subscribed messages.
        IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
            // Define the callback that is called when a subscribed message is received.
            @Override
            public void onNotify(String connectId, String topic, AMessage aMessage) {
                System.out.println(
                    "received message from " + topic + ":" + new String((byte[])aMessage.getData()));
            }
    
            @Override
            public boolean shouldHandle(String s, String s1) {
                return false;
            }
    
            @Override
            public void onConnectStateChange(String s, ConnectState connectState) {
    
            }
        };
        LinkKit.getInstance().registerOnNotifyListener(notifyListener);
    }
  • Configure the IoT SDK to call the Pub operation to publish a message.
    • Configure identity verification information.
       String regionId = "The ID of the region where the device is located";
       String accessKey = "The AccessKey ID of your Alibaba Cloud account";
       String accessSecret = "The AccessKey Secret of your Alibaba Cloud account";
       final String productKey = "The key of the product";
    • Set connection parameters.
      // Construct a client.
      DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret);
      IAcsClient client = new DefaultAcsClient(profile);
    • Set the parameters for publishing a message.
      PubRequest request = new PubRequest();
      request.setQos(0);
      // Set the topic to which the message is published.
      request.setTopicFullName("/" + productKey + "/" + deviceName + "/user/cloudmsg");
      request.setProductKey(productKey);
      // Set the message content. The message content must be encoded in Base64. Otherwise, the message content will be garbled characters.
      request.setMessageContent(Base64.encode("{\"accuracy\":0.001,\"time\":now}"));
    • Publish the message.
      try {
           PubResponse response = client.getAcsResponse(request);
           System.out.println("pub success?:" + response.getSuccess());
       } catch (Exception e) {
           System.out.println(e);
       }
      The device receives the following message:
      msg = [{"accuracy":0.001,"time":now}]

Appendix: demo

Click here to download and view the complete demo for this example.

For more information about Service Subscription, see:

Configure AMQP service subscription in the console

AMQP client access instructions

Java SDK access example

Node.js SDK access example

.NET SDK access example

Python SDK access example