Saat perangkat IoT atau aplikasi seluler perlu bertukar pesan secara langsung tanpa perantara backend, Anda dapat menghubungkan publisher dan subscriber sebagai klien ApsaraMQ for MQTT. Setiap klien menggunakan Eclipse Paho Java SDK untuk terhubung ke broker ApsaraMQ for MQTT melalui Internet, mempublikasikan pesan ke topik, dan menerima pesan dari topik yang telah di-subscribe.

Prasyarat
Sebelum memulai, pastikan Anda telah:
Membuat sumber daya ApsaraMQ for MQTT (instans, ID grup, dan topik induk)
Memiliki pasangan AccessKey untuk autentikasi
JDK telah terinstal
IDE seperti IntelliJ IDEA atau Eclipse
Endpoints
Tentukan endpoint instans ApsaraMQ for MQTT Anda saat terhubung melalui client SDK. Format endpoint bergantung pada metode akses jaringan yang digunakan:
| Metode akses | Format endpoint | Kasus penggunaan umum |
|---|---|---|
| Publik (Internet) | <instance-id>.mqtt.aliyuncs.com | Perangkat IoT, aplikasi seluler |
| VPC (jaringan pribadi) | <instance-id>-internal-vpc.mqtt.aliyuncs.com | Aplikasi backend di cloud |
Temukan endpoint Anda di tab Endpoints pada halaman Instance Details di Konsol ApsaraMQ for MQTT. ID instans ditampilkan di bagian Basic Information.
Selalu gunakan nama domain, bukan alamat IP. Alamat IP dapat berubah sewaktu-waktu tanpa pemberitahuan ketika resolusi nama domain diperbarui. Alibaba Cloud tidak bertanggung jawab atas kegagalan koneksi yang disebabkan oleh hardcoding alamat IP atau aturan firewall berbasis IP.
Langkah 1: Tambahkan dependensi Maven
Klon atau unduh proyek demo, lalu verifikasi bahwa pom.xml mencakup dependensi berikut:
<dependencies>
<!-- Eclipse Paho: library klien MQTT 3.1.1 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<!-- Commons Codec: perhitungan signature HMAC untuk autentikasi -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<!-- Apache HttpClient: permintaan HTTP untuk autentikasi berbasis token -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<!-- Fastjson: penguraian JSON untuk tanggapan API -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- Alibaba Cloud SDK untuk MQTT: panggilan API sisi server -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<!-- Alibaba Cloud Core SDK: infrastruktur SDK bersama -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>Untuk kode sumber dan dokumentasi Eclipse Paho Java Client, lihat Eclipse Paho Java Client.
Langkah 2: Atur variabel lingkungan
Simpan pasangan AccessKey Anda dalam variabel lingkungan untuk mencegah paparan kredensial yang tidak disengaja dalam kode sumber.
export MQTT_AK_ENV=<your-access-key-id>
export MQTT_SK_ENV=<your-access-key-secret>| Placeholder | Deskripsi | Contoh |
|---|---|---|
<your-access-key-id> | ID AccessKey dari konsol RAM | LTAI5tXxx |
<your-access-key-secret> | Rahasia AccessKey dari konsol RAM | xXxXxXx |
Gunakan pasangan AccessKey milik Pengguna RAM, bukan pasangan AccessKey Akun Alibaba Cloud. Pasangan AccessKey Akun Alibaba Cloud memberikan akses ke semua operasi API dan menimbulkan risiko keamanan jika bocor. Untuk detailnya, lihat Konfigurasikan kredensial akses.
Langkah 3: Tentukan parameter koneksi
Atur parameter berikut berdasarkan sumber daya yang telah Anda buat di Konsol ApsaraMQ for MQTT:
// ID instans dari bagian Basic Information di konsol
String instanceId = "<your-instance-id>";
// Endpoint dari tab Endpoints di konsol
String endPoint = "<your-instance-id>.mqtt.aliyuncs.com";
// Kredensial dari variabel lingkungan
String accessKey = System.getenv("MQTT_AK_ENV");
// Rahasia AccessKey diperlukan untuk mode autentikasi signature
String secretKey = System.getenv("MQTT_SK_ENV");
// Format ID klien: GroupID@@@DeviceID (maksimal 64 karakter)
// Setiap koneksi TCP harus menggunakan ID klien yang unik.
// ID klien duplikat menyebabkan konflik koneksi dan pemutusan tak terduga.
String clientId = "<your-group-id>@@@<your-device-id>";
// Topik induk yang dibuat di konsol
final String parentTopic = "<your-parent-topic>";
// Subtopik untuk penyaringan pesan (total maksimal 128 karakter)
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
// Tingkat QoS: 0 (at most once), 1 (at least once), atau 2 (exactly once)
final int qosLevel = 0;Jika klien mempublikasikan atau berlangganan ke topik yang tidak ada atau yang tidak diizinkan diakses oleh klien tersebut, broker akan segera menutup koneksi.
Langkah 4: Buat klien MQTT dan daftarkan callback
Daftarkan callback sebelum melakukan koneksi. Jika Anda mendaftarkan callback setelah connect(), Anda mungkin melewatkan pesan—terutama saat melanjutkan sesi persisten.
Buat klien MQTT:
ConnectionOptionWrapper connectionOptionWrapper =
new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
// Protokol dan port harus sesuai:
// TCP -> tcp://endpoint:1883
// SSL -> ssl://endpoint:8883
final MqttClient mqttClient = new MqttClient(
"tcp://" + endPoint + ":1883", clientId, memoryPersistence);
// Timeout untuk menunggu tanggapan broker (milidetik)
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(
1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());Siapkan callback event untuk menangani event koneksi dan pesan masuk:
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// Berlangganan segera setelah setiap koneksi (termasuk rekoneksi)
System.out.println("connect success");
executorService.submit(() -> {
try {
final String[] topicFilter = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// Proses pesan masuk di sini.
// Jangan lemparkan exception dari callback ini. Broker menganggap
// callback yang dikembalikan secara normal sebagai acknowledgment pesan.
// Terapkan deduplikasi untuk memastikan konsumsi idempoten.
System.out.println(
"receive msg from topic " + topic + " , body is "
+ new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : "
+ iMqttDeliveryToken.getTopics()[0]);
}
});Perilaku utama callback:
| Callback | Perilaku |
|---|---|
connectComplete | Dipanggil pada setiap koneksi, termasuk rekoneksi otomatis. Lakukan subscribe ulang di sini untuk memulihkan langganan. |
connectionLost | Dipanggil saat koneksi terputus. Catat error atau terapkan logika rekoneksi. |
messageArrived | Dipanggil saat pesan tiba. Jangan lemparkan exception—broker menganggap callback yang dikembalikan secara normal sebagai acknowledgment. Konsumsi pesan dalam periode timeout. Terapkan deduplikasi untuk pemrosesan idempoten. |
deliveryComplete | Dipanggil saat pesan QoS 1 atau QoS 2 berhasil dikirim. |
Langkah 5: Koneksi dan publikasi pesan
Lakukan koneksi ke broker dan publikasikan pesan. Contoh ini mengirimkan pesan pub/sub biasa dan pesan point-to-point (P2P):
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
// Publikasikan pesan biasa ke topik yang di-subscribe
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);
// Publikasikan pesan P2P langsung ke klien tertentu
// Format topik P2P: {parentTopic}/p2p/{targetClientId}
// Klien target tidak perlu berlangganan ke topik ini.
final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
// Pertahankan klien tetap berjalan agar terus menerima pesan
Thread.sleep(Long.MAX_VALUE);Pesan P2P: Saat Anda mengetahui klien target secara pasti, kirimkan pesan ke topik {parentTopic}/p2p/{targetClientId}. Klien target menerima pesan tanpa perlu berlangganan. Hal ini menyederhanakan skenario di mana pesan ditujukan hanya untuk satu penerima.
Kode lengkap
Daftar berikut menggabungkan semua langkah di atas menjadi satu kelas yang dapat dijalankan. Kode ini juga tersedia di proyek demo di GitHub.
package com.aliyun.openservices.lmq.example.demo;
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
// --- Parameter koneksi ---
String instanceId = "<your-instance-id>";
String endPoint = "<your-instance-id>.mqtt.aliyuncs.com";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String clientId = "<your-group-id>@@@<your-device-id>";
final String parentTopic = "<your-parent-topic>";
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
final int qosLevel = 0;
// --- Buat klien ---
ConnectionOptionWrapper connectionOptionWrapper =
new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final MqttClient mqttClient = new MqttClient(
"tcp://" + endPoint + ":1883", clientId, memoryPersistence);
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(
1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// --- Daftarkan callback sebelum koneksi ---
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
executorService.submit(() -> {
try {
final String[] topicFilter = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println(
"receive msg from topic " + topic + " , body is "
+ new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : "
+ iMqttDeliveryToken.getTopics()[0]);
}
});
// --- Koneksi dan publikasi ---
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);
final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}Verifikasi hasil
Setelah menjalankan aplikasi, kueri jejak pesan di Konsol ApsaraMQ for MQTT untuk memastikan pesan telah dikirim dan diterima. Untuk langkah-langkah detail, lihat Kueri jejak pesan.