全部产品
Search
文档中心

ApsaraMQ for MQTT:Kirim dan terima pesan antara klien MQTT dan aplikasi backend

更新时间:Mar 11, 2026

Tutorial ini memandu Anda mengirim pesan dari klien ApsaraMQ for MQTT dan menerimanya di aplikasi layanan backend. Klien menggunakan client SDK (Eclipse Paho) untuk memublikasikan pesan, sedangkan backend menggunakan cloud SDK untuk mengonsumsinya.

Cara kerja

Klien ApsaraMQ for MQTT dan aplikasi layanan backend berkomunikasi melalui broker ApsaraMQ for MQTT. Masing-masing sisi menggunakan SDK yang berbeda:

SDKPustakaTerhubung keKasus penggunaan
Client SDKEclipse Paho Java ClientPerangkat IoT, aplikasi selulerPublikasikan dan berlangganan pesan melalui MQTT
Cloud SDKApsaraMQ for MQTT server SDKAplikasi layanan backendMengonsumsi atau mengirim pesan dalam skala besar, dengan berlangganan pada level topik induk
Messaging between an ApsaraMQ for MQTT client and a backend application

Untuk proyek sampel lengkap, lihat Proyek demo atau Demo.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Endpoints

Kedua SDK memerlukan endpoint untuk terhubung ke instans ApsaraMQ for MQTT Anda.

Endpoint Client SDK

Jenis aksesFormatKasus penggunaan umum
Public endpoint<instance-id>.mqtt.aliyuncs.comPerangkat IoT, aplikasi seluler
VPC endpoint<instance-id>-internal-vpc.mqtt.aliyuncs.comKlien di dalam VPC

Anda dapat menemukan endpoint Client SDK di tab Endpoints pada halaman Instance Details di Konsol ApsaraMQ for MQTT.

Endpoint Cloud SDK

Jenis aksesFormatKasus penggunaan umum
Public endpoint<instance-id>-server-internet.mqtt.aliyuncs.comAplikasi backend melalui Internet
VPC endpoint<instance-id>-server-internal.mqtt.aliyuncs.comAplikasi backend di dalam VPC
Penting

Tidak semua wilayah mendukung akses Cloud SDK. Lihat Wilayah yang didukung untuk detailnya.

Anda dapat menemukan ID instans di bagian Basic Information pada halaman Instance Details di Konsol ApsaraMQ for MQTT.

Penting

Selalu gunakan nama domain, bukan alamat IP. Alamat IP dapat berubah tanpa pemberitahuan selama pembaruan resolusi domain. ApsaraMQ for MQTT tidak bertanggung jawab atas kegagalan koneksi yang disebabkan oleh:

  • Penggunaan alamat IP yang dikodekan secara tetap (hardcoded) yang menjadi tidak valid setelah pembaruan DNS.

  • Aturan firewall yang memblokir alamat IP baru setelah pembaruan DNS.

Langkah 1: Siapkan proyek Client SDK

  1. Unduh SDK open source pihak ketiga untuk Java: Eclipse Paho Java Client.

  2. Unduh proyek demo mqtt-java-demo, ekstrak, lalu impor ke IntelliJ IDEA.

Verifikasi bahwa file pom.xml Anda menyertakan dependensi berikut:

<dependencies>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
        <version>1.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.0</version>
    </dependency>
</dependencies>

Langkah 2: Konfigurasikan kredensial

Atur variabel lingkungan berikut untuk otentikasi. Lihat Konfigurasikan kredensial akses untuk detailnya.

# ID AccessKey
export MQTT_AK_ENV=<your-access-key-id>
# Rahasia AccessKey
export MQTT_SK_ENV=<your-access-key-secret>
Penting

Jangan masukkan pasangan AccessKey secara langsung ke dalam kode sumber. Simpan dalam variabel lingkungan untuk menghindari kebocoran kredensial.

Langkah 3: Kirim pesan dengan Client SDK

Buka MQ4IoTProducerDemo.java dan perbarui parameter berikut dengan nilai dari instans ApsaraMQ for MQTT Anda. Lihat Buat sumber daya untuk mengetahui lokasi nilai-nilai tersebut.

ParameterDeskripsiContoh
instanceIdID instans ApsaraMQ for MQTTpost-cn-xxxxx
endPointEndpoint Client SDK (lihat bagian Endpoints di atas)post-cn-xxxxx.mqtt.aliyuncs.com
clientIdID klien yang unik secara global dalam format {GroupID}@@@{DeviceID}. Maksimal 64 karakter. Setiap koneksi TCP harus menggunakan ID klien yang berbeda.GID_test@@@device_001
parentTopicTopik induk yang dibuat di konsoltestTopic

Hubungkan ke broker

Buat klien MQTT dan bangun koneksi. Gunakan tcp://endpoint:1883 untuk teks biasa atau ssl://endpoint:8883 untuk SSL/TLS.

String instanceId = "XXXXX";
String endPoint = "XXXXX.mqtt.aliyuncs.com";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String clientId = "GID_XXXXX@@@XXXXX";

ConnectionOptionWrapper connectionOptionWrapper =
    new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();

final MqttClient mqttClient =
    new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);

// Atur timeout respons (ms) untuk menghindari penantian tanpa batas
mqttClient.setTimeToWait(5000);
ID klien menggunakan format {GroupID}@@@{DeviceID} dan harus unik secara global per koneksi TCP. Penggunaan ulang ID klien di beberapa koneksi menyebabkan broker memutus sesi yang ada.

Siapkan callback

Atur callback sebelum melakukan koneksi untuk menghindari kehilangan pesan selama pemulihan sesi.

mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        System.out.println("connect success");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        // Proses pesan dalam batas waktu broker untuk menghindari pengiriman ulang.
        // Terapkan deduplikasi untuk memastikan pemrosesan idempoten.
        System.out.println(
            "receive msg from topic " + s + " , body is "
            + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("send msg succeed topic is : "
            + iMqttDeliveryToken.getTopics()[0]);
    }
});

mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());

Publikasikan pesan

Publikasikan pesan ke subtopik. Subtopik ditambahkan ke topik induk dan dapat mencapai maksimal 128 karakter.

final String parentTopic = "XXXXX";
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
final int qosLevel = 0;

MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);

Level QoS: 0 (at most once), 1 (at least once), atau 2 (exactly once).

Kirim pesan point-to-point (P2P)

Pesan P2P mengirimkan pesan langsung ke klien tertentu. Klien target tidak perlu berlangganan ke topik tersebut. Publikasikan ke {parentTopic}/p2p/{targetClientId}.

String receiverId = "GID_test@@@device_002";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
MqttMessage p2pMessage = new MqttMessage("hello mq4Iot p2p msg".getBytes());
p2pMessage.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, p2pMessage);

Kode produsen lengkap

Kode berikut menggabungkan semua langkah di atas. Buka MQ4IoTProducerDemo.java dan perbarui parameternya.

package com.aliyun.openservices.lmq.example.demo;

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        // ID instans ApsaraMQ for MQTT
        String instanceId = "XXXXX";

        // Endpoint Client SDK. Gunakan nama domain, bukan alamat IP.
        String endPoint = "XXXXX.mqtt.aliyuncs.com";

        // Muat kredensial dari variabel lingkungan
        String accessKey = System.getenv("MQTT_AK_ENV");
        String secretKey = System.getenv("MQTT_SK_ENV");

        // Format ID klien: {GroupID}@@@{DeviceID}
        // Harus unik per koneksi TCP. Maksimal 64 karakter.
        String clientId = "GID_XXXXX@@@XXXXX";

        // Topik induk yang dibuat di konsol ApsaraMQ for MQTT.
        // Topik yang tidak valid atau tidak sah menyebabkan broker menutup koneksi.
        final String parentTopic = "XXXXX";

        // Subtopik untuk penyaringan pesan. Maksimal 128 karakter.
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";

        // Level QoS: 0, 1, atau 2
        final int qosLevel = 0;

        ConnectionOptionWrapper connectionOptionWrapper =
            new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();

        // Hubungkan melalui TCP pada port 1883. Untuk SSL, gunakan ssl://endpoint:8883.
        final MqttClient mqttClient =
            new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);

        // Atur timeout respons (ms) untuk menghindari penantian tanpa batas
        mqttClient.setTimeToWait(5000);

        final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // Atur callback sebelum melakukan koneksi untuk menghindari kehilangan pesan
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                // Proses pesan dalam batas waktu broker untuk menghindari pengiriman ulang.
                // Terapkan deduplikasi untuk memastikan pemrosesan idempoten.
                System.out.println(
                    "receive msg from topic " + s + " , body is "
                    + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : "
                    + iMqttDeliveryToken.getTopics()[0]);
            }
        });

        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());

        for (int i = 0; i < 10; i++) {
            // Publikasikan ke subtopik (pub/sub standar)
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(mq4IotTopic, message);

            // Publikasikan pesan point-to-point (P2P) langsung ke klien tertentu.
            // Klien target tidak perlu berlangganan ke topik ini.
            // Format topik: {parentTopic}/p2p/{targetClientId}
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

Verifikasi produsen

Jalankan MQ4IoTProducerDemo.java. Jika koneksi berhasil, Anda akan melihat output seperti berikut:

connect success
send msg succeed topic is : XXXXX/testMq4Iot
send msg succeed topic is : XXXXX/p2p/xxx
...

Langkah 4: Terima pesan dengan Cloud SDK

Cloud SDK menghubungkan aplikasi backend Anda ke broker ApsaraMQ for MQTT. Berbeda dengan Client SDK, Cloud SDK hanya berlangganan ke topik induk (bukan subtopik), sehingga cocok untuk skenario backend di mana aplikasi mengumpulkan dan menganalisis semua pesan di bawah suatu topik.

Siapkan proyek

  1. Unduh Cloud SDK. Lihat Catatan rilis untuk versi terbaru.

  2. Unduh proyek demo: mqtt-server-sdk-demo.

  3. Ekstrak paket dan impor ke IntelliJ IDEA.

  4. Verifikasi bahwa file pom.xml Anda menyertakan dependensi berikut:

       <dependencies>
           <dependency>
               <groupId>com.alibaba.mqtt</groupId>
               <artifactId>server-sdk</artifactId>
               <version>1.0.0.Final</version>
           </dependency>
           <dependency>
               <groupId>com.alibaba</groupId>
               <artifactId>fastjson</artifactId>
               <version>1.2.83</version>
           </dependency>
       </dependencies>

Konfigurasikan dan jalankan konsumen

Buka MQTTConsumerDemo.java dan perbarui parameter berikut:

ParameterDeskripsiContoh
domainEndpoint Cloud SDK (lihat bagian Endpoints di atas)post-cn-xxxxx-server-internet.mqtt.aliyuncs.com
portPort Cloud SDK. Selalu 5672.5672
instanceIdID instans ApsaraMQ for MQTTpost-cn-xxxxx
firstTopicTopik induk yang akan di-subscribetestTopic

Kredensial dimuat dari variabel lingkungan yang sama (MQTT_AK_ENV dan MQTT_SK_ENV) yang dikonfigurasi di Langkah 2.

Hubungkan dan berlangganan

String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";
int port = 5672;
String instanceId = "post-cn-jaj3h8i****";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String firstTopic = "firstTopic";

ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setDomain(domain);
channelConfig.setPort(port);
channelConfig.setInstanceId(instanceId);
channelConfig.setAccessKey(accessKey);
channelConfig.setSecretKey(secretKey);

ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
serverConsumer.start();

serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
    @Override
    public void process(String msgId, MessageProperties messageProperties,
                        byte[] payload) {
        System.out.println("Receive:" + msgId + ","
            + JSONObject.toJSONString(messageProperties) + ","
            + new String(payload));
    }
});

Kode konsumen lengkap

package com.aliyun.openservices.lmq.example;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.mqtt.server.ServerConsumer;
import com.alibaba.mqtt.server.callback.MessageListener;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ConsumerConfig;
import com.alibaba.mqtt.server.model.MessageProperties;

public class MQTTConsumerDemo {
    public static void main(String[] args) throws Exception {
        // Endpoint Cloud SDK. Gunakan nama domain, bukan alamat IP.
        String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";

        // Port Cloud SDK. Harus 5672.
        int port = 5672;

        // ID instans ApsaraMQ for MQTT
        String instanceId = "post-cn-jaj3h8i****";

        // Muat kredensial dari variabel lingkungan
        String accessKey = System.getenv("MQTT_AK_ENV");
        String secretKey = System.getenv("MQTT_SK_ENV");

        // Topik induk yang akan di-subscribe.
        // Cloud SDK hanya berlangganan ke topik induk (tidak ke subtopik).
        // Topik yang tidak valid atau tidak sah menyebabkan broker menutup koneksi.
        String firstTopic = "firstTopic";

        ChannelConfig channelConfig = new ChannelConfig();
        channelConfig.setDomain(domain);
        channelConfig.setPort(port);
        channelConfig.setInstanceId(instanceId);
        channelConfig.setAccessKey(accessKey);
        channelConfig.setSecretKey(secretKey);

        ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
        serverConsumer.start();

        // Berlangganan dan proses pesan masuk
        serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
            @Override
            public void process(String msgId, MessageProperties messageProperties,
                                byte[] payload) {
                System.out.println("Receive:" + msgId + ","
                    + JSONObject.toJSONString(messageProperties) + ","
                    + new String(payload));
            }
        });
    }
}

Konsep utama

KonsepDeskripsi
Client IDFormat: {GroupID}@@@{DeviceID}. Harus unik secara global per koneksi TCP. Maksimal 64 karakter. Penggunaan ulang ID klien di beberapa koneksi menyebabkan broker memutus sesi yang ada.
Parent topicDibuat di konsol ApsaraMQ for MQTT. Topik yang tidak valid atau tidak sah menyebabkan broker menutup koneksi.
SubtopicDitambahkan ke topik induk (misalnya, parentTopic/testMq4Iot). Digunakan untuk penyaringan pesan. Maksimal 128 karakter.
QoSLevel Quality of Service: 0 (at most once), 1 (at least once), 2 (exactly once).
P2P messagingPublikasikan ke {parentTopic}/p2p/{targetClientId} untuk mengirim langsung ke klien tertentu tanpa memerlukan klien tersebut berlangganan.
Protokol dan portClient SDK: tcp://endpoint:1883 (teks biasa) atau ssl://endpoint:8883 (SSL/TLS). Cloud SDK: port 5672.

FAQ

Bagaimana cara mengaktifkan koneksi ulang otomatis?

Atur setAutomaticReconnect(true) pada MqttConnectOptions sebelum melakukan koneksi:

MqttConnectOptions options = connectionOptionWrapper.getMqttConnectOptions();
options.setAutomaticReconnect(true);
mqttClient.connect(options);

Bagaimana cara menghubungkan melalui SSL/TLS?

Ganti tcp:// dengan ssl:// dan gunakan port 8883:

final MqttClient mqttClient =
    new MqttClient("ssl://" + endPoint + ":8883", clientId, memoryPersistence);

Dapatkah saya mengirim pesan dari aplikasi backend ke klien MQTT?

Ya. Gunakan MQTTProducerDemo.java dalam proyek demo Cloud SDK untuk arah pengiriman pesan sebaliknya (backend ke klien).

Langkah selanjutnya

  • Jelajahi lebih banyak pola pengiriman pesan di Proyek demo.

  • Periksa wilayah yang mendukung akses Cloud SDK.