全部产品
Search
文档中心

ApsaraMQ for MQTT:Gunakan ApsaraMQ for MQTT SDK for Java untuk mengimplementasikan pesan antara klien ApsaraMQ for MQTT dan aplikasi layanan backend

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan ApsaraMQ for MQTT SDK for Java untuk mengimplementasikan komunikasi pesan antara klien ApsaraMQ for MQTT dan aplikasi layanan backend.

Prasyarat

  • Sumber daya yang diperlukan telah dibuat. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Pasangan AccessKey telah diperoleh. Untuk informasi lebih lanjut, lihat Peroleh Pasangan AccessKey.

  • Lingkungan pengembangan terintegrasi (IDE) telah diinstal. Untuk informasi lebih lanjut, lihat IntelliJ IDEA. Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Dalam contoh ini, IntelliJ IDEA digunakan.

  • Java Development Kit (JDK) telah diinstal. Untuk informasi lebih lanjut, lihat Unduhan Java.

Informasi latar belakang

Gambar berikut menunjukkan proses untuk mengimplementasikan komunikasi pesan antara klien ApsaraMQ for MQTT dan aplikasi layanan backend. Klien ApsaraMQ for MQTT dapat menggunakan SDK klien, sedangkan aplikasi layanan backend dapat menggunakan SDK cloud untuk mengakses ApsaraMQ for MQTT dan mengimplementasikan komunikasi dua arah.Interaksi Terminal dan CloudTopik ini menjelaskan cara menggunakan ApsaraMQ for MQTT SDK for Java untuk mengimplementasikan komunikasi pesan melalui Internet. Untuk kode contoh yang digunakan dalam pengiriman dan penerimaan pesan, lihat Proyek Demo atau Demo.

Titik akhir

Sebelum menggunakan SDK klien dan SDK cloud untuk mengakses ApsaraMQ for MQTT, tentukan titik akhir dari instance ApsaraMQ for MQTT dalam kode SDK. Hal ini memungkinkan klien ApsaraMQ for MQTT dan aplikasi layanan backend untuk menggunakan titik akhir guna mengakses broker ApsaraMQ for MQTT.

  • Format Titik Akhir untuk SDK Klien

    Ketika menggunakan SDK klien untuk menghubungkan klien ke ApsaraMQ for MQTT, tentukan titik akhir dalam salah satu format berikut:

    • Public Endpoint: ID dari instance ApsaraMQ for MQTT.mqtt.aliyuncs.com

    • VPC Endpoint: ID dari instance ApsaraMQ for MQTT-internal-vpc.mqtt.aliyuncs.com

    Anda juga dapat melihat titik akhir untuk SDK klien pada tab Endpoints halaman Instance Details di Konsol ApsaraMQ for MQTT.

  • Format Titik Akhir untuk SDK Cloud

    Ketika menggunakan SDK cloud untuk menghubungkan aplikasi layanan backend ke ApsaraMQ for MQTT, tentukan titik akhir dalam salah satu format berikut:

    Penting

    Untuk informasi tentang wilayah yang mendukung akses menggunakan SDK cloud, lihat Wilayah Akses SDK.

    • Public Endpoint: ID dari instance ApsaraMQ for MQTT-server-internet.mqtt.aliyuncs.com

    • VPC Endpoint: ID dari instance ApsaraMQ for MQTT server-internal.mqtt.aliyuncs.com

Catatan

Anda dapat memperoleh ID dari instance ApsaraMQ for MQTT di bagian Basic Information halaman Instance Details di Konsol ApsaraMQ for MQTT.

Titik akhir untuk SDK klien dan SDK cloud dapat berupa titik akhir publik atau titik akhir VPC. Titik akhir publik adalah alamat IP untuk akses dari Internet dan biasanya digunakan dalam skenario IoT dan mobile Internet. Titik akhir VPC adalah alamat IP untuk akses dari virtual private cloud (VPC) dan biasanya digunakan oleh aplikasi layanan backend untuk mengakses ApsaraMQ for MQTT.

Penting

Ketika menggunakan titik akhir untuk menghubungkan SDK ke ApsaraMQ for MQTT, gunakan nama domain alih-alih alamat IP. Alamat IP dapat berubah secara tidak terduga. Tim teknis ApsaraMQ for MQTT tidak bertanggung jawab atas kesalahan dan kerugian langsung atau tidak langsung dalam skenario berikut:

  • Klien atau aplikasi layanan backend menggunakan alamat IP alih-alih nama domain untuk mengakses ApsaraMQ for MQTT. Alamat IP asli menjadi tidak valid setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain.

  • Kebijakan firewall pada alamat IP disetel di jaringan tempat klien ApsaraMQ for MQTT atau aplikasi layanan backend Anda berjalan. Alamat IP baru diblokir karena kebijakan firewall setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain.

Gunakan SDK klien untuk mengirim pesan

  1. Unduh SDK open source pihak ketiga untuk Java. Tautan unduhan: Eclipse Paho Java Client.

  2. Unduh demo dari SDK klien sebagai referensi untuk pengembangan kode. Tautan unduhan: mqtt-java-demo.

  3. Ekstrak paket proyek demo ke folder tertentu.

  4. Di IntelliJ IDEA, impor file yang diekstrak untuk membuat proyek dan periksa apakah file pom.xml berisi dependensi berikut:

    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. Di kelas MQ4IoTProducerDemo.java, ikuti deskripsi dalam anotasi kode untuk mengonfigurasi parameter. Sebagian besar parameter terkait dengan sumber daya ApsaraMQ for MQTT yang Anda buat. Untuk informasi lebih lanjut, lihat Buat Sumber Daya. Gunakan fungsi main() untuk menjalankan kode contoh guna mengirim pesan.

    Kode contoh:

    Catatan Sebelum menggunakan kode contoh untuk mengirim dan menerima pesan, konfigurasikan variabel lingkungan untuk memperoleh kredensial yang digunakan untuk mengakses ApsaraMQ for MQTT. Untuk informasi tentang cara mengonfigurasi variabel lingkungan, lihat Konfigurasikan Kredensial Akses.

    Nama variabel lingkungan dari ID AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_AK_ENV, dan nama variabel lingkungan dari Rahasia AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_SK_ENV.

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    
    public class MQ4IoTProducerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * ID dari instance ApsaraMQ for MQTT yang Anda buat. 
             */
            String instanceId = "XXXXX";
            /**
             * Titik akhir untuk SDK klien. Anda dapat memperoleh titik akhir pada tab Endpoints halaman Detail Instance di Konsol ApsaraMQ for MQTT. 
             * Gunakan nama domain alih-alih alamat IP sebagai titik akhir. Jika tidak, pengecualian klien mungkin terjadi. 
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * ID AccessKey yang Anda buat di Konsol Resource Access Management (RAM) Alibaba Cloud untuk otentikasi identitas. 
             * Pasangan AccessKey dari akun Alibaba Cloud memiliki izin pada semua operasi API. Untuk menghindari risiko keamanan, kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan O&M rutin. 
             * Kami sangat menyarankan Anda untuk tidak menyimpan pasangan AccessKey dalam kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya yang terkandung dalam akun Anda mungkin terpapar pada risiko keamanan potensial. 
             * Dalam contoh ini, pasangan AccessKey disimpan dalam variabel lingkungan. 
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * Rahasia AccessKey yang Anda buat di Konsol RAM Alibaba Cloud untuk otentikasi identitas. Rahasia AccessKey hanya diperlukan untuk otentikasi tanda tangan. 
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * ID unik global yang sistem tetapkan untuk klien ApsaraMQ for MQTT. ID klien harus bervariasi berdasarkan koneksi TCP. Jika beberapa koneksi TCP menggunakan ID klien yang sama, pengecualian akan terjadi dan koneksi akan ditutup secara tidak terduga. 
             * ID klien terdiri dari ID grup dan ID perangkat dan dalam format GroupID@@@DeviceID. ID grup adalah ID dari grup yang Anda buat di Konsol ApsaraMQ for MQTT. ID perangkat adalah ID kustom yang ditentukan oleh Anda. ID klien tidak boleh melebihi 64 karakter panjangnya. 
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * Topik induk yang Anda buat di Konsol ApsaraMQ for MQTT. 
             * Jika Anda menentukan topik yang tidak ada atau topik yang klien ApsaraMQ for MQTT tidak berwenang untuk mengakses, broker ApsaraMQ for MQTT menutup koneksi. 
             */
            final String parentTopic = "XXXXX";
            /**
             * Subtopik. ApsaraMQ for MQTT memungkinkan Anda menggunakan subtopik untuk memfilter pesan. Anda dapat menentukan string sebagai nama subtopik. 
             * Nilai parameter mq4IotTopic dapat mencapai hingga 128 karakter panjangnya. 
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * Tingkat kualitas layanan (QoS) untuk transmisi pesan. Nilai valid: 0, 1, dan 2. Untuk informasi lebih lanjut, lihat Istilah. 
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
             /**
             * Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT. Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT harus cocok. Jika enkripsi Secure Sockets Layer (SSL) digunakan, protokol dan port harus ditentukan sebagai ssl://endpoint:8883. 
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
             /**
             * Periode timeout selama klien ApsaraMQ for MQTT menunggu respons. Periode timeout mencegah klien ApsaraMQ for MQTT menunggu respons untuk periode waktu yang tidak terbatas. 
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * Topik yang harus dilanggan konsumen secepat mungkin setelah koneksi klien didirikan. 
                     */
                    System.out.println("koneksi berhasil");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                     /**
                     * Callback yang dipanggil untuk mengonsumsi pesan yang diterbitkan. Pastikan callback tidak melempar pengecualian. Jika respons dikembalikan untuk callback, pesan dikonsumsi. 
                     * Pesan harus dikonsumsi dalam periode waktu yang ditentukan. Jika pesan tidak dikonsumsi dalam periode timeout yang ditentukan oleh broker ApsaraMQ for MQTT, broker mungkin mencoba mengirim ulang pesan. Pastikan deduplikasi dilakukan untuk memastikan idempotensi untuk konsumsi pesan. 
                     */
                    System.out.println(
                        "menerima pesan dari topik " + s + " , isi adalah " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("pesan terkirim sukses topik adalah : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                 /**
                 * Topik ke mana pesan dikirim. Jika pesan tersebut adalah pesan normal, topik ini harus sama dengan topik yang dilanggan konsumen atau dapat dicocokkan menggunakan wildcard. 
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * ApsaraMQ for MQTT mendukung pesan point-to-point (P2P). Jika produsen mengetahui bahwa pesan tertentu akan dikirim ke ID klien tertentu, produsen dapat menggunakan pesan P2P. 
                 * Dalam pesan P2P, konsumen tidak perlu melanggan topik ke mana produsen mengirim pesan. Logika disederhanakan di sisi konsumen. Dalam pesan P2P, tentukan topik dalam format {{parentTopic}}/p2p/{{targetClientId}}. 
                 */
                String receiverId = "xxx";
                final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

Gunakan SDK cloud untuk menerima pesan

  1. Unduh SDK cloud yang disediakan oleh ApsaraMQ for MQTT. Untuk informasi lebih lanjut tentang URL unduhan, lihat Catatan Rilis.

  2. Unduh demo dari SDK cloud sebagai referensi untuk pengembangan kode. Tautan unduhan: mqtt-server-sdk-demo.

  3. Ekstrak paket proyek demo ke folder tertentu.

  4. Di IntelliJ IDEA, impor file yang diekstrak untuk membuat proyek dan periksa apakah file pom.xml berisi dependensi berikut:

    <dependencies>
            <dependency>
                <groupId>com.alibaba.mqtt</groupId>
                <artifactId>server-sdk</artifactId>
                <version>1.0.0.Final</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
    </dependencies>
  5. Di kelas MQTTConsumerDemo.java, ikuti deskripsi dalam anotasi kode untuk mengonfigurasi parameter. Sebagian besar parameter terkait dengan sumber daya ApsaraMQ for MQTT yang Anda buat. Untuk informasi lebih lanjut, lihat Buat Sumber Daya. Gunakan fungsi main() untuk menjalankan kode contoh guna menerima pesan.

    Kode contoh:

    Catatan Sebelum menggunakan kode contoh untuk mengirim dan menerima pesan, konfigurasikan variabel lingkungan untuk memperoleh kredensial yang digunakan untuk mengakses ApsaraMQ for MQTT. Untuk informasi tentang cara mengonfigurasi variabel lingkungan, lihat Konfigurasikan Kredensial Akses.

    Nama variabel lingkungan dari ID AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_AK_ENV, dan nama variabel lingkungan dari Rahasia AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_SK_ENV.

    package com.aliyun.openservices.lmq.example;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.mqtt.server.ServerConsumer;
    import com.alibaba.mqtt.server.callback.MessageListener;
    import com.alibaba.mqtt.server.config.ChannelConfig;
    import com.alibaba.mqtt.server.config.ConsumerConfig;
    import com.alibaba.mqtt.server.model.MessageProperties;
    
    public class MQTTConsumerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * Titik akhir untuk SDK cloud. Untuk informasi tentang format titik akhir, lihat bagian "Endpoints" dari topik ini. 
             * Gunakan nama domain alih-alih alamat IP sebagai titik akhir. Jika tidak, pengecualian broker mungkin terjadi. 
             */
            String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";
    
            /**
             * Port yang digunakan oleh SDK cloud. Protokol dan port yang digunakan oleh SDK cloud harus cocok. Setel nilai ke 5672. 
             */
            int port = "5672";
    
            /**
             * ID dari instance ApsaraMQ for MQTT yang Anda buat. 
             */
            String instanceId = "post-cn-jaj3h8i****";
    
            /**
             * ID AccessKey yang Anda buat di Konsol RAM Alibaba Cloud untuk otentikasi identitas. 
             * Pasangan AccessKey dari akun Alibaba Cloud memiliki izin pada semua operasi API. Untuk menghindari risiko keamanan, kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan O&M rutin. 
             * Kami sangat menyarankan Anda untuk tidak menyimpan pasangan AccessKey dalam kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya yang terkandung dalam akun Anda mungkin terpapar pada risiko keamanan potensial. 
             * Dalam contoh ini, pasangan AccessKey disimpan dalam variabel lingkungan. 
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * Rahasia AccessKey yang Anda buat di Konsol RAM Alibaba Cloud untuk otentikasi identitas. Rahasia AccessKey hanya diperlukan untuk otentikasi tanda tangan. 
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
    
            /**
             * Topik induk yang Anda buat di Konsol ApsaraMQ for MQTT. 
             * Dalam banyak kasus, Anda menggunakan SDK cloud untuk melanggan pesan dalam skenario di mana aplikasi cloud mengumpulkan dan menganalisis pesan. Akibatnya, Anda tidak dapat menentukan subtopik saat menggunakan SDK cloud untuk melanggan pesan. 
             * Jika Anda menentukan topik yang tidak ada atau topik yang klien ApsaraMQ for MQTT tidak berwenang untuk mengakses, broker ApsaraMQ for MQTT menutup koneksi. 
             */
            String firstTopic = "firstTopic";
    
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.setDomain(domain);
            channelConfig.setPort(port);
            channelConfig.setInstanceId(instanceId);
            channelConfig.setAccessKey(accessKey);
            channelConfig.setSecretKey(secretKey);
    
            ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
            serverConsumer.start();
            serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
                @Override
                public void process(String msgId, MessageProperties messageProperties, byte[] payload) {
                    System.out.println("Menerima:" + msgId + "," + JSONObject.toJSONString(messageProperties) + "," + new String(payload));
                }
            });
        }
    
    }
    Catatan

    Untuk informasi tentang kode contoh yang digunakan untuk mengirim pesan menggunakan SDK cloud, lihat MQTTProducerDemo.java.