By running rules for client status notification of Message Queue for MQTT, you can export the obtained online and offline events of Message Queue for MQTT clients to other Alibaba Cloud services. This is an asynchronous method for status notification. This topic describes the principles and application scenarios of client status notifications. It also describes the resource mappings between Message Queue for MQTT and other Alibaba Cloud services.

How it works

When Message Queue for MQTT clients go online or offline, the Message Queue for MQTT broker pushes a status notification message to other backend cloud services based on the configured rules for client status notification. Service applications are generally deployed on Elastic Compute Service (ECS) instances. Service applications can subscribe to the status notification message from backend cloud services to obtain online and offline events of all related Message Queue for MQTT clients.

This method asynchronously perceives the client status and detects online and offline events instead of the status of clients. Therefore, cloud applications must deduce the client status based on the timeline of a series of events.

Asynchronous status notifications are based on message decoupling. Therefore, status determination based on such notifications is more complex and prone to misjudgment. However, this method can be used to analyze the running status traces of multiple Message Queue for MQTT clients based on events.

Notice The only supported backend cloud service is Message Queue for Apache RocketMQ.

Scenarios

Client status notifications are used in the scenarios where service applications need to trigger some predefined actions when Message Queue for MQTT clients go online or offline.

For example, in the scenario of client status aggregation, a Message Queue for MQTT client makes status changes such as going online and going offline. Then, the Message Queue for MQTT instance encapsulates the status changes and forwards them to a Message Queue for Apache RocketMQ instance based on the configured rules for client status notification to aggregate and collect client status data.

Note In other scenarios, we recommend that you obtain the client status by calling synchronous query operations. For more information, see Obtain the online status of the MQTT client.

Resource mapping methods

Status change notifications of all clients under the same Message Queue for MQTT group ID are forwarded to the resources of the same Alibaba Cloud service that you configure.

Table 1. Mappings
Message Queue for MQTT resource Another Alibaba Cloud service Resource of another Alibaba Cloud service Packet definition
MQTT Group ID Message Queue for Apache RocketMQ Message Queue for Apache RocketMQ topics Message structure mappings between Message Queue for MQTT and Message Queue for Apache RocketMQ

Procedure

As previously described, if asynchronous status notifications are used, you must create rules for client status notification, and export the status notification messages to backend cloud services. The Message Queue for Apache RocketMQ backend cloud service is used in the following example.

  1. Create a rule for client status notification.

    When you create a rule in the Message Queue for MQTT console, select the group ID of the group where the Message Queue for MQTT clients to which you want to pay attention are located. For more information about how to create a rule for client status notification, see Create a rule for client status notification.

  2. Service applications subscribe to this type of notifications.

    After the service applications adopt the rule created in Step 1, they can receive online or offline events of related Message Queue for MQTT clients. For more information about how to receive notifications in Message Queue for Apache RocketMQ, see Subscribe to messages. For more information about the sample code, see MQTTClientStatusNoticeProcessDemo.java.

    Event types are specified in the tag of Message Queue for Apache RocketMQ and represent online or offline events. Data format:

    MQ Tag: connect/disconnect/tcpclean

    where:

    • A connect event indicates that the Message Queue for MQTT client goes online.
    • A disconnect event indicates that the Message Queue for MQTT client actively disconnects from the Message Queue for MQTT broker. Based on the Message Queuing Telemetry Transport (MQTT) protocol, the Message Queue for MQTT client sends a disconnect message before it actively closes the TCP connection. The Message Queue for MQTT broker triggers a disconnect message after it receives the disconnect message from the Message Queue for MQTT client. If the SDK for a Message Queue for MQTT client does not send a disconnect message based on the MQTT protocol, the Message Queue for MQTT broker cannot receive the disconnect message.
    • A tcpclean event indicates that the TCP connection is closed. If the current TCP connection is closed, a tcpclean event is triggered regardless of whether the Message Queue for MQTT client has explicitly sent a disconnect message.
    Note

    A tcpclean message indicates that the TCP connection of the Message Queue for MQTT client is closed. A disconnect message only indicates that the Message Queue for MQTT client actively sends a disconnect message. Some Message Queue for MQTT clients may fail to send a disconnect message due to unexpected exits. This is subject to the implementation on the Message Queue for MQTT clients. Therefore, determine whether a Message Queue for MQTT client is offline based on the tcpclean event.

    Data content is in JSON format. The related keys have the following meanings:

    • clientId indicates a specific Message Queue for MQTT client.
    • time indicates the occurrence time of the event.
    • eventType indicates the event type, which is used by the Message Queue for MQTT client to differentiate events.
    • channelId uniquely identifies a TCP connection.
    • clientIp indicates the public egress IP address used by the Message Queue for MQTT client.

    Examples:

    clientId: GID_XXX@@@YYYYY
    time:1212121212
    eventType:connect/disconnect/tcpclean
    channelId:2b9b1281046046faafe5e0b458e4XXXX
    clientIp: 192.168.X.X:133XX     

    To determine whether a Message Queue for MQTT client is online, check the last received message and status notification messages. You cannot depend only on the last received message to determine the client status.

    Determine the client status based on the following rules:

    • The sequence of online and offline events generated by Message Queue for MQTT clients that have the same clientId is determined by time. This means that a more recent event has a greater timestamp.
    • Message Queue for MQTT clients that have the same clientId may be transiently disconnected multiple times. Therefore, when an offline notification message is received, you must check whether the message is related to the current TCP connection based on the channelId field. In short, an offline notification message can overwrite only another offline notification message that has the same channelId. If channelId values of two offline notification messages are different, the offline notification message that has a more recent occurrence time cannot overwrite the other message. A channelId represents the existence of a TCP connection. Only one connect event and one close event exist for a single TCP connection.
package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * Initialize a Message Queue for Apache RocketMQ client. In actual business scenarios, the code is generally deployed in an application on an ECS instance. 
         */
        Properties properties = new Properties();
        /**
         * Set the group ID of the Message Queue for Apache RocketMQ client. Note that the groupId here is different from that in an Message Queue for MQTT instance. You can set the group ID based on the corresponding service instructions. 
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * The AccessKey ID, which is obtained from the Resource Access Management (RAM) console. 
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * The AccessKey secret, which is obtained from the RAM console. It must be set only for the signature authentication mode. 
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * Set a domain name for the TCP connection.
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * When a Message Queue for Apache RocketMQ consumer is used to process the status notifications of a Message Queue for MQTT client, the subscribed topic is a status notification topic. Create a topic in advance as instructed in the relevant topic about console operations in the documentation. 
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * The client status data. In a production environment, we recommend that you use an external persistent storage system, such as a database or Redis system, to store status data to prevent the loss of status data upon application restart. This demo uses a standalone memory version to demonstrate. 
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         * Note that the following code determines only whether the client is online. Therefore, you need only to pay attention to the connect and tcpclean events. 
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * The logic for handling status notifications. 
     * In an actual deployment process, applications that consume status notification messages may be deployed on multiple ECS instances. Therefore, the status data of clients can be maintained in external shared storage such as a database or Redis system. 
     * Perform consumption idempotence processing on messages to avoid errors that may occur when a state machine repeatedly receives messages. 
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
    * Store new events. 
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
    * Read the event list of the current channel. 
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
    * If both online and offline events in the event list are received, the channel has gone offline, and the data of this channel can be cleared. 
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
    * Check whether an active TCP connection is present for a clientId based on the channel table. 
     * 1. If no channel table exists, the client is offline. 
     * 2. If the channel table is not empty, check whether the channel data contains only online events. If yes, an active connection is present and the client is online. 
    * If all channels have disconnection events, the client is offline. 
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & ! findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}

References

For more information about console operations, see Manage rules for client status notification.