When a Message Queue for MQTT client goes online or offline, the event triggers the Message Queue for MQTT broker to generate a notification message. You can export this message from Message Queue for MQTT to another Alibaba Cloud service, and you can use Message Queue for MQTT SDK for Java to implement messaging between the Message Queue for MQTT client and a backend application. In this example, notification messages are exported to Message Queue for Apache RocketMQ, which is the only Alibaba Cloud service that Message Queue for MQTT supports as an export destination.

Prerequisites

  • An integrated development environment (IDE) is installed. For more information, see IDE. You can use IntelliJ IDEA or Eclipse. In this example, IntelliJ IDEA is used.
  • The Java Development Kit (JDK) is installed. For more information, see JDK.
  • A Message Queue for Apache RocketMQ instance, a topic, and a group ID are created in the Internet region. For more information, see Create resources.

Background information

You can configure a rule for client status notification to export data about online and offline event notifications of the Message Queue for MQTT client to other Alibaba Cloud services.

This topic describes how Message Queue for MQTT sends notification messages about the online and offline events of a Message Queue for MQTT client to a backend application. In this example, Message Queue for MQTT SDK for Java that is designed for the Internet is used.

You can use third-party open source SDKs for multiple programming languages to send and subscribe to messages in this scenario. For more information, see Download the SDK.

Figure: quick_start_client_stats_notify

As shown in the preceding figure, your backend application and the Message Queue for MQTT client are developed in Java. When the Message Queue for MQTT client that you deploy on the Internet goes online or offline, the online or offline event triggers the Message Queue for MQTT broker to generate a notification message. You can configure a rule for client status notification to send the notification message to the backend application by using Message Queue for Apache RocketMQ. The Message Queue for MQTT client and broker implement messaging by using Message Queue for MQTT SDK for Java. The Message Queue for Apache RocketMQ client and broker implement messaging by using Message Queue for Apache RocketMQ SDK for Java.

For more information about the client status notification feature, see Export online and offline events of Message Queue for MQTT clients.

Note Messages cannot be sent between a Message Queue for Apache RocketMQ topic in one region and a Message Queue for MQTT topic in another region. Therefore, all resources that are involved in this topic must be created in the Internet region. For more information, see Region-specific topics.

Network access

Message Queue for MQTT provides the Public Endpoint and VPC Endpoint. We recommend that you use different types of endpoints based on scenarios:
  • In Internet of Things (IoT) and mobile Internet scenarios, we recommend that you use the Public Endpoint to connect your Message Queue for MQTT client to Message Queue for MQTT.
  • The VPC Endpoint is used only in special scenarios. If your applications are deployed on cloud servers, we recommend that you use a server-side messaging service, such as Message Queue for Apache RocketMQ.
Notice To connect a Message Queue for MQTT client to Message Queue for MQTT by using an endpoint, use the domain name instead of the IP address because the IP address can change at any time. The service team of Message Queue for MQTT is not liable for direct or indirect faults or losses that arise in the following scenarios:
  • Your Message Queue for MQTT client uses an IP address instead of a domain name to access the service. The original IP address becomes invalid after the service team of Message Queue for MQTT updates domain name resolution.
  • A firewall policy on IP addresses is set in the network where your Message Queue for MQTT client is running. New IP addresses are blocked by the firewall policy after the service team of Message Queue for MQTT updates domain name resolution.
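The recommendation above can be enforced in client code. The following is a minimal sketch, not part of the SDK: the class name MqttEndpoint, its methods, and the example domain are hypothetical, and the tcp:// scheme with port 1883 is assumed because that is the conventional form for an unencrypted MQTT connection.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that refuses to build a broker URL from an IP address. */
final class MqttEndpoint {
    // Matches dotted-quad IPv4 literals such as 192.0.2.10.
    private static final Pattern IPV4 = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}$");

    private MqttEndpoint() { }

    /** Returns true if the host looks like an IPv4 literal or contains ':' (IPv6 literal). */
    static boolean isIpLiteral(String host) {
        return IPV4.matcher(host).matches() || host.contains(":");
    }

    /** Builds a broker URL from an endpoint domain name and rejects IP literals. */
    static String brokerUrl(String endpointDomain, int port) {
        if (endpointDomain == null || endpointDomain.isEmpty()) {
            throw new IllegalArgumentException("The endpoint domain name is required");
        }
        if (isIpLiteral(endpointDomain)) {
            throw new IllegalArgumentException(
                "Use the endpoint domain name, not an IP address: " + endpointDomain);
        }
        return "tcp://" + endpointDomain + ":" + port;
    }
}
```

For example, MqttEndpoint.brokerUrl("your-instance.mqtt.example.com", 1883) returns a URL string that can be passed to an MQTT client, while an argument such as "192.0.2.10" raises an IllegalArgumentException.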
In the example, the public endpoint of Message Queue for MQTT is used. For more information about the comparison of scenarios and mappings of message attributes between Message Queue for MQTT and Message Queue for Apache RocketMQ, see the following two topics:

Procedure

The following figure shows the procedure of sending and subscribing to status notification messages of a Message Queue for MQTT client.

Figure: quick_start_client_status_notify_process

Step 1: Create a Message Queue for MQTT instance and obtain its endpoint

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. On the Instances page, click Create Instance in the upper-left corner.
  5. In the panel that appears, select a billing method as needed.
    Message Queue for MQTT supports the Pay-as-you-go and Subscription billing methods. For more information about the two billing methods, see Overview.
    • Create a pay-as-you-go instance:
      1. Set Billing Method to Pay-as-you-go and click OK.
      2. In the panel that appears, set the parameters as needed and click Buy Now.
    • Create a subscription instance:
      1. Set Billing Method to Subscription and click OK.
      2. In the panel that appears, set the parameters as needed and click Buy Now.
      3. In the panel that appears, click Purchase.
    After you complete the purchase, refresh the Instances page in the Message Queue for MQTT console. The instance that you created appears in the instance list.
  6. On the Instances page, click the name of the instance that you purchased or click Details in the Actions column to go to the Instance Details page.
  7. On the Instance Details page, click the Endpoints tab. On this tab, you can view the endpoint information. In the example, the public endpoint is used.

Step 2: Create a parent topic

The MQTT protocol supports multi-level topics. You must create a parent topic in the Message Queue for MQTT console or by calling an API operation. You do not need to create a subtopic. For more information about topics, see Terms. In this example, a parent topic is created in the Message Queue for MQTT console.
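Because only the parent topic has to exist, a client can compose subtopic names at run time. The following is a minimal sketch under the assumption that topic levels are separated by a forward slash (/), as in the MQTT protocol; the class MqttTopics and the topic name TopicA are hypothetical.

```java
/** Hypothetical helper for composing multi-level MQTT topic names. */
final class MqttTopics {
    private MqttTopics() { }

    /** Appends subtopic levels to a parent topic, for example TopicA/room1/sensor. */
    static String subTopic(String parentTopic, String... levels) {
        requireLevel(parentTopic);
        StringBuilder topic = new StringBuilder(parentTopic);
        for (String level : levels) {
            requireLevel(level);
            topic.append('/').append(level);
        }
        return topic.toString();
    }

    /** Returns the first level of a topic, which is the parent topic created in the console. */
    static String parentOf(String topic) {
        int slash = topic.indexOf('/');
        return slash < 0 ? topic : topic.substring(0, slash);
    }

    // A single level must be non-empty and must not contain the level separator.
    private static void requireLevel(String level) {
        if (level == null || level.isEmpty() || level.contains("/")) {
            throw new IllegalArgumentException("Invalid topic level: " + level);
        }
    }
}
```

With this helper, MqttTopics.subTopic("TopicA", "room1", "sensor") yields "TopicA/room1/sensor", and MqttTopics.parentOf of that value yields "TopicA", the only name that has to be created in advance.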

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a topic in the instance list. In the Actions column, choose More > Topics.
  5. In the upper-left corner of the Topics page, click Create Topic.
  6. In the Create Topic panel, set the Name and Description parameters for the topic and click OK in the lower-left corner.
    You can view the topic that you create on the Topics page.

Step 3: Create a group ID

For more information about group IDs, see Terms.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a group ID in the instance list. In the Actions column, choose More > Groups.
  5. In the upper-left corner of the Groups page, click Create Group.
  6. In the Create Group panel, set the Group ID parameter and click OK.
    You can view the created group ID on the Groups page.
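The sample code in Step 5 uses a client ID of the form GID_XXXXX@@@XXXXX. The following sketch assumes that this form is the group ID, the separator @@@, and a device-specific suffix; the class MqttClientIds and the device ID device001 are hypothetical and serve only to illustrate the pattern.

```java
/** Hypothetical helper that composes and splits client IDs of the form groupId@@@deviceId. */
final class MqttClientIds {
    private static final String SEPARATOR = "@@@";

    private MqttClientIds() { }

    /** Joins a group ID and a device ID into a client ID. */
    static String clientId(String groupId, String deviceId) {
        if (groupId == null || groupId.isEmpty() || groupId.contains(SEPARATOR)) {
            throw new IllegalArgumentException("Invalid group ID: " + groupId);
        }
        if (deviceId == null || deviceId.isEmpty() || deviceId.contains(SEPARATOR)) {
            throw new IllegalArgumentException("Invalid device ID: " + deviceId);
        }
        return groupId + SEPARATOR + deviceId;
    }

    /** Extracts the group ID from a client ID. */
    static String groupIdOf(String clientId) {
        int index = clientId.indexOf(SEPARATOR);
        if (index <= 0) {
            throw new IllegalArgumentException("Not a groupId@@@deviceId client ID: " + clientId);
        }
        return clientId.substring(0, index);
    }
}
```

For example, MqttClientIds.clientId("GID_Client_Status", "device001") produces "GID_Client_Status@@@device001", and groupIdOf recovers "GID_Client_Status" from it.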

Step 4: Create a rule for client status notification

The parameter values that you set in the rule must be the same as those that correspond to the resources you create.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a data outbound rule in the instance list. In the Actions column, choose More > Rules.
  5. In the upper-left corner of the Rules page, click Create Rule.
  6. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Information step, set the following parameters and click Next.
      Parameter | Example | Description
      Rule ID | 111111 | The global unique identifier of a rule.
      • It can contain only letters, digits, hyphens (-), and underscores (_) and must contain at least one letter or digit.
      • It must be 3 to 64 characters in length. If the value contains more than 64 characters, it is automatically truncated.
      • It cannot be updated after the rule is created.
      Description | migrate from rocketmq | The description of the rule.
      Status | Enable | Specifies whether to enable the current rule. Valid values:
      • Enable
      • Disable
      Rule Type | Online/Offline Notification | The type of the rule. Valid values:
    2. In the Configure Rule Source step, specify the data source and click Next.
      Parameter | Example | Description
      Group ID | GID_Client_Status | The group ID of the devices from which the status event data is exported. For more information about group IDs, see Terms.
    3. In the Configure Rule Destination step, specify the destination to which the data is forwarded and click Create.
      Parameter | Example | Description
      Destination Service Type | Message Queue for Apache RocketMQ | The cloud service to which you want to forward the status event data of a Message Queue for MQTT client.
      Note Only Message Queue for Apache RocketMQ is supported.
      Message Queue for Apache RocketMQ Instance | MQ_INST_13801563067*****_BbyOD2jQ | The ID of the Message Queue for Apache RocketMQ instance to which the data is forwarded.
      Topic | TopicB | The Message Queue for Apache RocketMQ topic to which the data is forwarded. In this example, the notifications about the online or offline events of a Message Queue for MQTT client are forwarded to TopicB.
    You can view the client status notification rule that you create on the Rules page.
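The Rule ID constraints listed in the Configure Basic Information step can be checked before you submit the form or call an API. The following is a minimal sketch: the class RuleIds is hypothetical, "letters" is assumed to mean ASCII letters, and values longer than 64 characters are rejected here rather than truncated.

```java
import java.util.regex.Pattern;

/** Hypothetical validator for the Rule ID constraints described in the table above. */
final class RuleIds {
    // 3 to 64 characters, limited to letters, digits, hyphens, and underscores.
    private static final Pattern ALLOWED = Pattern.compile("[A-Za-z0-9_-]{3,64}");
    // At least one letter or digit anywhere in the value.
    private static final Pattern HAS_LETTER_OR_DIGIT = Pattern.compile(".*[A-Za-z0-9].*");

    private RuleIds() { }

    /** Returns true if the value satisfies the character, content, and length rules. */
    static boolean isValid(String ruleId) {
        return ruleId != null
            && ALLOWED.matcher(ruleId).matches()
            && HAS_LETTER_OR_DIGIT.matcher(ruleId).matches();
    }
}
```

Under these assumptions, RuleIds.isValid("111111") and RuleIds.isValid("rule_01-a") return true, while "ab" (too short), "---" (no letter or digit), and "bad id" (contains a space) return false.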

Step 5: Send and subscribe to messages by using Message Queue for MQTT SDK for Java

  1. Download a third-party open source SDK for Java. Download address: Eclipse Paho Java Client.
  2. Download the demo of Alibaba Cloud Message Queue for MQTT SDK for Java for reference during code development. Download address: mqtt-java-demo.
  3. Decompress this demo project package to a specific folder.
  4. In IntelliJ IDEA, import the decompressed files to create a project and check whether the following dependencies are included in the pom.xml file.
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
    Note For more information about the ons-client version, see Release notes.
  5. In the MQTTClientStatusNoticeProcessDemo.java class, specify parameter values as instructed in the comments. Most of these parameter values correspond to the Message Queue for MQTT resources that you create in Step 1 to Step 3 and the related resources that you create in Message Queue for Apache RocketMQ. Then, execute the main() function to send and subscribe to messages.
    The following code shows an example:
    package com.aliyun.openservices.lmq.example.demo;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    
    public class MQTTClientStatusNoticeProcessDemo {
        public static void main(String[] args) {
            /**
             * Initialize a Message Queue for Apache RocketMQ client as a receiver. In most actual scenarios, the code is deployed in a backend application.
             */
            Properties properties = new Properties();
            /**
             * The group ID that you create in the Message Queue for Apache RocketMQ console.
             */
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
            /**
             * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity verification.
             */
            properties.put(PropertyKeyConst.AccessKey, "XXXX");
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity verification. The AccessKey secret is required only for signature authentication.
             */
            properties.put(PropertyKeyConst.SecretKey, "XXXX");
            /**
             * The endpoint of the Message Queue for Apache RocketMQ instance that is used in the TCP connection. You can obtain the endpoint on the Instance Details page in the Message Queue for Apache RocketMQ console.
             */
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
            /**
             * Specify a topic that you use to receive online or offline events when you use the Message Queue for Apache RocketMQ client to receive status notification messages of Message Queue for MQTT clients.
             */
            final String parentTopic = "GID_XXXX_MQTT";
            /**
             * The client status data. In a production environment, we recommend that you use an external persistent storage system, such as a database or Redis, to store status data so that the data is not lost when the application restarts. In this example, a standalone in-memory implementation is used to store the status data.
             */
            MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
            Consumer consumer = ONSFactory.createConsumer(properties);
            /**
             * The following code determines only whether the client is online. Therefore, you need only to pay attention to the connect and tcpclean events.
             */
            consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
            consumer.start();
            String clientId = "GID_XXXXX@@@XXXXX";
            while (true) {
                System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
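                    // Restore the interrupt flag and stop polling instead of ignoring the interruption.
                    Thread.currentThread().interrupt();
                    break;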
                }
            }
        }
    
        /**
         * The logic for handling online and offline events.
         * In an actual deployment process, applications that consume status notification messages may be deployed on multiple servers. Therefore, the status data of clients can be maintained in external shared storage such as a database or Redis system.
         * Perform consumption idempotence processing on messages to avoid errors that may occur when a state machine repeatedly receives messages.
         */
        static class MqttClientStatusNoticeListener implements MessageListener {
            private MqttClientStatusStore mqttClientStatusStore;
    
            public MqttClientStatusNoticeListener(
                MqttClientStatusStore mqttClientStatusStore) {
                this.mqttClientStatusStore = mqttClientStatusStore;
            }
    
            @Override
            public Action consume(Message message, ConsumeContext context) {
                try {
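                    // Decode the body explicitly as UTF-8 instead of relying on the platform default charset.
                    JSONObject msgBody = JSON.parseObject(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));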
                    System.out.println(msgBody);
                    String eventType = msgBody.getString("eventType");
                    String clientId = msgBody.getString("clientId");
                    String channelId = msgBody.getString("channelId");
                    ClientStatusEvent event = new ClientStatusEvent();
                    event.setChannelId(channelId);
                    event.setClientIp(msgBody.getString("clientIp"));
                    event.setEventType(eventType);
                    event.setTime(msgBody.getLong("time"));
                    /**
                     * Store new events first.
                     */
                    mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                    /**
                     * Read the event list of the current connection.
                     */
                    Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                    if (events == null || events.isEmpty()) {
                        return Action.CommitMessage;
                    }
                    /**
                     * If all online and offline events in the list are received and the current connection is closed, the data of the connection can be cleared.
                     */
                    boolean findOnlineEvent = false;
                    boolean findOfflineEvent = false;
                    for (ClientStatusEvent clientStatusEvent : events) {
                        if (clientStatusEvent.isOnlineEvent()) {
                            findOnlineEvent = true;
                        } else {
                            findOfflineEvent = true;
                        }
                    }
                    if (findOnlineEvent && findOfflineEvent) {
                        mqttClientStatusStore.deleteEvent(clientId, channelId);
                    }
                    return Action.CommitMessage;
                } catch (Throwable e) {
                    e.printStackTrace();
                }
                return Action.ReconsumeLater;
            }
        }
    
        /**
         * Check whether an active TCP connection is present for a clientId based on the channel table.
         * 1. If the channel table is empty, the Message Queue for MQTT client is offline.
         * 2. If the channel table is not empty, check whether any connection has received only online events. If so, an active connection is present and the Message Queue for MQTT client is online.
         * If offline events are received in all connections, the Message Queue for MQTT client must be offline.
         *
         * @param clientId the ID of the Message Queue for MQTT client to check
         * @param mqttClientStatusStore the store that holds the status events of each connection
         * @return true if at least one connection has received only online events; false otherwise
         */
        public static boolean checkClientOnline(String clientId,
            MqttClientStatusStore mqttClientStatusStore) {
            Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
            if (channelMap == null) {
                return false;
            }
            for (Set<ClientStatusEvent> events : channelMap.values()) {
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent event : events) {
                    if (event.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && !findOfflineEvent) {
                    return true;
                }
            }
            return false;
        }
    
    }
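The sample class above references MqttClientStatusStore, MemoryHashMapStoreImpl, and ClientStatusEvent, which are provided by the demo project and are not shown here. The following is a minimal sketch of what such types could look like, inferred only from the calls made in the sample; the actual demo classes may differ. Treating "connect" as the online event and using the channel ID plus the event type as the duplicate key are assumptions.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a status event; the setters mirror the calls made by the listener. */
class ClientStatusEvent {
    private String channelId;
    private String clientIp;
    private String eventType;
    private long time;

    public void setChannelId(String channelId) { this.channelId = channelId; }
    public void setClientIp(String clientIp) { this.clientIp = clientIp; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public void setTime(long time) { this.time = time; }

    /** Assumption: "connect" is the online event; any other type, such as "tcpclean", is offline. */
    public boolean isOnlineEvent() { return "connect".equals(eventType); }

    // Events with the same channel and type count as duplicates, so a redelivered
    // message does not add a second entry (consumption idempotence).
    @Override
    public boolean equals(Object o) {
        if (this == o) { return true; }
        if (!(o instanceof ClientStatusEvent)) { return false; }
        ClientStatusEvent other = (ClientStatusEvent) o;
        return Objects.equals(channelId, other.channelId)
            && Objects.equals(eventType, other.eventType);
    }

    @Override
    public int hashCode() { return Objects.hash(channelId, eventType); }
}

/** Sketch of the store contract, limited to the methods that the sample calls. */
interface MqttClientStatusStore {
    void addEvent(String clientId, String channelId, String eventType, ClientStatusEvent event);
    Set<ClientStatusEvent> getEvent(String clientId, String channelId);
    void deleteEvent(String clientId, String channelId);
    Map<String, Set<ClientStatusEvent>> getEventsByClientId(String clientId);
}

/** In-memory store: clientId -> channelId -> events. All data is lost on restart. */
class MemoryHashMapStoreImpl implements MqttClientStatusStore {
    private final Map<String, Map<String, Set<ClientStatusEvent>>> store = new ConcurrentHashMap<>();

    @Override
    public void addEvent(String clientId, String channelId, String eventType, ClientStatusEvent event) {
        store.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(channelId, k -> ConcurrentHashMap.newKeySet())
            .add(event);
    }

    @Override
    public Set<ClientStatusEvent> getEvent(String clientId, String channelId) {
        Map<String, Set<ClientStatusEvent>> channels = store.get(clientId);
        return channels == null ? null : channels.get(channelId);
    }

    @Override
    public void deleteEvent(String clientId, String channelId) {
        Map<String, Set<ClientStatusEvent>> channels = store.get(clientId);
        if (channels != null) {
            channels.remove(channelId);
        }
    }

    @Override
    public Map<String, Set<ClientStatusEvent>> getEventsByClientId(String clientId) {
        return store.get(clientId);
    }
}
```

A shared external store, such as a database or Redis, could implement the same four methods when the consumer runs on multiple servers.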

Verify the result

After the producer sends the message and the consumer subscribes to the message, you can query the trace of the message in the Message Queue for MQTT console to verify whether the message is sent and received. For more information, see Query the message traces.

References