Topik ini menjelaskan cara mengimplementasikan pesan antar klien ApsaraMQ for MQTT menggunakan ApsaraMQ for MQTT SDK untuk Java.
Prasyarat
- Membuat Sumber Daya
- Pasangan AccessKey telah diperoleh.
Lingkungan Pengembangan Terintegrasi (IDE) telah diinstal. Untuk informasi lebih lanjut, lihat IDE. Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Dalam contoh ini, IntelliJ IDEA digunakan.
Java Development Kit (JDK) telah diinstal. Untuk informasi lebih lanjut, lihat JDK.
Informasi latar belakang
ApsaraMQ for MQTT berlaku untuk skenario di mana klien ApsaraMQ for MQTT saling berinteraksi. Dalam skenario ini, baik produsen maupun konsumen adalah klien ApsaraMQ for MQTT. Setiap klien ApsaraMQ for MQTT menggunakan SDK klien ApsaraMQ for MQTT untuk terhubung ke broker ApsaraMQ for MQTT guna bertukar pesan.

Topik ini menjelaskan cara menggunakan ApsaraMQ for MQTT SDK untuk Java untuk mengimplementasikan pesan antar klien ApsaraMQ for MQTT melalui Internet.
Titik akhir
Sebelum menggunakan SDK klien dan SDK cloud untuk mengakses ApsaraMQ for MQTT, Anda harus menentukan titik akhir dari instance ApsaraMQ for MQTT dalam kode SDK. Dengan cara ini, klien ApsaraMQ for MQTT dan aplikasi layanan backend dapat menggunakan titik akhir untuk mengakses broker ApsaraMQ for MQTT.
Format Titik Akhir untuk SDK Klien
Ketika menggunakan SDK klien untuk menghubungkan klien ke ApsaraMQ for MQTT, tentukan titik akhir dalam salah satu format berikut:
Public Endpoint:
ID dari instance ApsaraMQ for MQTT.mqtt.aliyuncs.comVPC Endpoint:
ID dari instance ApsaraMQ for MQTT-internal-vpc.mqtt.aliyuncs.com
Anda juga dapat melihat titik akhir untuk SDK klien pada tab Endpoints halaman Instance Details di Konsol ApsaraMQ for MQTT.
Format Titik Akhir untuk SDK Cloud
Ketika menggunakan SDK cloud untuk menghubungkan aplikasi layanan backend ke ApsaraMQ for MQTT, tentukan titik akhir dalam salah satu format berikut:
PentingUntuk informasi tentang wilayah yang mendukung akses menggunakan SDK cloud, lihat Wilayah Akses SDK.
Public Endpoint:
ID dari instance ApsaraMQ for MQTT-server-internet.mqtt.aliyuncs.comVPC Endpoint:
ID dari instance ApsaraMQ for MQTT-server-internal.mqtt.aliyuncs.com
Anda dapat memperoleh ID dari instance ApsaraMQ for MQTT di bagian Basic Information halaman Instance Details di Konsol ApsaraMQ for MQTT.
Titik akhir untuk SDK klien dan SDK cloud dapat berupa titik akhir publik atau titik akhir VPC. Titik akhir publik adalah alamat IP untuk akses dari Internet dan biasanya digunakan dalam skenario IoT dan Internet seluler. Titik akhir pribadi adalah alamat IP untuk akses dari virtual private cloud (VPC) dan biasanya digunakan oleh aplikasi layanan backend untuk mengakses ApsaraMQ for MQTT.
Ketika menggunakan titik akhir untuk menghubungkan SDK ke ApsaraMQ for MQTT, gunakan nama domain alih-alih alamat IP. Alamat IP dapat berubah secara tak terduga. Tim teknis ApsaraMQ for MQTT tidak bertanggung jawab atas kesalahan dan kerugian langsung atau tidak langsung dalam skenario berikut:
Klien atau aplikasi layanan backend menggunakan alamat IP alih-alih nama domain untuk mengakses ApsaraMQ for MQTT. Alamat IP asli menjadi tidak valid setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain.
Kebijakan firewall pada alamat IP disetel di jaringan tempat klien ApsaraMQ for MQTT atau aplikasi layanan backend Anda berjalan. Alamat IP baru diblokir karena kebijakan firewall setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain.
Panggil SDK untuk Java untuk mengirim dan berlangganan pesan
Unduh SDK sumber terbuka pihak ketiga untuk Java. Tautan unduhan: Eclipse Paho Java Client.
Unduh demo dari ApsaraMQ for MQTT SDK untuk Java sebagai referensi selama pengembangan kode. Tautan unduhan: mqtt-java-demo.
Ekstrak paket proyek demo ke folder tertentu.
- Di IntelliJ IDEA, impor file yang diekstrak untuk membuat proyek dan periksa apakah file pom.xml berisi dependensi berikut:
<dependencies> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies> - Di kelas MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java, atur parameter berdasarkan komentar kode. Sebagian besar parameter ini dapat diatur berdasarkan sumber daya ApsaraMQ for MQTT yang Anda buat. Untuk informasi lebih lanjut, lihat Buat Sumber Daya. Gunakan fungsi main() untuk menjalankan kode sampel guna mengirim dan menerima pesan.Kode sampel:Catatan Sebelum menggunakan kode sampel untuk mengirim dan menerima pesan, Anda harus mengonfigurasi variabel lingkungan untuk mendapatkan kredensial yang digunakan untuk mengakses ApsaraMQ for MQTT. Untuk informasi tentang cara mengonfigurasi variabel lingkungan, lihat Konfigurasikan Kredensial Akses.
Nama variabel lingkungan untuk ID AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_AK_ENV, dan nama variabel lingkungan untuk Rahasia AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_SK_ENV.
package com.aliyun.openservices.lmq.example.demo; import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode { public static void main(String[] args) throws Exception { /** * ID dari instance ApsaraMQ for MQTT yang Anda buat di konsol ApsaraMQ for MQTT. */ String instanceId = "XXXXX"; /** * Titik akhir dari instance ApsaraMQ for MQTT. Anda dapat memperoleh titik akhir di halaman Detail Instance di konsol ApsaraMQ for MQTT. */ String endPoint = "XXXXX.mqtt.aliyuncs.com"; /** * ID AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. * Pasangan AccessKey dari akun Alibaba Cloud dapat digunakan untuk mengakses semua operasi API. Untuk mencegah masalah keamanan, kami sarankan Anda menggunakan pengguna RAM untuk mengakses operasi API dan melakukan pemeliharaan rutin. * Kami sangat menyarankan agar Anda tidak menyimpan pasangan AccessKey di kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya yang terkandung dalam akun Anda mungkin terpapar risiko keamanan potensial. * Dalam contoh ini, pasangan AccessKey disimpan dalam variabel lingkungan. */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * Rahasia AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. Rahasia AccessKey diperlukan hanya saat mode otentikasi tanda tangan digunakan. */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * ID unik global yang sistem tetapkan ke klien ApsaraMQ for MQTT. ID klien harus bervariasi berdasarkan koneksi TCP. Jika beberapa koneksi TCP menggunakan ID klien yang sama, pengecualian terjadi dan koneksi ditutup secara tak terduga. * Nilai parameter clientId berada dalam format GroupID@@@DeviceID. GroupID menunjukkan ID grup yang Anda buat di konsol ApsaraMQ for MQTT dan DeviceID menunjukkan ID kustom perangkat. Nilai parameter clientId dapat mencapai hingga 64 karakter panjangnya. */ String clientId = "GID_XXXXX@@@XXXXX"; /** * Topik induk yang Anda buat di konsol ApsaraMQ for MQTT. * Jika Anda menentukan topik yang tidak ada atau topik yang klien ApsaraMQ for MQTT tidak berwenang untuk mengakses, broker ApsaraMQ for MQTT menutup koneksi. */ final String parentTopic = "XXXXX"; /** * ApsaraMQ for MQTT memungkinkan Anda menggunakan subtopik untuk memfilter pesan. Anda dapat menentukan string sebagai nilainya. Kode berikut menunjukkan contohnya. * Nilai parameter mq4IotTopic dapat mencapai hingga 128 karakter panjangnya. */ final String mq4IotTopic = parentTopic + "/" + "testMq4Iot"; /** * Tingkat kualitas layanan (QoS) dalam transmisi pesan. Nilai valid: 0, 1, dan 2. Untuk informasi lebih lanjut, lihat Istilah. */ final int qosLevel = 0; ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId); final MemoryPersistence memoryPersistence = new MemoryPersistence(); /** * Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT. Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT harus cocok. Jika enkripsi SSL digunakan, protokol dan port harus diatur ke ssl://endpoint:8883. */ final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence); /** * Periode timeout selama klien ApsaraMQ for MQTT menunggu respons. Periode timeout mencegah klien ApsaraMQ for MQTT menunggu respons untuk periode waktu yang tidak terbatas. */ mqttClient.setTimeToWait(5000); final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { /** * Topik yang harus Anda langgani secepat mungkin setelah koneksi klien didirikan. */ System.out.println("koneksi berhasil"); executorService.submit(new Runnable() { @Override public void run() { try { final String topicFilter[] = {mq4IotTopic}; final int[] qos = {qosLevel}; mqttClient.subscribe(topicFilter, qos); } catch (MqttException e) { e.printStackTrace(); } } }); } @Override public void connectionLost(Throwable throwable) { throwable.printStackTrace(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { /** * Panggil balik yang dipanggil untuk mengonsumsi pesan. Pastikan panggil balik tidak melempar pengecualian. Jika respons dikembalikan untuk panggil balik, pesan dikonsumsi. * Pesan harus dikonsumsi dalam periode yang ditentukan. Jika pesan tidak dikonsumsi dalam periode timeout yang ditentukan oleh broker ApsaraMQ for MQTT, broker mungkin mencoba mengirim ulang pesan. Pastikan deduplikasi dilakukan untuk memastikan idempotensi untuk konsumsi pesan. */ System.out.println( "menerima pesan dari topik " + s + " , isi adalah " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("pesan terkirim sukses topik adalah : " + iMqttDeliveryToken.getTopics()[0]); } }); mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions()); for (int i = 0; i < 10; i++) { MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes()); message.setQos(qosLevel); /** * Topik ke mana pesan normal dikirim. Topik ini harus sama dengan topik ke mana konsumen berlangganan atau dapat dicocokkan menggunakan wildcard. */ mqttClient.publish(mq4IotTopic, message); /** * ApsaraMQ for MQTT mendukung pesan point-to-point (P2P). Jika produsen mengonfirmasi bahwa hanya konsumen dengan ID klien tertentu yang memerlukan pesan, produsen dapat mengirim pesan P2P ke konsumen. * Dalam pesan P2P, konsumen tidak perlu berlangganan topik ke mana produsen mengirim pesan. Dengan cara ini, logika konsumsi disederhanakan. Dalam pesan P2P, tentukan topik dalam format {{parentTopic}}/p2p/{{targetClientId}}. */ final String p2pSendTopic = parentTopic + "/p2p/" + clientId; message = new MqttMessage("hello mq4Iot p2p msg".getBytes()); message.setQos(qosLevel); mqttClient.publish(p2pSendTopic, message); } Thread.sleep(Long.MAX_VALUE); } }
Verifikasi hasil
Setelah produsen mengirim pesan dan konsumen berlangganan pesan, Anda dapat memeriksa jejak pesan di konsol ApsaraMQ for MQTT untuk memverifikasi apakah pesan dikirim dan diterima. Untuk informasi lebih lanjut, lihat Kueri Jejak Pesan.