全部产品
Search
文档中心

ApsaraMQ for MQTT:Impor data dari ApsaraMQ for RocketMQ ke ApsaraMQ for MQTT

更新时间:Jul 02, 2025

Jika Anda ingin menggunakan fitur tertentu dari ApsaraMQ for RocketMQ, seperti pesan terurut dan pesan transaksional, dalam aplikasi cloud, Anda dapat menggunakan aturan arah masuk atau arah keluar untuk bertukar data antara ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ. Topik ini menjelaskan cara mengimpor data dari ApsaraMQ for RocketMQ ke ApsaraMQ for MQTT.

Informasi latar belakang

ApsaraMQ for MQTT mendukung SDK cloud. Anda dapat menghubungkan aplikasi cloud ke broker ApsaraMQ for MQTT untuk mengirim dan menerima pesan menggunakan SDK cloud. Untuk informasi lebih lanjut tentang penggunaan SDK cloud, lihat Ikhtisar.

ApsaraMQ for MQTT juga mendukung pertukaran data antara ApsaraMQ for MQTT dan layanan Alibaba Cloud lainnya. Saat ini, Anda hanya dapat bertukar data antara ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ.

Topik ini menjelaskan cara mengimpor data dari ApsaraMQ for RocketMQ ke ApsaraMQ for MQTT melalui Internet menggunakan SDK for Java.

quick_start_data_inflow

Akses jaringan

ApsaraMQ for MQTT menyediakan Public Endpoint dan VPC Endpoint.

  • Public Endpoint adalah alamat IP yang digunakan untuk mengakses ApsaraMQ for MQTT melalui Internet. Dalam banyak kasus, endpoint publik digunakan dalam skenario IoT dan mobile Internet.

  • VPC Endpoint adalah alamat IP yang digunakan untuk mengakses ApsaraMQ for MQTT di private virtual cloud (VPC). Dalam banyak kasus, endpoint VPC digunakan oleh aplikasi cloud untuk terhubung ke ApsaraMQ for MQTT.

Penting

Jika Anda ingin menggunakan endpoint untuk menghubungkan klien ke ApsaraMQ for MQTT, gunakan nama domain alih-alih alamat IP karena alamat IP berubah secara dinamis. Tim teknis ApsaraMQ for MQTT tidak bertanggung jawab atas titik kegagalan dan kerugian langsung atau tidak langsung dalam skenario berikut:

  • Anda menggunakan alamat IP untuk mengakses klien ke ApsaraMQ for MQTT. Setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain, alamat IP asli menjadi tidak valid.

  • Kebijakan firewall pada alamat IP ditetapkan di jaringan tempat klien Anda berjalan. Setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain, alamat IP baru diblokir karena kebijakan firewall.

Prasyarat

  • Lingkungan pengembangan terintegrasi (IDE) telah diinstal. Untuk informasi lebih lanjut, lihat IDE. Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Dalam contoh ini, IntelliJ IDEA digunakan.

  • Java 8 atau 11 telah diinstal. Untuk informasi lebih lanjut, lihat Unduhan Java.

  • Instansi ApsaraMQ for MQTT telah dibuat, dan topik serta grup telah dibuat di instansinya. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Instansi ApsaraMQ for RocketMQ telah dibuat, dan topik serta grup telah dibuat di instansinya. Untuk informasi lebih lanjut, lihat Langkah 2: Buat Sumber Daya.

Penting
  • Anda dapat menggunakan aturan arah masuk data dari ApsaraMQ for MQTT untuk mengimpor data hanya dari instansi ApsaraMQ for RocketMQ versi 4.x.

  • Anda tidak dapat menggunakan aturan arah masuk data dari ApsaraMQ for MQTT lintas wilayah. Saat membuat aturan arah masuk data, pastikan bahwa sumber daya ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ berada di wilayah yang sama.

1. Buat aturan arah masuk data

  1. Masuk ke Konsol ApsaraMQ for MQTT. Di panel navigasi sebelah kiri, klik Instances.

  2. Di bilah navigasi atas, pilih wilayah tempat instansi yang ingin Anda kelola berada. Di halaman Instansi, klik nama instansi untuk pergi ke halaman Instance Details.

  3. Di panel navigasi sebelah kiri, klik Rules. Di sudut kiri atas halaman Aturan, klik Create Rule.

  4. Di panduan Create Rule, lakukan langkah-langkah berikut:

    1. Di langkah Konfigurasikan Informasi Dasar, tentukan ID aturan dan pilih Data Masuk untuk parameter Tipe Aturan.

      image

    2. Di langkah Konfigurasikan Sumber Aturan, pilih instansi ApsaraMQ for RocketMQ yang telah dibuat dan topik yang telah dibuat di instansi tersebut.

      image

    3. Di langkah Konfigurasikan Tujuan Aturan, pilih topik yang telah dibuat di instansi ApsaraMQ for MQTT.

      image

2. Siapkan kode uji

2.1 Unduh kode sampel

  1. Unduh proyek demo mqtt-java-demo dan ekstrak paket proyek demo ke folder di mesin lokal Anda.

  2. Di proyek demo yang diekstrak, temukan folder lmq-java-demo, impor folder ke IntelliJ IDEA, lalu konfirmasi apakah dependensi berikut termasuk dalam file pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. Konfigurasikan kredensial akses.

    • Peroleh pasangan AccessKey. Untuk informasi tentang cara memperoleh pasangan AccessKey, lihat Buat Pasangan AccessKey.

    • Konfigurasikan variabel lingkungan. Nama variabel lingkungan untuk ID AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_AK_ENV, dan nama variabel lingkungan untuk Rahasia AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_SK_ENV. Untuk informasi tentang cara mengonfigurasi variabel lingkungan, lihat Konfigurasikan Kredensial Akses.

2.2 Konfigurasikan kode untuk perpesanan

Kelas RocketMQSendMessageToMQ4IoT.java berisi kode untuk mengirim pesan menggunakan ApsaraMQ for RocketMQ dan menerima pesan menggunakan ApsaraMQ for MQTT. Anda harus menentukan parameter untuk sumber daya ApsaraMQ for RocketMQ dan ApsaraMQ for MQTT berdasarkan komentar dalam kode.

Saat menguji perpesanan, Anda dapat mengomentari kode terkait pengiriman pesan point-to-point (P2P). Contoh kode:

Contoh Kode untuk Perpesanan

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class RocketMQSendMessageToMQ4IoT {
    public static void main(String[] args) throws Exception {
        /**
         * Inisialisasi klien ApsaraMQ for RocketMQ sebagai pengirim. Dalam sebagian besar skenario bisnis, pengirim ditempatkan di aplikasi backend. 
         */
        Properties properties = new Properties();
        /**
         * ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        /**
         * ID AccessKey yang Anda buat di konsol Resource Access Management (RAM) Alibaba Cloud untuk otentikasi identitas. 
         * Pasangan AccessKey akun Alibaba Cloud memiliki izin untuk semua operasi API. Untuk mencegah risiko keamanan, kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin. 
         * Kami sangat menyarankan Anda untuk tidak menyimpan pasangan AccessKey di kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya di akun Anda mungkin terpapar pada risiko keamanan potensial. 
         * Dalam contoh ini, ID AccessKey dan Rahasia AccessKey disimpan di variabel lingkungan. 
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * Rahasia AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. Rahasia AccessKey diperlukan hanya jika Anda menggunakan mode otentikasi tanda tangan. 
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * Endpoint TCP yang digunakan untuk mengakses instansi ApsaraMQ for RocketMQ. Anda dapat memperoleh endpoint TCP di halaman Detail Instansi di konsol ApsaraMQ for RocketMQ. 
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * Topik ApsaraMQ for RocketMQ. Anda dapat membuat topik di konsol ApsaraMQ for RocketMQ. 
         * Saat bertukar data antara ApsaraMQ for RocketMQ dan ApsaraMQ for MQTT, hanya topik induk yang dapat digunakan oleh klien ApsaraMQ for RocketMQ. 
         */
        final String parentTopic = "XXXXX";
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * Inisialisasi klien ApsaraMQ for MQTT sebagai penerima. Dalam sebagian besar skenario bisnis, penerima ditempatkan di terminal seluler. 
         */

        /**
         * ID instansi ApsaraMQ for MQTT yang Anda buat di konsol ApsaraMQ for MQTT. 
         */
        String instanceId = "XXXXX";
        /**
         * Endpoint yang ingin Anda gunakan untuk mengakses instansi ApsaraMQ for MQTT. Anda dapat memperoleh endpoint di halaman Detail Instansi di konsol ApsaraMQ for MQTT. 
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * ID AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. 
         * Pasangan AccessKey akun Alibaba Cloud memiliki izin untuk semua operasi API. Untuk mencegah risiko keamanan, kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin. 
         * Kami sangat menyarankan Anda untuk tidak menyimpan pasangan AccessKey di kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya di akun Anda mungkin terpapar pada risiko keamanan potensial. 
         * Dalam contoh ini, ID AccessKey dan Rahasia AccessKey disimpan di variabel lingkungan. 
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * Rahasia AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. Parameter ini diperlukan hanya jika Anda menggunakan mode otentikasi tanda tangan. 
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * ID unik global yang sistem tetapkan untuk klien ApsaraMQ for MQTT. ID klien harus bervariasi berdasarkan koneksi TCP. Jika beberapa koneksi TCP menggunakan ID klien yang sama, pengecualian akan terjadi dan koneksi akan ditutup secara tak terduga. 
         * ID klien terdiri dari ID grup dan ID perangkat dan dalam format GroupID@@@DeviceID. ID grup adalah ID grup yang Anda buat di konsol ApsaraMQ for MQTT. ID perangkat adalah ID kustom yang Anda tentukan. ID klien tidak boleh melebihi 64 karakter panjangnya. 
         */
        String clientId = "GID_XXXX@@@XXXXX";
        /**
         * ApsaraMQ for MQTT memungkinkan Anda menggunakan subtopik untuk memfilter pesan. Anda dapat menentukan string sebagai nama subtopik. 
         * Nilai parameter mq4IotTopic dapat mencapai hingga 128 karakter panjangnya. 
         */
        final String subTopic = "/testMq4Iot";
        final String mq4IotTopic = parentTopic + subTopic;
        /**
         * Tingkat kualitas layanan (QoS) untuk transmisi pesan. Nilai valid: 0, 1, dan 2. Untuk informasi lebih lanjut, lihat Istilah. 
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT. Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT harus cocok. Jika enkripsi Secure Sockets Layer (SSL) digunakan, tentukan ssl://endpoint:8883 sebagai protokol dan port. 
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * Periode timeout saat klien ApsaraMQ for MQTT menunggu respons. Periode timeout mencegah klien ApsaraMQ for MQTT menunggu respons untuk periode waktu yang tidak terbatas. 
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * Topik yang harus dilanggan konsumen sesegera mungkin setelah koneksi klien didirikan. 
                 */
                System.out.println("koneksi berhasil");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * Callback yang dipanggil untuk mengonsumsi pesan yang diterbitkan. Pastikan callback tidak melempar pengecualian. Jika respons dikembalikan untuk callback, pesan dikonsumsi. 
                 * Pesan harus dikonsumsi dalam periode waktu tertentu. Jika pesan tidak dikonsumsi dalam periode timeout yang ditentukan oleh broker ApsaraMQ for MQTT, broker mungkin mencoba mengirim ulang pesan. Pastikan deduplikasi dilakukan untuk memastikan idempotensi untuk konsumsi pesan. 
                 */
                System.out.println(
                    "menerima pesan dari topik " + s + " , isi adalah " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("pesan terkirim sukses topik adalah : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            /**
             * Tentukan topik induk sebagai topik dan MQ2MQTT sebagai tag saat menggunakan klien ApsaraMQ for RocketMQ untuk mengirim pesan ke klien ApsaraMQ for MQTT. 
             */
            Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
            /**
             * Anda dapat menggunakan parameter MqttSecondTopic untuk menentukan subtopik saat menggunakan klien ApsaraMQ for RocketMQ untuk mengirim pesan ke klien ApsaraMQ for MQTT. 
             */
            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
            SendResult result = producer.send(msg);
            System.out.println(result);
//            /**
//             * Kirim pesan P2P dan tentukan subtopik. 
//             */
//            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
//            result = producer.send(msg);
//            System.out.println(result);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3. Verifikasi hasil

Anda dapat memanggil fungsi utama di kelas RocketMQSendMessageToMQ4IoT.java dan kemudian menggunakan salah satu metode berikut untuk memverifikasi pengiriman dan konsumsi pesan.

Gunakan kode

Jika kode serupa dengan kode berikut ditampilkan, pesan dikirim oleh ApsaraMQ for RocketMQ dan dikonsumsi oleh ApsaraMQ for MQTT.

image

Gunakan konsol

  • Periksa Apakah Pesan Dikirim. Buka halaman detail instansi di konsol ApsaraMQ for RocketMQ lalu klik Message Query di panel navigasi sebelah kiri. Di halaman Kueri Pesan, periksa apakah pesan dikirim berdasarkan topik dan ID pesan.

    image

  • Periksa Apakah Pesan Dikonsumsi. Buka halaman detail instansi di konsol ApsaraMQ for MQTT dan klik Message trace query di panel navigasi sebelah kiri. Di halaman Kueri Jejak Pesan, periksa apakah pesan dikonsumsi berdasarkan ID pesan.

    image

Referensi