All Products
Search
Document Center

ApsaraMQ for MQTT:Export status events of ApsaraMQ for MQTT clients

Last Updated:Oct 16, 2023

You can configure rules for client status notifications in ApsaraMQ for MQTT to export the online and offline events of ApsaraMQ forMQTT clients to other Alibaba Cloud services. Notifications of offline and online events are asynchronous with the client status. This topic describes the working mechanism, scenarios, and limits of client status notifications. This topic also describes the resource mappings between ApsaraMQ for MQTT and other Alibaba Cloud services.

Working mechanism

When an ApsaraMQ for MQTT client comes online or goes offline, the ApsaraMQ for MQTT broker pushes a client status notification to an Alibaba Cloud service based on the rule that you configured. Backend service applications deployed on Elastic Compute Service (ECS) instances can subscribe to client status notifications from the Alibaba Cloud service to receive notifications when ApsaraMQ for MQTT clients come online or go offline.上下线事件流出

This method is an asynchronous process and does not provide real-time information. An application can determine the actual status of a client only based on the timeline of event notifications.

This is a complicated and inaccurate method to determine the actual status of a client. However, the notifications of offline and online events can be used to analyze the status traces of multiple ApsaraMQ for MQTT clients.

Scenarios

Client status notifications are used to trigger some predefined actions of service applications when ApsaraMQ for MQTT clients come online or go offline.

For example, if the status of an ApsaraMQ for MQTT client changes several times, the ApsaraMQ for MQTT broker encapsulates the status changes into notifications based on the configured rule for client status notifications, and then sends the notifications to an ApsaraMQ for RocketMQ instance. This way, all status data of the client can be collected and analyzed.

Note

In scenarios other than the preceding example, we recommend that you call API operations to query the status of ApsaraMQ for MQTT clients in real time. For more information, see Obtain the status of an ApsaraMQ for MQTT client.

Limits

Item

Limit

Description

Number of rules created for a single instance

100

If you want to create more than 100 rules for an instance, join the DingTalk group 35228338 to contact the ApsaraMQ for MQTT technical support.

Rule deduplication

Only one rule of the same type can be created for each internal resource.

For example, you can create only one rule for client status notifications for each group, and one data inbound rule and one data outbound rule for each topic on ApsaraMQ for MQTT.

Region

You cannot create rules across regions. The instances to which the data source and data destination in a rule belong must reside in the same region.

For example, if you create a data outbound rule in which the data source is specified as ApsaraMQ for MQTT in the China (Hangzhou) region, you can select only an ApsaraMQ for RocketMQ instance in the China (Hangzhou) region as the data destination.

ApsaraMQ for MQTT instance version

You can create rules for only instances whose kernel version is V3.x.x.

You can view the kernel version of an ApsaraMQ for MQTT instance in the instance list or on the Instance Details page in the ApsaraMQ for MQTT console.

ApsaraMQ for RocketMQ instance version

Only ApsaraMQ for RocketMQ 4.0 instances are supported.

When you exchange data between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ by using a data inbound rule or a data outbound rule, you can use only ApsaraMQ for RocketMQ 4.0 instances.

Resource mapping methods

Status notifications of all clients in the same group on ApsaraMQ for MQTT are forwarded to the resources of the same Alibaba Cloud service that you configure.

Table 1. Mappings

ApsaraMQ for MQTT resource

Alibaba Cloud service

Resource of the Alibaba Cloud service

Packet definition

Groups on ApsaraMQ for MQTT

ApsaraMQ for RocketMQ

Topics on ApsaraMQ for RocketMQ

Message structure mappings between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ

Procedure

If you want to use the asynchronous method for client status notifications, you must create a rule for the notifications before you can export them to the backend Alibaba Cloud service. The following example uses ApsaraMQ for RocketMQ as the backend Alibaba Cloud service to describe how to use the asynchronous method for client status notifications.

  1. Create a rule for client status notifications.

    When you create the rule, select the groups whose client status events you want to receive in the ApsaraMQ for MQTT console. For more information, see Create a rule for client status notifications.

  2. Subscribe to the client status notifications on the service application.

    If the rule that you created in Step 1 takes effect, the service application can receive status events of the corresponding ApsaraMQ for MQTT clients. For information about how to use ApsaraMQ for RocketMQ to receive notifications, see Subscribe to messages. For information about the sample code, see MQTTClientStatusNoticeProcessDemo.java.

    Event types are specified in the tags that are attached to ApsaraMQ for RocketMQ. Tag format:

    MQ Tag:connect/disconnect/tcpclean

    The following items describe the information in the tags:

    • connect indicates that the ApsaraMQ for MQTT client comes online.

    • disconnect indicates that the ApsaraMQ for MQTT client ends the connection with the ApsaraMQ for MQTT broker. Based on the MQTT protocol, the ApsaraMQ for MQTT client sends a disconnect packet before it ends the TCP connection. The ApsaraMQ for MQTT broker triggers a client status notification only after the broker receives the disconnect packet from the ApsaraMQ for MQTT client. If the SDK of an ApsaraMQ for MQTT client does not send a disconnect packet based on the MQTT protocol, the ApsaraMQ for MQTT broker cannot receive the message.

    • tcpclean indicates that the TCP connection is closed. If the TCP connection is closed, the tag contains tcpclean, regardless of whether the ApsaraMQ for MQTT client has sent a disconnect packet.

    Note

    tcpclean indicates that the ApsaraMQ for MQTT client is disconnected on the network layer. disconnect only indicates that the ApsaraMQ for MQTT client sends a disconnect packet. disconnect may not appear in the tag when an ApsaraMQ for MQTT client exits unexpectedly without sending the disconnect packet. Therefore, to determine whether an ApsaraMQ for MQTT client is offline, check if the tag contains tcpclean.

    Data in tags is in the JSON format and contains the following keys:

    • clientId indicates a specific ApsaraMQ for MQTT client.

    • time indicates the time when the event occurs.

    • eventType indicates the event type, which is used by the ApsaraMQ for MQTT client to differentiate events.

    • channelId indicates the unique identifier of the TCP connection.

    • clientIp indicates the public IP address that is used by the ApsaraMQ for MQTT client.

    Example:

    clientId: GID_XXX@@@YYYYY
    time:1212121212
    eventType:connect/disconnect/tcpclean
    channelId:2b9b1281046046faafe5e0b458e4XXXX
    clientIp:192.168.XX.XX:133XX  

    To determine the status of an ApsaraMQ for MQTT client, check the timeline of the received client status notifications. You cannot determine the client status based on only the last received notification.

    Determine the client status based on the following rules:

    • Timestamps indicate the sequence of status events that an ApsaraMQ for MQTT client that is specified by the clientId generates. A more recent event has a larger timestamp value.

    • Transient connectivity errors may occur in an ApsaraMQ for MQTT client that is specified by a clientId several times. Therefore, when you receive an offline event notification, check the channelId value to determine whether the notification is related to the current TCP connection. An offline event notification can overwrite only offline event notifications with the same channelId. If two offline event notifications have different channelId values, the notification with a larger time value cannot overwrite the other one. A channelId identifies a TCP connection. A TCP connection has only one connect event and one close event.

Sample code for asynchronous status notifications in Java

package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * Initialize an ApsaraMQ for RocketMQ client as a receiver. In most business scenarios, the code is deployed in a backend application. 
         */
        Properties properties = new Properties();
        /**
         * Specify the group ID of the ApsaraMQ for RocketMQ client. Note that the group ID that you specify for this parameter is different from the ID of the group on the ApsaraMQ for MQTT instance. You must specify group IDs based on the corresponding service instructions. 
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * The AccessKey ID of the Alibaba Cloud account. You can obtain the AccessKey ID from the Resource Access Management (RAM) console. 
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * The AccessKey secret of the Alibaba Cloud account. You can obtain the AccessKey secret from the RAM console. The AccessKey secret is required only if the signature authentication mode is used. 
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * Specify the TCP endpoint. 
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * If you use an ApsaraMQ for RocketMQ consumer to process status notifications of the ApsaraMQ for MQTT client, the consumer must subscribe to the topic that is used to receive the status notifications. You must create the topic in advance by referring to the documentation in the ApsaraMQ for RocketMQ console. 
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * The client status data. In a production environment, we recommend that you use an external persistent storage system, such as a database or Redis system, to store status data to prevent the loss of status data upon application restart. In this example, the status data is stored in an on-premises machine. 
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         * The following code determines only whether the client is online. Therefore, you need to only pay attention to the connect and tcpclean events. 
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * The logic that is used to process client status notifications. 
     * During an actual deployment process, applications that consume status notification messages may be deployed on multiple servers. Therefore, the status data of clients can be maintained in external shared storage such as a database or Redis system. 
     * If a state machine repeatedly receives messages, perform consumption idempotence processing on messages to prevent errors that may occur. 
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * Store new events first. 
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * Read the event list of the current connection. 
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * If all online and offline events in the list are received and the current connection is closed, the data of the connection can be cleared. 
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * Check whether an active TCP connection is present for an ApsaraMQ for MQTT client based on the channel table. 
     * 1. If the channel table is empty, the ApsaraMQ for MQTT client is offline. 
     * 2. If the channel table is not empty, check whether only online events are received in a connection. If yes, an active connection is present and the ApsaraMQ for MQTT client is online. 
     * If all channels have disconnection events, the client is offline. 
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}

References

For information about operations in the console, see Manage rules for client status notifications.