全部产品
Search
文档中心

ApsaraMQ for MQTT:Menggunakan ApsaraMQ for MQTT SDK untuk Java untuk mengimplementasikan pesan antar klien ApsaraMQ for MQTT

更新时间:Jul 02, 2025

Topik ini menjelaskan cara mengimplementasikan pesan antar klien ApsaraMQ for MQTT menggunakan ApsaraMQ for MQTT SDK untuk Java.

Prasyarat

  • Membuat Sumber Daya
  • Pasangan AccessKey telah diperoleh.
  • Lingkungan Pengembangan Terintegrasi (IDE) telah diinstal. Untuk informasi lebih lanjut, lihat IDE. Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Dalam contoh ini, IntelliJ IDEA digunakan.

  • Java Development Kit (JDK) telah diinstal. Untuk informasi lebih lanjut, lihat JDK.

Informasi latar belakang

ApsaraMQ for MQTT berlaku untuk skenario di mana klien ApsaraMQ for MQTT saling berinteraksi. Dalam skenario ini, baik produsen maupun konsumen adalah klien ApsaraMQ for MQTT. Setiap klien ApsaraMQ for MQTT menggunakan SDK klien ApsaraMQ for MQTT untuk terhubung ke broker ApsaraMQ for MQTT guna bertukar pesan.

Messaging between clients

Topik ini menjelaskan cara menggunakan ApsaraMQ for MQTT SDK untuk Java untuk mengimplementasikan pesan antar klien ApsaraMQ for MQTT melalui Internet.

Titik akhir

Sebelum menggunakan SDK klien dan SDK cloud untuk mengakses ApsaraMQ for MQTT, Anda harus menentukan titik akhir dari instance ApsaraMQ for MQTT dalam kode SDK. Dengan cara ini, klien ApsaraMQ for MQTT dan aplikasi layanan backend dapat menggunakan titik akhir untuk mengakses broker ApsaraMQ for MQTT.

  • Format Titik Akhir untuk SDK Klien

    Ketika menggunakan SDK klien untuk menghubungkan klien ke ApsaraMQ for MQTT, tentukan titik akhir dalam salah satu format berikut:

    • Public Endpoint: ID dari instance ApsaraMQ for MQTT.mqtt.aliyuncs.com

    • VPC Endpoint: ID dari instance ApsaraMQ for MQTT-internal-vpc.mqtt.aliyuncs.com

    Anda juga dapat melihat titik akhir untuk SDK klien pada tab Endpoints halaman Instance Details di Konsol ApsaraMQ for MQTT.

  • Format Titik Akhir untuk SDK Cloud

    Ketika menggunakan SDK cloud untuk menghubungkan aplikasi layanan backend ke ApsaraMQ for MQTT, tentukan titik akhir dalam salah satu format berikut:

    Penting

    Untuk informasi tentang wilayah yang mendukung akses menggunakan SDK cloud, lihat Wilayah Akses SDK.

    • Public Endpoint: ID dari instance ApsaraMQ for MQTT-server-internet.mqtt.aliyuncs.com

    • VPC Endpoint: ID dari instance ApsaraMQ for MQTT-server-internal.mqtt.aliyuncs.com

Catatan

Anda dapat memperoleh ID dari instance ApsaraMQ for MQTT di bagian Basic Information halaman Instance Details di Konsol ApsaraMQ for MQTT.

Titik akhir untuk SDK klien dan SDK cloud dapat berupa titik akhir publik atau titik akhir VPC. Titik akhir publik adalah alamat IP untuk akses dari Internet dan biasanya digunakan dalam skenario IoT dan Internet seluler. Titik akhir pribadi adalah alamat IP untuk akses dari virtual private cloud (VPC) dan biasanya digunakan oleh aplikasi layanan backend untuk mengakses ApsaraMQ for MQTT.

Penting

Ketika menggunakan titik akhir untuk menghubungkan SDK ke ApsaraMQ for MQTT, gunakan nama domain alih-alih alamat IP. Alamat IP dapat berubah secara tak terduga. Tim teknis ApsaraMQ for MQTT tidak bertanggung jawab atas kesalahan dan kerugian langsung atau tidak langsung dalam skenario berikut:

  • Klien atau aplikasi layanan backend menggunakan alamat IP alih-alih nama domain untuk mengakses ApsaraMQ for MQTT. Alamat IP asli menjadi tidak valid setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain.

  • Kebijakan firewall pada alamat IP disetel di jaringan tempat klien ApsaraMQ for MQTT atau aplikasi layanan backend Anda berjalan. Alamat IP baru diblokir karena kebijakan firewall setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain.

Panggil SDK untuk Java untuk mengirim dan berlangganan pesan

  1. Unduh SDK sumber terbuka pihak ketiga untuk Java. Tautan unduhan: Eclipse Paho Java Client.

  2. Unduh demo dari ApsaraMQ for MQTT SDK untuk Java sebagai referensi selama pengembangan kode. Tautan unduhan: mqtt-java-demo.

  3. Ekstrak paket proyek demo ke folder tertentu.

  4. Di IntelliJ IDEA, impor file yang diekstrak untuk membuat proyek dan periksa apakah file pom.xml berisi dependensi berikut:
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. Di kelas MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java, atur parameter berdasarkan komentar kode. Sebagian besar parameter ini dapat diatur berdasarkan sumber daya ApsaraMQ for MQTT yang Anda buat. Untuk informasi lebih lanjut, lihat Buat Sumber Daya. Gunakan fungsi main() untuk menjalankan kode sampel guna mengirim dan menerima pesan.
    Kode sampel:
    Catatan Sebelum menggunakan kode sampel untuk mengirim dan menerima pesan, Anda harus mengonfigurasi variabel lingkungan untuk mendapatkan kredensial yang digunakan untuk mengakses ApsaraMQ for MQTT. Untuk informasi tentang cara mengonfigurasi variabel lingkungan, lihat Konfigurasikan Kredensial Akses.

    Nama variabel lingkungan untuk ID AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_AK_ENV, dan nama variabel lingkungan untuk Rahasia AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_SK_ENV.

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
        public static void main(String[] args) throws Exception {
            /**
             * ID dari instance ApsaraMQ for MQTT yang Anda buat di konsol ApsaraMQ for MQTT. 
             */
            String instanceId = "XXXXX";
            /**
             * Titik akhir dari instance ApsaraMQ for MQTT. Anda dapat memperoleh titik akhir di halaman Detail Instance di konsol ApsaraMQ for MQTT. 
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * ID AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. 
             * Pasangan AccessKey dari akun Alibaba Cloud dapat digunakan untuk mengakses semua operasi API. Untuk mencegah masalah keamanan, kami sarankan Anda menggunakan pengguna RAM untuk mengakses operasi API dan melakukan pemeliharaan rutin. 
             * Kami sangat menyarankan agar Anda tidak menyimpan pasangan AccessKey di kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya yang terkandung dalam akun Anda mungkin terpapar risiko keamanan potensial. 
             * Dalam contoh ini, pasangan AccessKey disimpan dalam variabel lingkungan. 
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * Rahasia AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. Rahasia AccessKey diperlukan hanya saat mode otentikasi tanda tangan digunakan. 
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * ID unik global yang sistem tetapkan ke klien ApsaraMQ for MQTT. ID klien harus bervariasi berdasarkan koneksi TCP. Jika beberapa koneksi TCP menggunakan ID klien yang sama, pengecualian terjadi dan koneksi ditutup secara tak terduga. 
             * Nilai parameter clientId berada dalam format GroupID@@@DeviceID. GroupID menunjukkan ID grup yang Anda buat di konsol ApsaraMQ for MQTT dan DeviceID menunjukkan ID kustom perangkat. Nilai parameter clientId dapat mencapai hingga 64 karakter panjangnya. 
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * Topik induk yang Anda buat di konsol ApsaraMQ for MQTT. 
             * Jika Anda menentukan topik yang tidak ada atau topik yang klien ApsaraMQ for MQTT tidak berwenang untuk mengakses, broker ApsaraMQ for MQTT menutup koneksi. 
             */
            final String parentTopic = "XXXXX";
            /**
             * ApsaraMQ for MQTT memungkinkan Anda menggunakan subtopik untuk memfilter pesan. Anda dapat menentukan string sebagai nilainya. Kode berikut menunjukkan contohnya. 
             * Nilai parameter mq4IotTopic dapat mencapai hingga 128 karakter panjangnya. 
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * Tingkat kualitas layanan (QoS) dalam transmisi pesan. Nilai valid: 0, 1, dan 2. Untuk informasi lebih lanjut, lihat Istilah. 
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            /**
             * Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT. Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT harus cocok. Jika enkripsi SSL digunakan, protokol dan port harus diatur ke ssl://endpoint:8883. 
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * Periode timeout selama klien ApsaraMQ for MQTT menunggu respons. Periode timeout mencegah klien ApsaraMQ for MQTT menunggu respons untuk periode waktu yang tidak terbatas. 
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * Topik yang harus Anda langgani secepat mungkin setelah koneksi klien didirikan. 
                     */
                    System.out.println("koneksi berhasil");
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                final String topicFilter[] = {mq4IotTopic};
                                final int[] qos = {qosLevel};
                                mqttClient.subscribe(topicFilter, qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    /**
                     * Panggil balik yang dipanggil untuk mengonsumsi pesan. Pastikan panggil balik tidak melempar pengecualian. Jika respons dikembalikan untuk panggil balik, pesan dikonsumsi. 
                     * Pesan harus dikonsumsi dalam periode yang ditentukan. Jika pesan tidak dikonsumsi dalam periode timeout yang ditentukan oleh broker ApsaraMQ for MQTT, broker mungkin mencoba mengirim ulang pesan. Pastikan deduplikasi dilakukan untuk memastikan idempotensi untuk konsumsi pesan. 
                     */
                    System.out.println(
                        "menerima pesan dari topik " + s + " , isi adalah " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("pesan terkirim sukses topik adalah : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 * Topik ke mana pesan normal dikirim. Topik ini harus sama dengan topik ke mana konsumen berlangganan atau dapat dicocokkan menggunakan wildcard. 
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * ApsaraMQ for MQTT mendukung pesan point-to-point (P2P). Jika produsen mengonfirmasi bahwa hanya konsumen dengan ID klien tertentu yang memerlukan pesan, produsen dapat mengirim pesan P2P ke konsumen. 
                 * Dalam pesan P2P, konsumen tidak perlu berlangganan topik ke mana produsen mengirim pesan. Dengan cara ini, logika konsumsi disederhanakan. Dalam pesan P2P, tentukan topik dalam format {{parentTopic}}/p2p/{{targetClientId}}. 
                 */
                final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

Verifikasi hasil

Setelah produsen mengirim pesan dan konsumen berlangganan pesan, Anda dapat memeriksa jejak pesan di konsol ApsaraMQ for MQTT untuk memverifikasi apakah pesan dikirim dan diterima. Untuk informasi lebih lanjut, lihat Kueri Jejak Pesan.

Referensi

Gunakan ApsaraMQ for MQTT SDK untuk Java untuk Mengimplementasikan Pesan Antar Klien ApsaraMQ for MQTT dan Aplikasi Layanan Backend