Informasi latar belakang
Dalam skenario bisnis nyata, broker ApsaraMQ for MQTT perlu mengumpulkan dan menganalisis data terkait perubahan status klien serta mendorong pesan berdasarkan status tersebut. ApsaraMQ for MQTT memungkinkan Anda mendapatkan status klien melalui notifikasi status asinkron. Ketika klien ApsaraMQ for MQTT online atau offline, notifikasi dihasilkan pada broker ApsaraMQ for MQTT. Anda dapat menggunakan metode berikut untuk mendapatkan notifikasi tersebut:
Sambungkan ke broker ApsaraMQ for MQTT menggunakan SDK cloud. Untuk informasi lebih lanjut, lihat Dapatkan Status Klien ApsaraMQ for MQTT.
Buat aturan notifikasi status klien. Setelah membuat aturan, notifikasi status klien dikirim ke ApsaraMQ for RocketMQ. Anda dapat berlangganan pesan di ApsaraMQ for RocketMQ untuk mendapatkan status klien.
Topik ini menjelaskan cara membuat aturan notifikasi status klien untuk mendapatkan status klien.

Akses jaringan
ApsaraMQ for MQTT menyediakan Public Endpoint dan VPC Endpoint.
Public Endpoint adalah alamat IP yang digunakan untuk mengakses ApsaraMQ for MQTT melalui Internet. Umumnya, endpoint publik digunakan dalam skenario IoT dan mobile Internet.
VPC Endpoint adalah alamat IP yang digunakan untuk mengakses ApsaraMQ for MQTT di private virtual cloud (VPC). Umumnya, endpoint VPC digunakan oleh aplikasi cloud untuk terhubung ke ApsaraMQ for MQTT.
Penting Jika Anda ingin menggunakan endpoint untuk menghubungkan klien ke ApsaraMQ for MQTT, gunakan nama domain daripada alamat IP karena alamat IP berubah secara dinamis. Tim teknis ApsaraMQ for MQTT tidak bertanggung jawab atas kesalahan dan kerugian langsung atau tidak langsung dalam skenario berikut:
Anda menggunakan alamat IP untuk mengakses klien Anda ke ApsaraMQ for MQTT. Setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain, alamat IP asli menjadi tidak valid.
Kebijakan firewall pada alamat IP disetel di jaringan tempat klien Anda beroperasi. Setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain, alamat IP baru diblokir karena kebijakan firewall.
Prasyarat
Lingkungan pengembangan terintegrasi (IDE) telah diinstal. Untuk informasi lebih lanjut, lihat IDE. Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Dalam contoh ini, IntelliJ IDEA digunakan.
Java 8 atau 11 telah diinstal. Untuk informasi lebih lanjut, lihat Unduhan Java.
Instansi ApsaraMQ for MQTT telah dibuat, dan topik serta grup telah dibuat pada instansi tersebut. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Instansi ApsaraMQ for RocketMQ telah dibuat, dan topik serta grup telah dibuat pada instansi tersebut. Untuk informasi lebih lanjut, lihat Langkah 2: Buat Sumber Daya.
Penting Aturan notifikasi status klien ApsaraMQ for MQTT hanya dapat mengirim notifikasi status klien ke instansi ApsaraMQ for RocketMQ versi 4.x.
Aturan notifikasi status klien ApsaraMQ for MQTT tidak dapat digunakan lintas wilayah. Saat membuat aturan notifikasi status klien, pastikan semua sumber daya ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ berada di wilayah yang sama.
1. Buat aturan notifikasi status klien
Masuk ke Konsol ApsaraMQ for MQTT. Di bilah navigasi sebelah kiri, klik Instances.
Di bilah navigasi atas, pilih wilayah tempat instansi yang ingin Anda kelola berada. Pada halaman Instansi, klik nama instansi untuk masuk ke halaman Instance Details.
Di bilah navigasi sebelah kiri, klik Rules. Di sudut kiri atas halaman Aturan, klik Create Rule.
Di wizard Create Rule, lakukan langkah-langkah berikut:
Di langkah Konfigurasi Informasi Dasar, masukkan ID aturan dan pilih Notifikasi Status Klien untuk parameter Jenis Aturan.

Di langkah Konfigurasi Sumber Aturan, pilih grup yang telah dibuat pada instansi ApsaraMQ for MQTT.

Di langkah Konfigurasi Tujuan Aturan, pilih instansi ApsaraMQ for RocketMQ yang telah dibuat dan topik yang telah dibuat pada instansi tersebut.

2. Persiapkan kode uji
Anda harus menggunakan kode untuk mengubah status klien dan memproses notifikasi status klien. Dalam topik ini, kode sampel dalam Java disediakan.
2.1 Unduh kode sampel
Unduh proyek demo mqtt-java-demo dan ekstrak paket proyek demo ke folder di mesin lokal Anda.
Di proyek demo yang diekstrak, temukan folder lmq-java-demo, impor folder ke IntelliJ IDEA, dan konfirmasi apakah dependensi berikut termasuk dalam file pom.xml:
<dependencies>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.70</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.5.Final</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
Konfigurasikan kredensial akses.
Dapatkan pasangan AccessKey. Untuk informasi tentang cara mendapatkan pasangan AccessKey, lihat Buat Pasangan AccessKey.
Konfigurasikan variabel lingkungan. Nama variabel lingkungan ID AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_AK_ENV, dan nama variabel lingkungan Rahasia AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_SK_ENV. Untuk informasi tentang cara mengonfigurasi variabel lingkungan, lihat Konfigurasikan Kredensial Akses.
2.2 Konfigurasikan kode untuk mengubah status klien
Di kelas MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java, konfigurasikan parameter terkait sumber daya ApsaraMQ for MQTT berdasarkan komentar dalam kode.
Selama pengujian, Anda hanya perlu mensimulasikan perubahan status. Oleh karena itu, Anda dapat menghapus kode terkait pengiriman pesan. Contoh kode:
Contoh Kode untuk Perubahan Status Klien
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
/**
* ID instansi ApsaraMQ for MQTT. Anda dapat memperoleh ID di konsol ApsaraMQ for MQTT setelah Anda membeli instansi.
*/
String instanceId = "XXXXX";
/**
* Endpoint instansi ApsaraMQ for MQTT. Anda dapat memperoleh endpoint di konsol ApsaraMQ for MQTT setelah Anda membeli dan mengonfigurasi instansi. Anda harus menggunakan nama domain yang ditetapkan daripada alamat IP untuk terhubung ke instansi. Jika tidak, pengecualian mungkin terjadi pada objek klien.
*/
String endPoint = "XXXXX.mqtt.aliyuncs.com";
/**
* ID AccessKey, yang dapat diperoleh di konsol Resource Access Management (RAM).
* Pasangan AccessKey akun Alibaba Cloud memiliki izin untuk semua operasi API. Untuk mencegah risiko keamanan, kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin.
* Kami sangat menyarankan agar Anda tidak menyimpan pasangan AccessKey di kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya di akun Anda mungkin terpapar pada risiko keamanan potensial.
* Dalam contoh ini, ID AccessKey dan Rahasia AccessKey disimpan dalam variabel lingkungan. Sebelum Anda menjalankan kode sampel, pastikan bahwa variabel lingkungan MQTT_AK_ENV dan MQTT_SK_ENV dikonfigurasi.
* Contoh: export MQTT_AK_ENV=<access_key_id>
* export MQTT_SK_ENV=<access_key_secret>
* Ganti <access_key_id> dengan ID AccessKey Anda dan <access_key_secret> dengan Rahasia AccessKey Anda.
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* Rahasia AccessKey, yang dapat diperoleh di konsol RAM. Parameter ini diperlukan hanya jika Anda menggunakan mode otentikasi tanda tangan.
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* ID unik global yang sistem tetapkan untuk klien ApsaraMQ for MQTT. ID klien harus bervariasi berdasarkan koneksi TCP. Jika beberapa koneksi TCP menggunakan ID klien yang sama, pengecualian terjadi dan koneksi ditutup secara tak terduga.
* Nilai parameter clientId dalam format GroupID@@@DeviceId. GroupID menentukan ID grup yang Anda buat di konsol ApsaraMQ for MQTT dan DeviceId menentukan ID perangkat kustom. Nilai parameter clientId dapat mencapai hingga 64 karakter panjangnya.
*/
String clientId = "GID_XXXXX@@@XXXXX";
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* Protokol dan port yang digunakan oleh objek klien ApsaraMQ for MQTT harus cocok. Untuk informasi lebih lanjut, lihat https://www.alibabacloud.com/help/doc-detail/44866.htm?spm=a2c4g.11186623.6.552.25302386RcuYFB.
Jika enkripsi Secure Sockets Layer (SSL) digunakan, tentukan ssl://endpoint:8883 sebagai protokol dan port.
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* Periode timeout selama klien ApsaraMQ for MQTT menunggu respons. Periode timeout mencegah klien ApsaraMQ for MQTT menunggu respons untuk periode waktu yang tidak terbatas.
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("koneksi berhasil");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("menerima pesan dari topik " + s + " , isi adalah " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("pesan terkirim sukses topik adalah : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
Thread.sleep(Long.MAX_VALUE);
}
}
2.3 Konfigurasikan kode untuk memproses notifikasi status klien
Setelah notifikasi status klien dikirim ke ApsaraMQ for RocketMQ, Anda dapat berlangganan pesan yang sesuai dan kemudian memproses notifikasi berdasarkan kebutuhan bisnis Anda.
Di kelas MQTTClientStatusNoticeProcessDemo.java, konfigurasikan parameter terkait sumber daya ApsaraMQ for RocketMQ berdasarkan komentar dalam kode.
Contoh Kode untuk Memproses Notifikasi Status Klien
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* Inisialisasi klien ApsaraMQ for RocketMQ sebagai penerima. Dalam sebagian besar skenario bisnis, penerima diterapkan di aplikasi backend.
*/
Properties properties = new Properties();
/**
* ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* ID AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas.
* Pasangan AccessKey akun Alibaba Cloud memiliki izin untuk semua operasi API. Untuk mencegah risiko keamanan, kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin.
* Kami sangat menyarankan agar Anda tidak menyimpan pasangan AccessKey di kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya di akun Anda mungkin terpapar pada risiko keamanan potensial.
* Dalam contoh ini, ID AccessKey dan Rahasia AccessKey disimpan dalam variabel lingkungan.
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* Rahasia AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. Parameter ini diperlukan hanya jika Anda menggunakan mode otentikasi tanda tangan.
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* Endpoint TCP yang digunakan untuk mengakses instansi ApsaraMQ for RocketMQ. Anda dapat memperoleh endpoint TCP di halaman Detail Instansi di konsol ApsaraMQ for RocketMQ.
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
/**
* Topik ApsaraMQ for RocketMQ yang ingin Anda gunakan untuk memproses notifikasi status klien dari ApsaraMQ for MQTT.
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* Data status klien. Dalam lingkungan produksi, kami sarankan Anda menggunakan sistem penyimpanan persisten eksternal, seperti database atau sistem Redis, untuk menyimpan data status dan mencegah hilangnya data status saat aplikasi dimulai ulang. Dalam contoh ini, data status disimpan di mesin lokal.
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* Dalam contoh ini, hanya data terkait apakah klien online diproses. Oleh karena itu, Anda hanya perlu memperhatikan acara connect dan tcpclean.
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Logika yang digunakan untuk memproses notifikasi status klien.
* Selama proses penyebaran aktual, aplikasi yang memproses notifikasi status mungkin diterapkan di beberapa mesin. Oleh karena itu, data status klien dapat dipertahankan di penyimpanan bersama eksternal seperti database atau sistem Redis.
* Jika mesin negara menerima pesan berulang kali, lakukan idempotensi pada pesan untuk mencegah kesalahan yang mungkin terjadi.
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* Simpan acara baru terlebih dahulu.
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* Baca daftar acara saluran saat ini.
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* Jika semua acara online dan offline dalam daftar diterima dan saluran saat ini ditutup, Anda dapat menghapus data saluran.
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* Periksa apakah klien ApsaraMQ for MQTT memiliki koneksi TCP aktif berdasarkan tabel saluran.
* 1. Jika tabel saluran kosong, klien ApsaraMQ for MQTT offline.
* 2. Jika tabel saluran tidak kosong, periksa apakah hanya acara online yang diterima dalam koneksi. Jika ya, koneksi aktif ada dan klien ApsaraMQ for MQTT online.
* Jika acara offline diterima di semua saluran, klien ApsaraMQ for MQTT offline.
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}
3. Verifikasi hasil
Jalankan fungsi utama di kelas MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java untuk mensimulasikan klien yang online dan gunakan salah satu metode berikut untuk memeriksa status klien dan status pengiriman pesan:
Catatan Anda dapat menghentikan eksekusi fungsi utama untuk mensimulasikan klien yang offline.
Periksa Status Klien. Masuk ke halaman detail instansi di konsol ApsaraMQ for MQTT dan klik Device Status Query di bilah navigasi sebelah kiri. Di halaman Kueri Status Perangkat, periksa apakah klien online menggunakan ID perangkat.

Periksa Apakah Pesan Dikirim. Masuk ke halaman detail instansi di konsol ApsaraMQ for RocketMQ dan klik Message Query di bilah navigasi sebelah kiri. Di halaman Kueri Pesan, periksa apakah pesan dikirim berdasarkan topik.

Jalankan fungsi utama di kelas MQTTClientStatusNoticeProcessDemo.java. Jika pesan diterima, nilai dari ClientStatus berubah dari false menjadi true, seperti yang ditunjukkan pada gambar berikut.
