All Products
Search
Document Center

ApsaraMQ for MQTT:Ekspor event status client

Last Updated:Mar 11, 2026

ApsaraMQ for MQTT mengekspor event koneksi dan pemutusan klien ke layanan Alibaba Cloud downstream melalui aturan yang dapat dikonfigurasi. Aplikasi backend mengonsumsi event tersebut untuk melacak status koneksi, memicu alur kerja otomatis, atau menganalisis pola konektivitas.

Penting

Notifikasi event status bersifat asinkron dan tidak mencerminkan status klien secara real-time. Untuk mengkueri status terkini suatu klien secara real-time, lihat Dapatkan status client ApsaraMQ for MQTT.

Cara kerja

Ketika klien ApsaraMQ for MQTT terhubung atau terputus, broker mendorong notifikasi status ke layanan Alibaba Cloud downstream berdasarkan aturan yang Anda konfigurasikan. Aplikasi backend yang diterapkan pada instans Elastic Compute Service (ECS) berlangganan notifikasi ini untuk mendeteksi transisi online dan offline klien.

Status event export flow

Karena proses ini bersifat asinkron, satu notifikasi saja tidak menunjukkan apakah klien saat ini sedang online. Untuk menentukan status sebenarnya, analisis garis waktu notifikasi event untuk klien tersebut. Untuk detailnya, lihat Tentukan status client dari event.

Kasus penggunaan

Ekspor event status memicu aksi backend ketika klien MQTT terhubung atau terputus. Alur kerja khas untuk analisis aktivitas koneksi:

  1. Status koneksi klien berubah beberapa kali.

  2. Broker membungkus setiap perubahan menjadi notifikasi berdasarkan aturan yang dikonfigurasi.

  3. Notifikasi dikirimkan ke topik ApsaraMQ for RocketMQ.

  4. Aplikasi backend mengonsumsi notifikasi dan membangun riwayat koneksi untuk klien tersebut.

Batasan

ItemBatasDeskripsi
Aturan per instans100Untuk meminta batas yang lebih tinggi, bergabunglah dengan grup DingTalk 116015007918 untuk menghubungi dukungan teknis ApsaraMQ for MQTT.
Deduplikasi aturanSatu aturan tiap jenis per resource internalMisalnya, setiap group mendukung satu aturan notifikasi status client, dan setiap topik ApsaraMQ for MQTT mendukung satu aturan data inbound dan satu aturan data outbound.
Aturan cross-regionTidak didukungSumber data dan tujuan data harus berada di wilayah yang sama.
Versi instans ApsaraMQ for MQTTHanya Kernel V3.x.xPeriksa versi kernel di daftar instans atau pada halaman Detail Instans di Konsol ApsaraMQ for MQTT.
Versi instans ApsaraMQ for RocketMQHanya ApsaraMQ for RocketMQ 4.0Berlaku untuk aturan data inbound dan data outbound antara ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ.

Pemetaan resource

Notifikasi status untuk semua klien dalam group ApsaraMQ for MQTT yang sama diteruskan ke resource downstream yang sama.

Resource ApsaraMQ for MQTTLayanan Alibaba CloudResource downstreamDefinisi paket
GroupApsaraMQ for RocketMQTopikPemetaan struktur pesan antara ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ

Siapkan ekspor event status

Langkah-langkah berikut menggunakan ApsaraMQ for RocketMQ sebagai layanan downstream.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Instans ApsaraMQ for MQTT dengan versi kernel V3.x.x

  • Instans ApsaraMQ for RocketMQ 4.0 di wilayah yang sama

  • Setidaknya satu group yang dikonfigurasi pada instans ApsaraMQ for MQTT

Langkah 1: Buat aturan notifikasi status client

Di Konsol ApsaraMQ for MQTT, buat aturan dan pilih group-group yang event status client-nya ingin diekspor. Untuk langkah-langkah detail, lihat Buat aturan untuk notifikasi status client.

Langkah 2: Berlangganan notifikasi status di aplikasi backend Anda

Setelah aturan berlaku, event status dikirimkan ke topik ApsaraMQ for RocketMQ yang dikonfigurasi. Berlangganan topik tersebut di aplikasi backend Anda untuk menerima event tersebut. Untuk detail langganan, lihat Berlangganan pesan.

Jenis event

Setiap notifikasi status dikirimkan sebagai pesan ApsaraMQ for RocketMQ. Jenis event ditentukan dalam tag pesan.

Format tag: connect, disconnect, atau tcpclean

TagMaknaKondisi pemicu
connectClient terhubungClient membuat koneksi dengan broker.
disconnectClient mengirim paket DISCONNECTClient mengirim paket DISCONNECT sebelum menutup koneksi TCP, sesuai protokol MQTT. Jika SDK client tidak mengirim paket ini, broker tidak menghasilkan event disconnect.
tcpcleanKoneksi TCP ditutupKoneksi TCP ditutup, terlepas dari apakah client mengirim paket DISCONNECT atau tidak.
Penting

Untuk menentukan apakah klien sedang offline, periksa tag tcpclean, bukan disconnect. Klien yang keluar secara tak terduga mungkin tidak mengirim paket DISCONNECT, sehingga event disconnect mungkin tidak pernah muncul.

Format data event

Isi pesan berupa objek JSON dengan bidang-bidang berikut:

BidangTipeDeskripsi
clientIdStringID client ApsaraMQ for MQTT. Format: GID_XXX@@@YYYYY.
timeLongTimestamp saat event terjadi.
eventTypeStringJenis event: connect, disconnect, atau tcpclean.
channelIdStringPengidentifikasi unik koneksi TCP.
clientIpStringAlamat IP publik dan port client MQTT.

Contoh:

clientId: GID_XXX@@@YYYYY
time:1212121212
eventType:connect/disconnect/tcpclean
channelId:2b9b1281046046faafe5e0b458e4XXXX
clientIp:192.168.XX.XX:133XX

Tentukan status client dari event

Karena notifikasi bersifat asinkron, Anda tidak dapat menentukan apakah klien sedang online hanya dari satu event. Lacak garis waktu lengkap event untuk setiap clientId dan terapkan aturan berikut:

  1. Gunakan timestamp untuk pengurutan. Nilai time yang lebih besar berarti event yang lebih baru. Bandingkan timestamp hanya di antara event yang memiliki clientId yang sama.

  2. Batasi cakupan event offline berdasarkan channelId. Setiap channelId merepresentasikan satu koneksi TCP dengan tepat satu event connect dan satu event penutupan (tcpclean). Event tcpclean hanya membatalkan koneksi pada channelId yang sama — tidak memengaruhi koneksi pada channelId yang berbeda.

  3. Handle rekoneksi sementara. Klien mungkin terputus dan terhubung kembali beberapa kali, menghasilkan beberapa nilai channelId. Saat menerima event tcpclean, verifikasi bahwa channelId-nya cocok dengan koneksi yang sedang Anda lacak sebelum menandai klien sebagai offline.

Contoh skenario:

Klien terhubung, terputus sebentar, lalu terhubung kembali:

PesananEventchannelIdStatus client
1connectaaa111Online (koneksi aaa111 aktif)
2tcpcleanaaa111Offline (koneksi aaa111 ditutup)
3connectbbb222Online (koneksi bbb222 aktif)

Jika Anda menerima tcpclean untuk aaa111 setelah connect untuk bbb222, klien tetap online karena bbb222 belum memiliki tcpclean yang sesuai.

Logika pengambilan keputusan:

Untuk clientId tertentu:
  1. Pertahankan peta channelId -> event.
  2. Saat menerima event "connect":
     - Simpan event tersebut di bawah channelId-nya.
  3. Saat menerima event "tcpclean":
     - Simpan event tersebut di bawah channelId-nya.
     - Jika channelId tersebut memiliki event "connect" dan "tcpclean",
       koneksi tersebut ditutup. Hapus channelId dari peta.
  4. Periksa apakah client sedang online:
     - Jika ada channelId dalam peta yang memiliki event "connect" tetapi tidak
       memiliki event "tcpclean", client sedang online.
     - Jika semua channelId telah dihapus (atau memiliki kedua event),
       client sedang offline.

Kode contoh (Java)

Contoh berikut menggunakan konsumen ApsaraMQ for RocketMQ untuk menerima dan memproses notifikasi status klien. Konsumen berlangganan tag connect dan tcpclean pada topik RocketMQ yang ditentukan.

Untuk kode sumber lengkap, lihat MQTTClientStatusNoticeProcessDemo.java.

package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        // Inisialisasi konsumen ApsaraMQ for RocketMQ untuk menerima notifikasi status.
        Properties properties = new Properties();

        // ID group konsumen RocketMQ (berbeda dari ID group ApsaraMQ for MQTT).
        properties.setProperty(PropertyKeyConst.GROUP_ID, "<your-rocketmq-group-id>");

        // Pasangan AccessKey. Dapatkan dari Konsol RAM.
        properties.put(PropertyKeyConst.AccessKey, "<your-access-key>");
        properties.put(PropertyKeyConst.SecretKey, "<your-secret-key>");

        // Titik akhir TCP instans ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-rocketmq-endpoint>");

        // Topik RocketMQ yang menerima notifikasi status client MQTT.
        // Buat topik ini terlebih dahulu di Konsol ApsaraMQ for RocketMQ.
        final String parentTopic = "GID_XXXX_MQTT";

        // Di lingkungan produksi, gunakan penyimpanan eksternal (database atau Redis)
        // untuk menyimpan data status secara persisten meskipun aplikasi dimulai ulang.
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();

        Consumer consumer = ONSFactory.createConsumer(properties);

        // Berlangganan hanya event "connect" dan "tcpclean".
        consumer.subscribe(parentTopic, "connect||tcpclean",
            new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();

        // Periksa status client tertentu secara berkala.
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("Client online: " + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Memproses notifikasi status masuk dan memelihara peta channel-event.
     *
     * Di penerapan multi-server, simpan status client di penyimpanan eksternal bersama
     * (database atau Redis). Implementasikan konsumsi idempoten untuk menangani pesan duplikat.
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));

                // Simpan event baru.
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);

                // Baca daftar event untuk koneksi ini.
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }

                // Jika terdapat event connect dan tcpclean untuk channelId ini,
                // koneksi tersebut ditutup. Bersihkan entri tersebut.
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * Memeriksa apakah client memiliki koneksi TCP aktif.
     *
     * Logika:
     * 1. Jika peta channel kosong, client sedang offline.
     * 2. Jika ada channelId yang hanya memiliki event connect (tanpa tcpclean), client sedang online.
     * 3. Jika semua channelId memiliki event connect dan tcpclean, client sedang offline.
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap =
            mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}

Ganti placeholder berikut dengan nilai aktual:

PlaceholderDeskripsiContoh
<your-rocketmq-group-id>ID group konsumen ApsaraMQ for RocketMQGID_status_consumer
<your-access-key>ID AccessKey Alibaba Cloud dari Konsol RAMLTAI5tXxx
<your-secret-key>Rahasia AccessKey Alibaba Cloud dari Konsol RAMxXxXxXx
<your-rocketmq-endpoint>Titik akhir TCP instans ApsaraMQ for RocketMQhttp://MQ_INST_xxx.mq-internet-access.mq-internet.aliyuncs.com:80

Referensi