You can configure rules for client status notification of Message Queue for MQTT to export the offline and online events of Message Queue for MQTT clients to other Alibaba Cloud services. Notifications of offline and online events are asynchronous with the client status. This topic describes the working mechanism, scenarios, and limits of client status notifications. It also describes the resource mappings between Message Queue for MQTT and other Alibaba Cloud services.

Working mechanism

When a Message Queue for MQTT client comes online or goes offline, the Message Queue for MQTT broker pushes a client status notification to an Alibaba Cloud service based on the rule that you configured. Backend service applications deployed on Elastic Compute Service (ECS) instances can subscribe to client status notifications from that Alibaba Cloud service to learn when Message Queue for MQTT clients come online or go offline.

Notifications of offline and online events are delivered asynchronously and do not provide real-time information. An application can determine the actual status of a client only based on the timeline of event notifications.

Determining the actual status of a client in this way is complicated and can be inaccurate. However, the notifications of offline and online events can be used to analyze the status traces of multiple Message Queue for MQTT clients.

Scenarios

Client status notifications are used to trigger some predefined actions of service applications when Message Queue for MQTT clients come online or go offline.

For example, when the status of a Message Queue for MQTT client changes several times, the Message Queue for MQTT broker encapsulates each status change into a notification based on the configured rule for client status notification, and then sends the notifications to a Message Queue for Apache RocketMQ instance. This way, all status changes of the client can be collected and analyzed.

Note In scenarios other than the preceding example, we recommend that you call API operations to query the status of Message Queue for MQTT clients in real time. For more information, see Obtain the status of a Message Queue for MQTT client.

Limits

You can export client status notifications only from Message Queue for MQTT to Message Queue for Apache RocketMQ.

Resource mapping methods

You can configure an Alibaba Cloud service to which Message Queue for MQTT forwards status notifications of all Message Queue for MQTT clients under the same group ID.

Table 1. Mappings
Message Queue for MQTT resource | Alibaba Cloud service | Resource of the Alibaba Cloud service | Property mappings
Group ID of Message Queue for MQTT | Message Queue for Apache RocketMQ | Topics of Message Queue for Apache RocketMQ | Message structure mappings between Message Queue for MQTT and Message Queue for Apache RocketMQ

Procedure

As described in the previous sections, if you want to obtain asynchronous client status notifications, you must create a rule for client status notification before you can export the client status notifications to other Alibaba Cloud services. Message Queue for Apache RocketMQ is used as the Alibaba Cloud service in the following example.

  1. Create a rule for client status notification.

    When you create a rule in the Message Queue for MQTT console, find the Message Queue for MQTT clients whose status you want to query, and select the group ID of the clients. For more information about how to create a rule for client status notification, see Create a rule for client status notification.

  2. Ensure that the backend service application has subscribed to the client status notifications.

    The service application can receive offline and online events of related Message Queue for MQTT clients when the rule that you create in Step 1 takes effect. For more information about how to obtain messages from Message Queue for Apache RocketMQ, see Subscribe to messages. For more information about the sample code, see MQTTClientStatusNoticeProcessDemo.java.

    Event types are specified in the tag of Message Queue for Apache RocketMQ. Tag format:

    MQ Tag: connect/disconnect/tcpclean

    The following items describe the tags:

    • connect indicates that the Message Queue for MQTT client comes online.
    • disconnect indicates that the Message Queue for MQTT client ends the connection with the Message Queue for MQTT broker. Based on the MQTT protocol, the Message Queue for MQTT client sends a disconnect packet before it ends the TCP connection. The Message Queue for MQTT broker triggers a client status notification only after it receives the disconnect packet from the Message Queue for MQTT client.
    • tcpclean indicates that the TCP connection is closed. If the TCP connection is closed, the tag contains tcpclean regardless of whether the Message Queue for MQTT client has explicitly sent a disconnect packet.
    Note

    tcpclean indicates that the Message Queue for MQTT client is disconnected on the network layer. disconnect only indicates that the Message Queue for MQTT client sends a disconnect packet. disconnect may not appear in the tag when a Message Queue for MQTT client exits unexpectedly and does not send the disconnect packet. Therefore, to determine whether a Message Queue for MQTT client is offline, check if the tag contains tcpclean.
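
    The rule in the preceding note can be sketched as a small helper. This is a minimal illustration and not part of any SDK: TagClassifier and Signal are hypothetical names, and the sketch assumes that each notification carries exactly one of the three event types as its tag.

```java
/** Hypothetical helper: maps a notification tag to an online or offline signal. */
final class TagClassifier {
    enum Signal { ONLINE, OFFLINE, IGNORE }

    static Signal classify(String tag) {
        if (tag == null) {
            return Signal.IGNORE;
        }
        switch (tag) {
            case "connect":
                // The client came online on a new TCP connection.
                return Signal.ONLINE;
            case "tcpclean":
                // The TCP connection is closed, so this connection is offline.
                return Signal.OFFLINE;
            default:
                // "disconnect" only means a disconnect packet was sent; it is
                // not used on its own to decide that the client is offline.
                return Signal.IGNORE;
        }
    }
}
```

    Treating disconnect as IGNORE reflects the recommendation to rely on tcpclean, because a client that exits unexpectedly never produces a disconnect event.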

    The message body is in JSON format. It contains the following keys:

    • clientId indicates a specific Message Queue for MQTT client.
    • time indicates the occurrence time of the event.
    • eventType indicates the event type, which is used to differentiate events of the Message Queue for MQTT client.
    • channelId uniquely identifies a TCP connection.
    • clientIp indicates the public egress IP address used by the Message Queue for MQTT client.

    The following sample code provides an example:

    clientId: GID_XXX@@@YYYYY
    time: 1212121212
    eventType: connect/disconnect/tcpclean
    channelId: 2b9b1281046046faafe5e0b458e4XXXX
    clientIp: 192.168.XX.XX:133XX
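
    If a service application needs the individual parts of these fields, they can be split as in the following sketch. The sketch rests on assumptions read off the sample above: the clientId has the form groupId@@@deviceId, and the clientIp has the form ip:port. NotificationFields and its methods are hypothetical names used only for illustration.

```java
/** Hypothetical helper for splitting fields of a client status notification. */
final class NotificationFields {

    /** Returns {groupId, deviceId}, assuming the clientId looks like groupId@@@deviceId. */
    static String[] splitClientId(String clientId) {
        int idx = clientId.indexOf("@@@");
        if (idx < 0) {
            throw new IllegalArgumentException("Unexpected clientId format: " + clientId);
        }
        return new String[] {clientId.substring(0, idx), clientId.substring(idx + 3)};
    }

    /** Returns {ip, port}, assuming the clientIp looks like ip:port; port is empty if absent. */
    static String[] splitClientIp(String clientIp) {
        int idx = clientIp.lastIndexOf(':');
        if (idx < 0) {
            return new String[] {clientIp, ""};
        }
        return new String[] {clientIp.substring(0, idx), clientIp.substring(idx + 1)};
    }
}
```

    The group ID obtained this way is the same group ID that the rule for client status notification is bound to.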

    To determine the status of a Message Queue for MQTT client, check the timeline of received client status notifications. You cannot make a decision based on only the last received notification.

    Determine the client status based on the following rules:

    • Timestamps indicate the sequence of status events that a Message Queue for MQTT client specified by the clientId generates. A more recent event has a larger timestamp value.
    • A Message Queue for MQTT client specified by a clientId may experience transient disconnections several times. Therefore, when you receive an offline event notification, check the channelId value to determine whether the notification is related to the current TCP connection. An offline event notification can overwrite only the online event notification that has the same channelId. If the two notifications have different channelId values, the offline event notification cannot overwrite the online one, even if its time value is larger. A channelId identifies a TCP connection. A TCP connection has only one connect event and one close event.

    The following sample code, MQTTClientStatusNoticeProcessDemo.java, processes client status notifications based on these rules:

package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * Initialize a Message Queue for Apache RocketMQ consumer. In actual business scenarios, the code is generally written in a backend service application. 
         */
        Properties properties = new Properties();
        /**
         * Set the group ID of the Message Queue for Apache RocketMQ client. Note that the group ID is different from that in the Message Queue for MQTT instance. You can set the group ID based on the corresponding service instructions. 
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * The AccessKey ID of the Alibaba Cloud account, which is obtained from the Resource Access Management (RAM) console. 
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * The AccessKey secret, which is obtained from the RAM console. It must be set for the signature authentication mode. 
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * Set a domain name for the TCP connection. 
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * When a Message Queue for Apache RocketMQ consumer is used to process the status notifications of a Message Queue for MQTT client, the consumer must subscribe to a topic of client status notifications. Create the topic in advance as instructed in the documentation about console operations. 
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * The client status data. In a production environment, we recommend that you use an external persistent storage system, such as a database or Redis system, to store the client status data to prevent data loss upon application restart. A standalone memory is used in this demo. 
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         * Note that the following code determines only whether the client is online. Therefore, the consumer needs to subscribe only to the connect and tcpclean events. 
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * The logic for handling client status notifications. 
     * In an actual deployment process, the application that consumes client status notifications may be deployed on multiple ECS instances. Therefore, the client status data can be maintained in the external shared storage such as a database or Redis system. 
     * Process messages using methods such as consumption idempotence to prevent the state machine from making wrong state changes based on repeated messages. 
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
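                // Decode the body with an explicit charset instead of the platform default.
                JSONObject msgBody = JSON.parseObject(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));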
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * Store new events. 
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * Read the event list of the current TCP connection with the correct channelId. 
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * If notifications about both online and offline events in the event list are received, the current TCP connection is closed. The data of this connection can be cleared. 
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * Check whether the client specified by the clientId has an active TCP connection based on the channel table. 
     * 1. If no channel table is available, the client must be offline. 
     * 2. If a channel table is available, check whether any connection in the table has only online events. If yes, the connection is active and the Message Queue for MQTT client is online. 
     * If all connections in the table have disconnection events, the client must be offline. 
     *
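     * @param clientId the ID of the Message Queue for MQTT client to check
     * @param mqttClientStatusStore the store that holds the received client status events
     * @return true if at least one connection of the client has only online events; otherwise false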
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent && !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}
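
The demo above references MqttClientStatusStore, MemoryHashMapStoreImpl, and ClientStatusEvent without showing them. The following is a minimal in-memory sketch of what these types could look like, inferred only from the calls that the demo makes. It is an assumption for illustration, not the implementation shipped with the official sample. Events are grouped per clientId and per channelId, and equality on (channelId, eventType) is one possible way to make redelivered notifications idempotent.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical event type; the setters mirror the ones called in the demo. */
class ClientStatusEvent {
    private String channelId;
    private String clientIp;
    private String eventType;
    private Long time;

    public void setChannelId(String channelId) { this.channelId = channelId; }
    public void setClientIp(String clientIp) { this.clientIp = clientIp; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public void setTime(Long time) { this.time = time; }

    /** Only connect counts as an online event; every other type counts as offline. */
    public boolean isOnlineEvent() { return "connect".equals(eventType); }

    // Two events for the same connection and type are treated as duplicates,
    // so adding a redelivered notification to a Set has no effect.
    @Override public boolean equals(Object o) {
        if (!(o instanceof ClientStatusEvent)) {
            return false;
        }
        ClientStatusEvent other = (ClientStatusEvent) o;
        return Objects.equals(channelId, other.channelId) && Objects.equals(eventType, other.eventType);
    }

    @Override public int hashCode() { return Objects.hash(channelId, eventType); }
}

/** Hypothetical store interface with the four methods used by the demo. */
interface MqttClientStatusStore {
    void addEvent(String clientId, String channelId, String eventType, ClientStatusEvent event);
    Set<ClientStatusEvent> getEvent(String clientId, String channelId);
    void deleteEvent(String clientId, String channelId);
    Map<String, Set<ClientStatusEvent>> getEventsByClientId(String clientId);
}

/** In-memory implementation: clientId -> channelId -> events of that TCP connection. */
class MemoryHashMapStoreImpl implements MqttClientStatusStore {
    private final Map<String, Map<String, Set<ClientStatusEvent>>> store = new ConcurrentHashMap<>();

    @Override public void addEvent(String clientId, String channelId, String eventType, ClientStatusEvent event) {
        store.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(channelId, k -> ConcurrentHashMap.newKeySet())
            .add(event);
    }

    @Override public Set<ClientStatusEvent> getEvent(String clientId, String channelId) {
        Map<String, Set<ClientStatusEvent>> channels = store.get(clientId);
        return channels == null ? null : channels.get(channelId);
    }

    @Override public void deleteEvent(String clientId, String channelId) {
        Map<String, Set<ClientStatusEvent>> channels = store.get(clientId);
        if (channels != null) {
            channels.remove(channelId);
        }
    }

    @Override public Map<String, Set<ClientStatusEvent>> getEventsByClientId(String clientId) {
        return store.get(clientId);
    }
}
```

This sketch keeps all data in process memory, so it is lost on restart and is not shared across ECS instances. It also does not guard against a notification that is redelivered after its connection data has been deleted. As the comments in the demo recommend, a production deployment would back the same interface with shared persistent storage.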

References

For more information about console operations, see Manage rules for client status notification.