ApsaraMQ for MQTT: Export client status events

Last Updated: Mar 10, 2026

ApsaraMQ for MQTT exports client connect and disconnect events to downstream Alibaba Cloud services through configurable rules. Backend applications consume these events to track connection status, trigger automated workflows, or analyze connectivity patterns.

Important

Status event notifications are asynchronous and do not reflect real-time client status. To query the current status of a client in real time, see Obtain the status of an ApsaraMQ for MQTT client.

How it works

When an ApsaraMQ for MQTT client connects or disconnects, the broker pushes a status notification to a downstream Alibaba Cloud service based on the rule you configured. Backend applications deployed on Elastic Compute Service (ECS) instances subscribe to these notifications to detect client online and offline transitions.

Status event export flow

Because this process is asynchronous, a single notification does not indicate whether a client is currently online. To determine actual status, analyze the timeline of event notifications for that client. For details, see Determine client status from events.

Use cases

Status event export triggers backend actions when MQTT clients connect or disconnect. A typical workflow for connection activity analysis:

  1. A client's connection status changes multiple times.

  2. The broker encapsulates each change into a notification based on the configured rule.

  3. The notifications are delivered to an ApsaraMQ for RocketMQ topic.

  4. A backend application consumes the notifications and builds a connection history for the client.

Limits

Item | Limit | Description
Rules per instance | 100 | To request a higher limit, join DingTalk group 116015007918 to contact ApsaraMQ for MQTT technical support.
Rule deduplication | One rule of each type per internal resource | For example, each group supports one client status notification rule, and each ApsaraMQ for MQTT topic supports one data inbound rule and one data outbound rule.
Cross-region rules | Not supported | The data source and data destination must reside in the same region.
ApsaraMQ for MQTT instance version | Kernel V3.x.x only | Check the kernel version in the instance list or on the Instance Details page in the ApsaraMQ for MQTT console.
ApsaraMQ for RocketMQ instance version | ApsaraMQ for RocketMQ 4.0 only | Applies to data inbound and data outbound rules between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ.

Resource mappings

Status notifications for all clients in the same ApsaraMQ for MQTT group are forwarded to the same downstream resource.

ApsaraMQ for MQTT resource | Alibaba Cloud service | Downstream resource | Packet definition
Group | ApsaraMQ for RocketMQ | Topic | Message structure mappings between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ

Set up status event export

The following steps use ApsaraMQ for RocketMQ as the downstream service.

Prerequisites

Before you begin, make sure that you have:

  • An ApsaraMQ for MQTT instance with kernel version V3.x.x

  • An ApsaraMQ for RocketMQ 4.0 instance in the same region

  • At least one group configured on the ApsaraMQ for MQTT instance

Step 1: Create a client status notification rule

In the ApsaraMQ for MQTT console, create a rule and select the groups whose client status events you want to export. For detailed steps, see Create a rule for client status notifications.

Step 2: Subscribe to status notifications in your backend application

After the rule takes effect, status events are delivered to the configured ApsaraMQ for RocketMQ topic. Subscribe to that topic in your backend application to receive the events. For subscription details, see Subscribe to messages.

Event types

Each status notification is delivered as an ApsaraMQ for RocketMQ message. The event type is specified in the message tag.

Valid tag values: connect, disconnect, or tcpclean

Tag | Meaning | Trigger condition
connect | Client connected | The client establishes a connection with the broker.
disconnect | Client sent a DISCONNECT packet | The client sends a DISCONNECT packet before closing the TCP connection, per the MQTT protocol. If the client SDK does not send this packet, the broker does not generate a disconnect event.
tcpclean | TCP connection closed | The TCP connection closes, regardless of whether the client sent a DISCONNECT packet.

Important

To determine whether a client is offline, check for the tcpclean tag, not disconnect. A client that exits unexpectedly may not send a DISCONNECT packet, so the disconnect event may never appear.
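
The tag semantics above can be summarized in a small helper. This is a minimal sketch, not part of any ApsaraMQ SDK: the class name `StatusTags` and the `Effect` enum are hypothetical names introduced for illustration, and only the three tag strings come from this page.

```java
/** Hypothetical helper: maps a status-notification tag to its effect on a connection. */
final class StatusTags {
    enum Effect { CONNECTION_OPENED, CONNECTION_CLOSED, INFORMATIONAL, UNKNOWN }

    private StatusTags() { }

    static Effect effectOf(String tag) {
        if (tag == null) {
            return Effect.UNKNOWN;
        }
        switch (tag) {
            case "connect":
                return Effect.CONNECTION_OPENED;
            case "tcpclean":
                // Generated whenever the TCP connection closes: the offline signal to rely on.
                return Effect.CONNECTION_CLOSED;
            case "disconnect":
                // Generated only if the client sent DISCONNECT, so it may never arrive.
                return Effect.INFORMATIONAL;
            default:
                return Effect.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        for (String tag : new String[] {"connect", "disconnect", "tcpclean"}) {
            System.out.println(tag + " -> " + effectOf(tag));
        }
    }
}
```

Treating disconnect as informational keeps the offline decision tied to tcpclean alone, which matches the guidance in the note above.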

Event data format

The message body is a JSON object with the following fields:

Field | Type | Description
clientId | String | The ApsaraMQ for MQTT client ID. Format: GID_XXX@@@YYYYY.
time | Long | The timestamp when the event occurred.
eventType | String | The event type: connect, disconnect, or tcpclean.
channelId | String | The unique identifier of the TCP connection.
clientIp | String | The public IP address and port of the MQTT client.

Example (a connect event; eventType can also be disconnect or tcpclean):

{
  "clientId": "GID_XXX@@@YYYYY",
  "time": 1212121212,
  "eventType": "connect",
  "channelId": "2b9b1281046046faafe5e0b458e4XXXX",
  "clientIp": "192.168.XX.XX:133XX"
}
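
Two of these fields are composite strings, and a backend often needs their parts separately. The sketch below splits them using only the formats stated in the table. The class and method names are hypothetical. Reading the part before `@@@` as the group ID and the part after it as a device ID is an assumption, as is splitting `clientIp` on its last colon.

```java
/** Hypothetical helper for splitting the clientId and clientIp fields of a status event. */
final class StatusEventFields {
    private static final String SEPARATOR = "@@@";

    private StatusEventFields() { }

    /** Returns {groupId, deviceId}; assumes the clientId has the form GID_XXX@@@YYYYY. */
    static String[] splitClientId(String clientId) {
        int idx = clientId.indexOf(SEPARATOR);
        if (idx <= 0 || idx + SEPARATOR.length() >= clientId.length()) {
            throw new IllegalArgumentException("Unexpected clientId format: " + clientId);
        }
        return new String[] {
            clientId.substring(0, idx),
            clientId.substring(idx + SEPARATOR.length())
        };
    }

    /** Returns {host, port}; splits on the last colon so the host part stays whole. */
    static String[] splitClientIp(String clientIp) {
        int idx = clientIp.lastIndexOf(':');
        if (idx <= 0 || idx == clientIp.length() - 1) {
            throw new IllegalArgumentException("Unexpected clientIp format: " + clientIp);
        }
        return new String[] {clientIp.substring(0, idx), clientIp.substring(idx + 1)};
    }

    public static void main(String[] args) {
        String[] id = splitClientId("GID_demo@@@device001");
        String[] ip = splitClientIp("192.168.0.10:13301");
        System.out.println(id[0] + " / " + id[1]);
        System.out.println(ip[0] + " / " + ip[1]);
    }
}
```

The sample values `GID_demo@@@device001` and `192.168.0.10:13301` are made up for the demonstration and do not come from a real instance.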

Determine client status from events

Because notifications are asynchronous, you cannot determine whether a client is online from a single event. Track the full timeline of events for each clientId and apply the following rules:

  1. Use timestamps for ordering. A larger time value means a more recent event. Compare timestamps only within events that share the same clientId.

  2. Scope offline events by channelId. Each channelId represents one TCP connection, which produces exactly one connect event and one close event (tcpclean). A tcpclean event invalidates only the connection with the same channelId. It does not affect connections that have a different channelId.

  3. Handle transient reconnections. A client may disconnect and reconnect multiple times, producing multiple channelId values. When you receive a tcpclean event, verify that its channelId matches the connection you are tracking before marking the client as offline.

Example scenario:

A client connects, disconnects briefly, and reconnects:

Order | Event | channelId | Client status
1 | connect | aaa111 | Online (connection aaa111 active)
2 | tcpclean | aaa111 | Offline (connection aaa111 closed)
3 | connect | bbb222 | Online (connection bbb222 active)

If you receive the tcpclean for aaa111 after the connect for bbb222, the client is still online because bbb222 has no corresponding tcpclean.

Decision logic:

For a given clientId:
  1. Maintain a map of channelId -> events.
  2. On receiving a "connect" event:
     - Store the event under its channelId.
  3. On receiving a "tcpclean" event:
     - Store the event under its channelId.
     - If the channelId has both a "connect" and a "tcpclean" event,
       that connection is closed. Remove the channelId from the map.
  4. Check if the client is online:
     - If any channelId in the map has a "connect" event but no
       "tcpclean" event, the client is online.
     - If all channelIds have been removed (or have both events),
       the client is offline.
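
The decision logic above could be sketched in Java roughly as follows. This is an illustrative, single-process version under stated assumptions: `ClientStatusTracker` and its `EventType` enum are hypothetical names, state lives in plain in-memory maps, and the class is not thread-safe. The `main` method replays the example scenario with the tcpclean for aaa111 arriving late.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory tracker for one application instance; not thread-safe. */
final class ClientStatusTracker {
    enum EventType { CONNECT, TCPCLEAN }

    // clientId -> (channelId -> event types seen so far on that TCP connection)
    private final Map<String, Map<String, EnumSet<EventType>>> channelsByClient = new HashMap<>();

    void onEvent(String clientId, String channelId, EventType type) {
        Map<String, EnumSet<EventType>> channels =
            channelsByClient.computeIfAbsent(clientId, k -> new HashMap<>());
        EnumSet<EventType> seen =
            channels.computeIfAbsent(channelId, k -> EnumSet.noneOf(EventType.class));
        seen.add(type);
        // Both events seen, in either arrival order: this connection is closed.
        if (seen.contains(EventType.CONNECT) && seen.contains(EventType.TCPCLEAN)) {
            channels.remove(channelId);
            if (channels.isEmpty()) {
                channelsByClient.remove(clientId);
            }
        }
    }

    boolean isOnline(String clientId) {
        Map<String, EnumSet<EventType>> channels = channelsByClient.get(clientId);
        if (channels == null) {
            return false;
        }
        for (EnumSet<EventType> seen : channels.values()) {
            // A connect without a matching tcpclean means the connection is still open.
            if (seen.contains(EventType.CONNECT) && !seen.contains(EventType.TCPCLEAN)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ClientStatusTracker tracker = new ClientStatusTracker();
        String clientId = "GID_demo@@@device001"; // made-up client ID
        tracker.onEvent(clientId, "aaa111", EventType.CONNECT);
        tracker.onEvent(clientId, "bbb222", EventType.CONNECT);
        tracker.onEvent(clientId, "aaa111", EventType.TCPCLEAN); // arrives late
        System.out.println(tracker.isOnline(clientId)); // true: bbb222 is still open
        tracker.onEvent(clientId, "bbb222", EventType.TCPCLEAN);
        System.out.println(tracker.isOnline(clientId)); // false: every connection is closed
    }
}
```

One limitation of this sketch: after a channelId is removed, a redelivered connect for that channelId would make the client look online again. A production version would also need idempotent consumption and a shared external store.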

Sample code (Java)

The following example uses an ApsaraMQ for RocketMQ consumer to receive and process client status notifications. The consumer subscribes to connect and tcpclean tags on the designated RocketMQ topic.

For the complete source code, see MQTTClientStatusNoticeProcessDemo.java.

package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        // Initialize an ApsaraMQ for RocketMQ consumer to receive status notifications.
        Properties properties = new Properties();

        // The RocketMQ consumer group ID (different from the ApsaraMQ for MQTT group ID).
        properties.setProperty(PropertyKeyConst.GROUP_ID, "<your-rocketmq-group-id>");

        // AccessKey pair. Get these from the RAM console.
        properties.put(PropertyKeyConst.AccessKey, "<your-access-key>");
        properties.put(PropertyKeyConst.SecretKey, "<your-secret-key>");

        // TCP endpoint of the ApsaraMQ for RocketMQ instance.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-rocketmq-endpoint>");

        // The RocketMQ topic that receives MQTT client status notifications.
        // Create this topic in the ApsaraMQ for RocketMQ console beforehand.
        final String parentTopic = "GID_XXXX_MQTT";

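        // MqttClientStatusStore, MemoryHashMapStoreImpl, and ClientStatusEvent are not imported,
        // so they must be available in this package. They are not shown in this excerpt.
        // In production, use an external store (database or Redis) to persist status data
        // across application restarts.
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();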

        Consumer consumer = ONSFactory.createConsumer(properties);

        // Subscribe to "connect" and "tcpclean" events only.
        consumer.subscribe(parentTopic, "connect||tcpclean",
            new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();

        // Poll the status of a specific client.
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("Client online: " + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Processes incoming status notifications and maintains the channel-event map.
     *
     * In a multi-server deployment, store client status in a shared external store
     * (database or Redis). Implement idempotent consumption to handle duplicate messages.
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
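                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                JSONObject msgBody = JSON.parseObject(
                    new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));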
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));

                // Store the new event.
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);

                // Read the event list for this connection.
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }

                // If both connect and tcpclean events exist for this channelId,
                // the connection is closed. Clean up the entry.
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * Checks whether the client has an active TCP connection.
     *
     * Logic:
     * 1. If the channel map is empty, the client is offline.
     * 2. If any channelId has only a connect event (no tcpclean), the client is online.
     * 3. If all channelIds have both connect and tcpclean events, the client is offline.
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap =
            mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent && !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}
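
The sample relies on three types that this page does not show: `ClientStatusEvent`, `MqttClientStatusStore`, and `MemoryHashMapStoreImpl`. The sketch below is a guess at their minimal shape, inferred only from the calls the sample makes. The real classes in the demo project may differ, and the `equals`/`hashCode` choice (channelId plus eventType) is an assumption made so that duplicate deliveries collapse into one set entry.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Assumed shape of a status event. Set all fields before adding it to a store. */
class ClientStatusEvent {
    private String channelId;
    private String clientIp;
    private String eventType;
    private long time;

    public void setChannelId(String channelId) { this.channelId = channelId; }
    public void setClientIp(String clientIp) { this.clientIp = clientIp; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public void setTime(long time) { this.time = time; }
    public boolean isOnlineEvent() { return "connect".equals(eventType); }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ClientStatusEvent)) {
            return false;
        }
        ClientStatusEvent other = (ClientStatusEvent) o;
        return Objects.equals(channelId, other.channelId)
            && Objects.equals(eventType, other.eventType);
    }

    @Override
    public int hashCode() { return Objects.hash(channelId, eventType); }
}

/** Assumed store contract, matching the four calls used by the sample. */
interface MqttClientStatusStore {
    void addEvent(String clientId, String channelId, String eventType, ClientStatusEvent event);
    Set<ClientStatusEvent> getEvent(String clientId, String channelId);
    void deleteEvent(String clientId, String channelId);
    Map<String, Set<ClientStatusEvent>> getEventsByClientId(String clientId);
}

/** In-memory stand-in: state is lost on restart and is not shared across servers. */
class MemoryHashMapStoreImpl implements MqttClientStatusStore {
    // clientId -> (channelId -> events); concurrent maps because consumer threads write
    // while the polling thread reads.
    private final Map<String, Map<String, Set<ClientStatusEvent>>> store = new ConcurrentHashMap<>();

    @Override
    public void addEvent(String clientId, String channelId, String eventType, ClientStatusEvent event) {
        // eventType is unused here because the event object already carries it.
        store.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(channelId, k -> ConcurrentHashMap.newKeySet())
            .add(event);
    }

    @Override
    public Set<ClientStatusEvent> getEvent(String clientId, String channelId) {
        Map<String, Set<ClientStatusEvent>> channels = store.get(clientId);
        return channels == null ? null : channels.get(channelId);
    }

    @Override
    public void deleteEvent(String clientId, String channelId) {
        Map<String, Set<ClientStatusEvent>> channels = store.get(clientId);
        if (channels != null) {
            channels.remove(channelId);
        }
    }

    @Override
    public Map<String, Set<ClientStatusEvent>> getEventsByClientId(String clientId) {
        return store.get(clientId); // null when no event has been stored for this client
    }
}
```

With these stand-ins in the same package, the sample's listener and `checkClientOnline` method have everything they reference. A shared database or Redis implementation of the same interface would be the natural replacement for a multi-server deployment.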

Replace the following placeholders with actual values:

Placeholder | Description | Example
<your-rocketmq-group-id> | ApsaraMQ for RocketMQ consumer group ID | GID_status_consumer
<your-access-key> | Alibaba Cloud AccessKey ID from the RAM console | LTAI5tXxx
<your-secret-key> | Alibaba Cloud AccessKey secret from the RAM console | xXxXxXx
<your-rocketmq-endpoint> | TCP endpoint of the ApsaraMQ for RocketMQ instance | http://MQ_INST_xxx.mq-internet-access.mq-internet.aliyuncs.com:80
