Latar Belakang
Dalam banyak skenario bisnis, layanan backend Anda perlu melacak dan menganalisis saat klien terhubung atau terputus, serta mendorong pesan berdasarkan status online-nya. ApsaraMQ for MQTT menyediakan notifikasi event siklus hidup asinkron untuk membantu Anda melacak status online klien. Saat klien MQTT terhubung atau terputus, layanan MQTT memicu pesan notifikasi. Anda dapat menerima notifikasi tersebut dengan salah satu cara berikut:
-
Gunakan cloud SDK untuk terhubung ke layanan ApsaraMQ for MQTT dan mengambil status online klien. Untuk informasi selengkapnya, lihat Dapatkan status online klien ApsaraMQ for MQTT.
-
Buat aturan notifikasi status klien untuk mendorong notifikasi status ke ApsaraMQ for RocketMQ. Anda kemudian dapat berlangganan pesan di ApsaraMQ for RocketMQ untuk mengambil status online klien.
Topik ini menjelaskan cara membuat aturan notifikasi status klien yang memungkinkan aplikasi backend mengambil status online klien.

Akses Jaringan
ApsaraMQ for MQTT menyediakan Public Endpoint dan VPC Endpoint.
Public Endpoint adalah alamat IP yang digunakan untuk mengakses ApsaraMQ for MQTT melalui Internet. Umumnya, endpoint publik digunakan dalam skenario IoT dan Internet seluler.
VPC Endpoint adalah alamat IP yang digunakan untuk mengakses ApsaraMQ for MQTT di dalam virtual private cloud (VPC). Umumnya, endpoint VPC digunakan oleh aplikasi cloud untuk terhubung ke ApsaraMQ for MQTT.
Penting Jika Anda ingin menggunakan endpoint untuk menghubungkan klien ke ApsaraMQ for MQTT, gunakan nama domain, bukan alamat IP, karena alamat IP berubah secara dinamis. Tim teknis ApsaraMQ for MQTT tidak bertanggung jawab atas titik kegagalan dan kerugian langsung maupun tidak langsung dalam skenario berikut:
Anda menggunakan alamat IP untuk mengakses klien ke ApsaraMQ for MQTT. Setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain, alamat IP lama menjadi tidak valid.
Kebijakan firewall berbasis alamat IP diterapkan di jaringan tempat klien Anda berjalan. Setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain, alamat IP baru diblokir akibat kebijakan firewall tersebut.
Prasyarat
Penting
-
Aturan notifikasi status klien untuk ApsaraMQ for MQTT hanya mendukung instans seri ApsaraMQ for RocketMQ 4.x.
-
Anda tidak dapat menggunakan aturan notifikasi status klien untuk ApsaraMQ for MQTT lintas wilayah. Oleh karena itu, sumber daya ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ harus berada di wilayah yang sama.
1. Buat aturan notifikasi status klien
Masuk ke Konsol ApsaraMQ for MQTT. Di panel navigasi sebelah kiri, klik Instances.
Di bilah navigasi atas, pilih wilayah tempat instans yang ingin Anda kelola berada. Di halaman Instances, klik nama instans untuk membuka halaman Instance Details.
-
Di panel navigasi sebelah kiri, klik Rules.
-
Di halaman Rules, klik tab RocketMQ Rule, lalu klik Create Rule.
-
Di halaman Create Rule, lakukan langkah-langkah berikut.
-
Konfigurasikan Informasi Dasar. Masukkan ID aturan dan pilih tipe aturan untuk notifikasi status klien.
ID aturan harus terdiri dari 3 hingga 64 karakter dan hanya boleh berisi huruf, angka, tanda hubung (-), dan garis bawah (_). Setelah konfigurasi selesai, klik Next.
-
Konfigurasikan Sumber Aturan. Pilih group ID yang telah Anda buat untuk ApsaraMQ for MQTT.
Klik Next.
-
Konfigurasikan Tujuan Aturan. Pilih instans dan topik yang telah Anda buat untuk ApsaraMQ for RocketMQ.
Setelah pemilihan selesai, klik Create.
2. Siapkan kode pengujian
Anda akan menggunakan kode untuk menangani perubahan status klien dan memproses notifikasi status. Topik ini menyediakan kode demo Java yang diperlukan.
2.1 Unduh kode contoh
2.2 Kode koneksi dan pemutusan klien
Pada kelas MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java, isi parameter untuk sumber daya ApsaraMQ for MQTT sesuai petunjuk dalam komentar kode.
Untuk pengujian ini, Anda hanya perlu mensimulasikan operasi koneksi dan pemutusan, sehingga tidak perlu mengirim pesan. Anda dapat menghapus kode pengiriman pesan. Berikut adalah kode contohnya.
Kode contoh untuk koneksi dan pemutusan klien
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
/**
* ID instans ApsaraMQ for MQTT. Dapatkan ini dari konsol setelah Anda membeli instans.
*/
String instanceId = "XXXXX";
/**
* Titik akhir. Dapatkan titik akhir setelah Anda membeli dan mengonfigurasi instans ApsaraMQ for MQTT.
* Anda harus menggunakan nama domain yang ditetapkan untuk titik akhir. Menggunakan alamat IP langsung dapat menyebabkan exception pada klien.
*/
String endPoint = "XXXXX.mqtt.aliyuncs.com";
/**
* ID AccessKey Akun Alibaba Cloud Anda. Dapatkan ini dari konsol.
* Pasangan Kunci Akses akun Alibaba Cloud memiliki izin atas semua operasi API. Kami menyarankan agar Anda menggunakan Pengguna RAM untuk akses API atau O&M rutin guna mencegah risiko keamanan.
* Kami sangat menyarankan agar Anda tidak menyematkan ID AccessKey dan Rahasia AccessKey secara langsung dalam kode proyek Anda. Hal ini dapat menimbulkan risiko keamanan jika kredensial bocor.
* Contoh ini menunjukkan cara menyimpan ID AccessKey dan Rahasia AccessKey dalam variabel lingkungan. Sebelum menjalankan kode ini, konfigurasikan variabel lingkungan MQTT_AK_ENV dan MQTT_SK_ENV.
* Misalnya: export MQTT_AK_ENV=<access_key_id>
* export MQTT_SK_ENV=<access_key_secret>
* Ganti <access_key_id> dengan ID AccessKey Anda dan <access_key_secret> dengan Rahasia AccessKey Anda.
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* Rahasia AccessKey Akun Alibaba Cloud Anda. Dapatkan ini dari konsol. Ini hanya diperlukan untuk otentikasi berbasis signature.
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* ID klien untuk ApsaraMQ for MQTT, yang ditetapkan oleh sistem bisnis. Setiap koneksi TCP harus memiliki ID klien yang unik.
* Jika objek klien berbeda (koneksi TCP) menggunakan ID klien yang sama, akan terjadi exception koneksi.
* ID klien terdiri dari dua bagian dalam format GroupID@@@DeviceId. Group ID diperoleh dari konsol ApsaraMQ for MQTT, dan DeviceId ditetapkan oleh sistem bisnis Anda. Panjang total ID klien tidak boleh melebihi 64 karakter.
*/
String clientId = "GID_XXXXX@@@XXXXX";
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* Protokol dan port yang digunakan klien harus sesuai. Untuk informasi selengkapnya, lihat https://www.alibabacloud.com/help/en/message-queue-for-mqtt/latest/limits
* Untuk enkripsi SSL, atur endpoint menjadi ssl://endpoint:8883.
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* Tetapkan timeout untuk klien guna mencegahnya terblokir tanpa batas waktu.
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
Thread.sleep(Long.MAX_VALUE);
}
}
2.3 Memproses pesan notifikasi status
Setelah pesan notifikasi status didorong ke ApsaraMQ for RocketMQ, konsumen harus berlangganan pesan tersebut dan memprosesnya.
Pada kelas MQTTClientStatusNoticeProcessDemo.java, tentukan parameter untuk sumber daya ApsaraMQ for RocketMQ sesuai petunjuk dalam komentar kode. Kode berikut memberikan contohnya.
Kode contoh untuk memproses pesan notifikasi status
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* Inisialisasi klien penerima ApsaraMQ for RocketMQ. Dalam skenario nyata, ini biasanya diterapkan di aplikasi backend.
*/
Properties properties = new Properties();
/**
* Tetapkan group ID ApsaraMQ for RocketMQ. Buat ini di konsol ApsaraMQ for RocketMQ.
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* ID AccessKey untuk otentikasi Alibaba Cloud. Buat ini di konsol RAM Alibaba Cloud.
* Pasangan Kunci Akses akun Alibaba Cloud memiliki izin atas semua operasi API. Kami menyarankan agar Anda menggunakan Pengguna RAM untuk akses API atau O&M rutin guna mencegah risiko keamanan.
* Kami sangat menyarankan agar Anda tidak menyematkan ID AccessKey dan Rahasia AccessKey secara langsung dalam kode proyek Anda. Hal ini dapat menimbulkan risiko keamanan jika kredensial bocor.
* Contoh ini menunjukkan cara menyimpan ID AccessKey dan Rahasia AccessKey dalam variabel lingkungan.
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* Rahasia AccessKey untuk otentikasi Alibaba Cloud. Buat ini di konsol RAM Alibaba Cloud. Ini hanya diperlukan untuk otentikasi berbasis signature.
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* Tetapkan titik akhir TCP untuk instans ApsaraMQ for RocketMQ. Dapatkan ini dari halaman detail instans di konsol ApsaraMQ for RocketMQ.
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
/**
* Saat menggunakan konsumen ApsaraMQ for RocketMQ untuk memproses notifikasi status klien MQTT, berlanggananlah ke topik notifikasi status.
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* Data status klien. Di lingkungan produksi, kami menyarankan menggunakan penyimpanan persisten eksternal seperti database atau Redis untuk menyimpan informasi ini.
* Hal ini mencegah kehilangan data jika aplikasi dimulai ulang. Contoh ini menggunakan implementasi in-memory sederhana.
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* Contoh ini hanya memproses apakah klien sedang online, sehingga kami hanya perlu memperhatikan event 'connect' dan 'tcpclean'.
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Logika untuk memproses notifikasi koneksi dan pemutusan.
* Dalam penerapan nyata, aplikasi yang mengonsumsi notifikasi status mungkin diterapkan di beberapa mesin.
* Oleh karena itu, data status online klien harus dipertahankan di penyimpanan eksternal bersama seperti database atau Redis.
* Selain itu, Anda harus menangani idempotensi pesan untuk mencegah error mesin keadaan akibat pesan duplikat.
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* Pertama, simpan event baru tersebut.
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* Baca daftar event untuk channel saat ini.
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* Jika daftar event telah menerima event online dan offline, maka channel saat ini telah terputus.
* Anda kemudian dapat menghapus data untuk channel ini.
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* Periksa apakah ID klien memiliki koneksi TCP aktif berdasarkan tabel status.
* 1. Jika tidak ada peta channel, klien pasti offline.
* 2. Jika peta channel tidak kosong, periksa apakah data channel hanya berisi event online. Jika ya, artinya ada koneksi aktif dan klien sedang online.
* Jika semua channel memiliki event pemutusan, klien pasti offline.
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}
3. Verifikasi hasil
-
Jalankan fungsi main di kelas MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java untuk mensimulasikan klien yang online. Anda dapat melakukan operasi berikut untuk memeriksa status klien dan status pengiriman pesan.
Catatan
Untuk mensimulasikan pemutusan klien, hentikan eksekusi metode main.
-
Periksa status klien. Di Konsol ApsaraMQ for MQTT, buka halaman Device Status Query. Gunakan Device ID untuk memeriksa klien dan verifikasi bahwa klien sedang online.
-
Memeriksa pengiriman paket event. Di Konsol ApsaraMQ for RocketMQ, buka halaman Message Query, lalu cari berdasarkan topik dan pastikan paket event online telah terkirim.
Dalam hasil kueri, Tag untuk pesan ini adalah connect, yang menunjukkan event klien online.
-
Jalankan metode main di kelas MQTTClientStatusNoticeProcessDemo.java. Konsumen menerima pesan event online, dan status ClientStatus berubah dari false menjadi true.
D:\tools\jdk1.8.0_361\bin\java.exe ...
ClientStatus :false
ClientStatus :false
ClientStatus :false
{"clientId":"GID_xxx1","clientIp":"2xxx9","eventIndex":1726205886624,"eventType":"connect","time":1726205886624,"channelId":"3xxx1245b8cd"}
ClientStatus :true
ClientStatus :true
ClientStatus :true
ClientStatus :true
ClientStatus :true