Tutorial ini memandu Anda mengirim pesan dari klien ApsaraMQ for MQTT dan menerimanya di aplikasi layanan backend. Klien menggunakan client SDK (Eclipse Paho) untuk memublikasikan pesan, sedangkan backend menggunakan cloud SDK untuk mengonsumsinya.
Cara kerja
Klien ApsaraMQ for MQTT dan aplikasi layanan backend berkomunikasi melalui broker ApsaraMQ for MQTT. Masing-masing sisi menggunakan SDK yang berbeda:
| SDK | Pustaka | Terhubung ke | Kasus penggunaan |
|---|---|---|---|
| Client SDK | Eclipse Paho Java Client | Perangkat IoT, aplikasi seluler | Publikasikan dan berlangganan pesan melalui MQTT |
| Cloud SDK | ApsaraMQ for MQTT server SDK | Aplikasi layanan backend | Mengonsumsi atau mengirim pesan dalam skala besar, dengan berlangganan pada level topik induk |

Untuk proyek sampel lengkap, lihat Proyek demo atau Demo.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Instans ApsaraMQ for MQTT dengan sumber daya yang diperlukan (group, topic). Lihat Buat sumber daya.
Pasangan AccessKey. Lihat Dapatkan pasangan AccessKey.
JDK yang telah diinstal.
IntelliJ IDEA atau Eclipse yang telah diinstal (tutorial ini menggunakan IntelliJ IDEA).
Endpoints
Kedua SDK memerlukan endpoint untuk terhubung ke instans ApsaraMQ for MQTT Anda.
Endpoint Client SDK
| Jenis akses | Format | Kasus penggunaan umum |
|---|---|---|
| Public endpoint | <instance-id>.mqtt.aliyuncs.com | Perangkat IoT, aplikasi seluler |
| VPC endpoint | <instance-id>-internal-vpc.mqtt.aliyuncs.com | Klien di dalam VPC |
Anda dapat menemukan endpoint Client SDK di tab Endpoints pada halaman Instance Details di Konsol ApsaraMQ for MQTT.
Endpoint Cloud SDK
| Jenis akses | Format | Kasus penggunaan umum |
|---|---|---|
| Public endpoint | <instance-id>-server-internet.mqtt.aliyuncs.com | Aplikasi backend melalui Internet |
| VPC endpoint | <instance-id>-server-internal.mqtt.aliyuncs.com | Aplikasi backend di dalam VPC |
Tidak semua wilayah mendukung akses Cloud SDK. Lihat Wilayah yang didukung untuk detailnya.
Anda dapat menemukan ID instans di bagian Basic Information pada halaman Instance Details di Konsol ApsaraMQ for MQTT.
Selalu gunakan nama domain, bukan alamat IP. Alamat IP dapat berubah tanpa pemberitahuan selama pembaruan resolusi domain. ApsaraMQ for MQTT tidak bertanggung jawab atas kegagalan koneksi yang disebabkan oleh:
Penggunaan alamat IP yang dikodekan secara tetap (hardcoded) yang menjadi tidak valid setelah pembaruan DNS.
Aturan firewall yang memblokir alamat IP baru setelah pembaruan DNS.
Langkah 1: Siapkan proyek Client SDK
Unduh SDK open source pihak ketiga untuk Java: Eclipse Paho Java Client.
Unduh proyek demo mqtt-java-demo, ekstrak, lalu impor ke IntelliJ IDEA.
Verifikasi bahwa file pom.xml Anda menyertakan dependensi berikut:
<dependencies>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>Langkah 2: Konfigurasikan kredensial
Atur variabel lingkungan berikut untuk otentikasi. Lihat Konfigurasikan kredensial akses untuk detailnya.
# ID AccessKey
export MQTT_AK_ENV=<your-access-key-id>
# Rahasia AccessKey
export MQTT_SK_ENV=<your-access-key-secret>Jangan masukkan pasangan AccessKey secara langsung ke dalam kode sumber. Simpan dalam variabel lingkungan untuk menghindari kebocoran kredensial.
Langkah 3: Kirim pesan dengan Client SDK
Buka MQ4IoTProducerDemo.java dan perbarui parameter berikut dengan nilai dari instans ApsaraMQ for MQTT Anda. Lihat Buat sumber daya untuk mengetahui lokasi nilai-nilai tersebut.
| Parameter | Deskripsi | Contoh |
|---|---|---|
instanceId | ID instans ApsaraMQ for MQTT | post-cn-xxxxx |
endPoint | Endpoint Client SDK (lihat bagian Endpoints di atas) | post-cn-xxxxx.mqtt.aliyuncs.com |
clientId | ID klien yang unik secara global dalam format {GroupID}@@@{DeviceID}. Maksimal 64 karakter. Setiap koneksi TCP harus menggunakan ID klien yang berbeda. | GID_test@@@device_001 |
parentTopic | Topik induk yang dibuat di konsol | testTopic |
Hubungkan ke broker
Buat klien MQTT dan bangun koneksi. Gunakan tcp://endpoint:1883 untuk teks biasa atau ssl://endpoint:8883 untuk SSL/TLS.
String instanceId = "XXXXX";
String endPoint = "XXXXX.mqtt.aliyuncs.com";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String clientId = "GID_XXXXX@@@XXXXX";
ConnectionOptionWrapper connectionOptionWrapper =
new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final MqttClient mqttClient =
new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
// Atur timeout respons (ms) untuk menghindari penantian tanpa batas
mqttClient.setTimeToWait(5000);ID klien menggunakan format {GroupID}@@@{DeviceID} dan harus unik secara global per koneksi TCP. Penggunaan ulang ID klien di beberapa koneksi menyebabkan broker memutus sesi yang ada.Siapkan callback
Atur callback sebelum melakukan koneksi untuk menghindari kehilangan pesan selama pemulihan sesi.
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// Proses pesan dalam batas waktu broker untuk menghindari pengiriman ulang.
// Terapkan deduplikasi untuk memastikan pemrosesan idempoten.
System.out.println(
"receive msg from topic " + s + " , body is "
+ new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : "
+ iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());Publikasikan pesan
Publikasikan pesan ke subtopik. Subtopik ditambahkan ke topik induk dan dapat mencapai maksimal 128 karakter.
final String parentTopic = "XXXXX";
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
final int qosLevel = 0;
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);Level QoS: 0 (at most once), 1 (at least once), atau 2 (exactly once).
Kirim pesan point-to-point (P2P)
Pesan P2P mengirimkan pesan langsung ke klien tertentu. Klien target tidak perlu berlangganan ke topik tersebut. Publikasikan ke {parentTopic}/p2p/{targetClientId}.
String receiverId = "GID_test@@@device_002";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
MqttMessage p2pMessage = new MqttMessage("hello mq4Iot p2p msg".getBytes());
p2pMessage.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, p2pMessage);Kode produsen lengkap
Kode berikut menggabungkan semua langkah di atas. Buka MQ4IoTProducerDemo.java dan perbarui parameternya.
package com.aliyun.openservices.lmq.example.demo;
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTProducerDemo {
public static void main(String[] args) throws Exception {
// ID instans ApsaraMQ for MQTT
String instanceId = "XXXXX";
// Endpoint Client SDK. Gunakan nama domain, bukan alamat IP.
String endPoint = "XXXXX.mqtt.aliyuncs.com";
// Muat kredensial dari variabel lingkungan
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
// Format ID klien: {GroupID}@@@{DeviceID}
// Harus unik per koneksi TCP. Maksimal 64 karakter.
String clientId = "GID_XXXXX@@@XXXXX";
// Topik induk yang dibuat di konsol ApsaraMQ for MQTT.
// Topik yang tidak valid atau tidak sah menyebabkan broker menutup koneksi.
final String parentTopic = "XXXXX";
// Subtopik untuk penyaringan pesan. Maksimal 128 karakter.
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
// Level QoS: 0, 1, atau 2
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper =
new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
// Hubungkan melalui TCP pada port 1883. Untuk SSL, gunakan ssl://endpoint:8883.
final MqttClient mqttClient =
new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
// Atur timeout respons (ms) untuk menghindari penantian tanpa batas
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(
1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// Atur callback sebelum melakukan koneksi untuk menghindari kehilangan pesan
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// Proses pesan dalam batas waktu broker untuk menghindari pengiriman ulang.
// Terapkan deduplikasi untuk memastikan pemrosesan idempoten.
System.out.println(
"receive msg from topic " + s + " , body is "
+ new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : "
+ iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
// Publikasikan ke subtopik (pub/sub standar)
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);
// Publikasikan pesan point-to-point (P2P) langsung ke klien tertentu.
// Klien target tidak perlu berlangganan ke topik ini.
// Format topik: {parentTopic}/p2p/{targetClientId}
String receiverId = "xxx";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}Verifikasi produsen
Jalankan MQ4IoTProducerDemo.java. Jika koneksi berhasil, Anda akan melihat output seperti berikut:
connect success
send msg succeed topic is : XXXXX/testMq4Iot
send msg succeed topic is : XXXXX/p2p/xxx
...Langkah 4: Terima pesan dengan Cloud SDK
Cloud SDK menghubungkan aplikasi backend Anda ke broker ApsaraMQ for MQTT. Berbeda dengan Client SDK, Cloud SDK hanya berlangganan ke topik induk (bukan subtopik), sehingga cocok untuk skenario backend di mana aplikasi mengumpulkan dan menganalisis semua pesan di bawah suatu topik.
Siapkan proyek
Unduh Cloud SDK. Lihat Catatan rilis untuk versi terbaru.
Unduh proyek demo: mqtt-server-sdk-demo.
Ekstrak paket dan impor ke IntelliJ IDEA.
Verifikasi bahwa file
pom.xmlAnda menyertakan dependensi berikut:<dependencies> <dependency> <groupId>com.alibaba.mqtt</groupId> <artifactId>server-sdk</artifactId> <version>1.0.0.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies>
Konfigurasikan dan jalankan konsumen
Buka MQTTConsumerDemo.java dan perbarui parameter berikut:
| Parameter | Deskripsi | Contoh |
|---|---|---|
domain | Endpoint Cloud SDK (lihat bagian Endpoints di atas) | post-cn-xxxxx-server-internet.mqtt.aliyuncs.com |
port | Port Cloud SDK. Selalu 5672. | 5672 |
instanceId | ID instans ApsaraMQ for MQTT | post-cn-xxxxx |
firstTopic | Topik induk yang akan di-subscribe | testTopic |
Kredensial dimuat dari variabel lingkungan yang sama (MQTT_AK_ENV dan MQTT_SK_ENV) yang dikonfigurasi di Langkah 2.
Hubungkan dan berlangganan
String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";
int port = 5672;
String instanceId = "post-cn-jaj3h8i****";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String firstTopic = "firstTopic";
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setDomain(domain);
channelConfig.setPort(port);
channelConfig.setInstanceId(instanceId);
channelConfig.setAccessKey(accessKey);
channelConfig.setSecretKey(secretKey);
ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
serverConsumer.start();
serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
@Override
public void process(String msgId, MessageProperties messageProperties,
byte[] payload) {
System.out.println("Receive:" + msgId + ","
+ JSONObject.toJSONString(messageProperties) + ","
+ new String(payload));
}
});Kode konsumen lengkap
package com.aliyun.openservices.lmq.example;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.mqtt.server.ServerConsumer;
import com.alibaba.mqtt.server.callback.MessageListener;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ConsumerConfig;
import com.alibaba.mqtt.server.model.MessageProperties;
public class MQTTConsumerDemo {
public static void main(String[] args) throws Exception {
// Endpoint Cloud SDK. Gunakan nama domain, bukan alamat IP.
String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";
// Port Cloud SDK. Harus 5672.
int port = 5672;
// ID instans ApsaraMQ for MQTT
String instanceId = "post-cn-jaj3h8i****";
// Muat kredensial dari variabel lingkungan
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
// Topik induk yang akan di-subscribe.
// Cloud SDK hanya berlangganan ke topik induk (tidak ke subtopik).
// Topik yang tidak valid atau tidak sah menyebabkan broker menutup koneksi.
String firstTopic = "firstTopic";
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setDomain(domain);
channelConfig.setPort(port);
channelConfig.setInstanceId(instanceId);
channelConfig.setAccessKey(accessKey);
channelConfig.setSecretKey(secretKey);
ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
serverConsumer.start();
// Berlangganan dan proses pesan masuk
serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
@Override
public void process(String msgId, MessageProperties messageProperties,
byte[] payload) {
System.out.println("Receive:" + msgId + ","
+ JSONObject.toJSONString(messageProperties) + ","
+ new String(payload));
}
});
}
}Konsep utama
| Konsep | Deskripsi |
|---|---|
| Client ID | Format: {GroupID}@@@{DeviceID}. Harus unik secara global per koneksi TCP. Maksimal 64 karakter. Penggunaan ulang ID klien di beberapa koneksi menyebabkan broker memutus sesi yang ada. |
| Parent topic | Dibuat di konsol ApsaraMQ for MQTT. Topik yang tidak valid atau tidak sah menyebabkan broker menutup koneksi. |
| Subtopic | Ditambahkan ke topik induk (misalnya, parentTopic/testMq4Iot). Digunakan untuk penyaringan pesan. Maksimal 128 karakter. |
| QoS | Level Quality of Service: 0 (at most once), 1 (at least once), 2 (exactly once). |
| P2P messaging | Publikasikan ke {parentTopic}/p2p/{targetClientId} untuk mengirim langsung ke klien tertentu tanpa memerlukan klien tersebut berlangganan. |
| Protokol dan port | Client SDK: tcp://endpoint:1883 (teks biasa) atau ssl://endpoint:8883 (SSL/TLS). Cloud SDK: port 5672. |
FAQ
Bagaimana cara mengaktifkan koneksi ulang otomatis?
Atur setAutomaticReconnect(true) pada MqttConnectOptions sebelum melakukan koneksi:
MqttConnectOptions options = connectionOptionWrapper.getMqttConnectOptions();
options.setAutomaticReconnect(true);
mqttClient.connect(options);Bagaimana cara menghubungkan melalui SSL/TLS?
Ganti tcp:// dengan ssl:// dan gunakan port 8883:
final MqttClient mqttClient =
new MqttClient("ssl://" + endPoint + ":8883", clientId, memoryPersistence);Dapatkah saya mengirim pesan dari aplikasi backend ke klien MQTT?
Ya. Gunakan MQTTProducerDemo.java dalam proyek demo Cloud SDK untuk arah pengiriman pesan sebaliknya (backend ke klien).
Langkah selanjutnya
Jelajahi lebih banyak pola pengiriman pesan di Proyek demo.
Periksa wilayah yang mendukung akses Cloud SDK.