全部产品
Search
文档中心

ApsaraMQ for MQTT:Ekspor data dari ApsaraMQ for MQTT ke ApsaraMQ for RocketMQ

更新时间:Jul 02, 2025

Jika Anda ingin menggunakan fitur tertentu dari ApsaraMQ for RocketMQ, seperti pesan terurut dan pesan transaksional dalam aplikasi cloud, Anda dapat menggunakan aturan masuk atau keluar untuk bertukar data antara ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ. Topik ini menjelaskan cara mengekspor data dari ApsaraMQ for MQTT ke ApsaraMQ for RocketMQ.

Informasi latar belakang

ApsaraMQ for MQTT mendukung SDK cloud. Anda dapat menghubungkan aplikasi cloud ke broker ApsaraMQ for MQTT untuk mengirim dan menerima pesan menggunakan SDK cloud. Untuk informasi lebih lanjut tentang penggunaan SDK cloud, lihat Ikhtisar.

ApsaraMQ for MQTT juga mendukung pertukaran data antara ApsaraMQ for MQTT dan layanan Alibaba Cloud lainnya. Saat ini, Anda hanya dapat bertukar data antara ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ.

Topik ini menjelaskan cara mengekspor data dari ApsaraMQ for MQTT ke ApsaraMQ for RocketMQ melalui Internet menggunakan SDK for Java.

Dalam skenario ini, Anda dapat menggunakan SDK sumber terbuka pihak ketiga untuk berbagai bahasa pemrograman guna mengirim dan menerima pesan. Untuk informasi lebih lanjut, lihat Unduh SDK.

quick_start_data_outflow

Akses jaringan

ApsaraMQ for MQTT menyediakan Public Endpoint dan VPC Endpoint.

  • Public Endpoint adalah alamat IP yang digunakan untuk mengakses ApsaraMQ for MQTT melalui Internet. Umumnya, endpoint publik digunakan dalam skenario IoT dan mobile Internet.

  • VPC Endpoint adalah alamat IP yang digunakan untuk mengakses ApsaraMQ for MQTT di private virtual cloud (VPC). Umumnya, endpoint VPC digunakan oleh aplikasi cloud untuk terhubung ke ApsaraMQ for MQTT.

Penting

Jika Anda ingin menggunakan endpoint untuk menghubungkan klien ke ApsaraMQ for MQTT, gunakan nama domain alih-alih alamat IP karena alamat IP berubah secara dinamis. Tim teknis ApsaraMQ for MQTT tidak bertanggung jawab atas kesalahan dan kerugian langsung atau tidak langsung dalam skenario berikut:

  • Anda menggunakan alamat IP untuk mengakses klien ke ApsaraMQ for MQTT. Setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain, alamat IP asli menjadi tidak valid.

  • Kebijakan firewall pada alamat IP disetel di jaringan tempat klien Anda berjalan. Setelah tim teknis ApsaraMQ for MQTT memperbarui resolusi nama domain, alamat IP baru diblokir karena kebijakan firewall.

Prasyarat

  • Lingkungan pengembangan terintegrasi (IDE) telah diinstal. Untuk informasi lebih lanjut, lihat IDE. Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Dalam contoh ini, IntelliJ IDEA digunakan.

  • Java 8 atau 11 telah diinstal. Untuk informasi lebih lanjut, lihat Unduhan Java.

  • Instansi ApsaraMQ for MQTT telah dibuat, serta topik dan grup telah dikonfigurasi pada instansinya. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Instansi ApsaraMQ for RocketMQ telah dibuat, serta topik dan grup telah dikonfigurasi pada instansinya. Untuk informasi lebih lanjut, lihat Langkah 2: Buat Sumber Daya.

Penting
  • Anda dapat menggunakan aturan keluar data dari ApsaraMQ for MQTT untuk mengekspor data hanya ke instansi ApsaraMQ for RocketMQ versi 4.x.

  • Anda tidak dapat menggunakan aturan keluar data dari ApsaraMQ for MQTT lintas wilayah. Saat membuat aturan keluar data, pastikan bahwa sumber daya ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ berada di wilayah yang sama.

1. Buat aturan keluar data

  1. Masuk ke Konsol ApsaraMQ for MQTT. Di panel navigasi sebelah kiri, klik Instances.

  2. Di bilah navigasi atas, pilih wilayah tempat instansi yang ingin Anda kelola berada. Pada halaman Instansi, klik nama instansi untuk pergi ke halaman Instance Details.

  3. Di panel navigasi sebelah kiri, klik Rules. Di sudut kiri atas halaman Aturan, klik Create Rule.

  4. Dalam wizard Create Rule, lakukan langkah-langkah berikut:

    1. Pada langkah Konfigurasikan Informasi Dasar, masukkan ID aturan dan pilih Data Keluar untuk parameter Jenis Aturan.

      image

    2. Pada langkah Konfigurasikan Sumber Aturan, pilih topik yang telah dibuat pada instansi ApsaraMQ for MQTT.

      image

    3. Pada langkah Konfigurasikan Tujuan Aturan, pilih instansi ApsaraMQ for RocketMQ yang telah dibuat dan topik yang telah dibuat pada instansi tersebut.

      image

2. Siapkan kode uji

2.1 Unduh kode sampel

  1. Unduh proyek demo mqtt-java-demo dan ekstrak paket proyek demo ke folder di mesin lokal Anda.

  2. Di proyek demo yang diekstrak, temukan folder lmq-java-demo, impor folder ke IntelliJ IDEA, dan konfirmasi apakah dependensi berikut termasuk dalam file pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. Konfigurasikan kredensial akses.

    • Peroleh pasangan AccessKey. Untuk informasi tentang cara memperoleh pasangan AccessKey, lihat Buat Pasangan AccessKey.

    • Konfigurasikan variabel lingkungan. Nama variabel lingkungan untuk ID AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_AK_ENV, dan nama variabel lingkungan untuk rahasia AccessKey yang digunakan untuk mengakses ApsaraMQ for MQTT adalah MQTT_SK_ENV. Untuk informasi tentang cara mengonfigurasi variabel lingkungan, lihat Konfigurasikan Kredensial Akses.

2.2 Konfigurasikan kode untuk perpesanan

Kelas MQ4IoTSendMessageToRocketMQ.java berisi kode untuk mengirim pesan menggunakan ApsaraMQ for MQTT dan menerima pesan menggunakan ApsaraMQ for RocketMQ. Anda harus menentukan parameter untuk sumber daya ApsaraMQ for MQTT dan ApsaraMQ for RocketMQ berdasarkan komentar dalam kode.

Kode Sampel untuk Perpesanan

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToRocketMQ {
    public static void main(String[] args) throws Exception {
        /**
         * Inisialisasi klien ApsaraMQ for RocketMQ sebagai penerima. Dalam sebagian besar skenario bisnis, penerima ditempatkan pada aplikasi backend. 
         */
        Properties properties = new Properties();
        /**
         * ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        
        /**
         * ID AccessKey yang Anda buat di konsol Resource Access Management (RAM) Alibaba Cloud untuk otentikasi identitas. 
         * Pasangan AccessKey akun Alibaba Cloud memiliki izin untuk semua operasi API. Untuk mencegah risiko keamanan, kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan O&M rutin. 
         * Kami sangat menyarankan agar Anda tidak menyimpan pasangan AccessKey di kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya di akun Anda mungkin terpapar pada risiko keamanan potensial. 
         * Dalam contoh ini, ID AccessKey dan rahasia AccessKey disimpan dalam variabel lingkungan. 
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * Rahasia AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. Parameter ini diperlukan hanya jika Anda menggunakan mode autentikasi tanda tangan. 
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
      
         /**
         * Titik akhir TCP yang ingin Anda gunakan untuk mengakses instansi ApsaraMQ for RocketMQ. Anda dapat memperoleh titik akhir TCP di halaman Detail Instansi di konsol ApsaraMQ for RocketMQ. 
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
        /**
         * Topik yang Anda buat di konsol ApsaraMQ for RocketMQ. 
         * Saat Anda bertukar data antara ApsaraMQ for RocketMQ dan ApsaraMQ for MQTT, hanya topik induk yang dapat digunakan oleh klien ApsaraMQ for RocketMQ. 
         */
        final String parentTopic = "XXXXX";
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(parentTopic, "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext consumeContext) {
                System.out.println("recv msg:" + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * Inisialisasi klien ApsaraMQ for MQTT sebagai pengirim. Dalam sebagian besar skenario bisnis, pengirim ditempatkan pada terminal seluler. 
         */

        /**
         * ID instansi ApsaraMQ for MQTT yang Anda buat di konsol. 
         */
        String instanceId = "XXXXX";
         /**
         * Titik akhir yang ingin Anda gunakan untuk mengakses instansi ApsaraMQ for MQTT. Anda dapat memperoleh titik akhir di halaman Detail Instansi di konsol ApsaraMQ for MQTT. 
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * ID AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. 
         * Pasangan AccessKey akun Alibaba Cloud memiliki izin untuk semua operasi API. Untuk mencegah risiko keamanan, kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan O&M rutin. 
         * Kami sangat menyarankan agar Anda tidak menyimpan pasangan AccessKey di kode proyek. Jika tidak, pasangan AccessKey mungkin bocor dan semua sumber daya di akun Anda mungkin terpapar pada risiko keamanan potensial. 
         * Dalam contoh ini, ID AccessKey dan rahasia AccessKey disimpan dalam variabel lingkungan. 
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * Rahasia AccessKey yang Anda buat di konsol RAM Alibaba Cloud untuk otentikasi identitas. Parameter ini diperlukan hanya jika Anda menggunakan mode autentikasi tanda tangan. 
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * ID unik global yang sistem tetapkan ke klien ApsaraMQ for MQTT. ID klien harus bervariasi berdasarkan koneksi TCP. Jika beberapa koneksi TCP menggunakan ID klien yang sama, pengecualian terjadi dan koneksi ditutup secara tak terduga. 
         * ID klien terdiri dari ID grup dan ID perangkat dan berformat GroupID@@@DeviceID. ID grup adalah ID grup yang Anda buat di konsol ApsaraMQ for MQTT. ID perangkat adalah ID kustom yang Anda tentukan. ID klien tidak boleh melebihi 64 karakter panjangnya. 
         */
        String clientId = "GID_XXXX@@@XXXXX";
       /**
         * ApsaraMQ for MQTT memungkinkan Anda menggunakan subtopik untuk memfilter pesan. Anda dapat menentukan string sebagai nama subtopik. 
         * Nilai parameter mq4IotTopic dapat mencapai hingga 128 karakter panjangnya. 
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * Tingkat kualitas layanan (QoS) untuk transmisi pesan. Nilai valid: 0, 1, dan 2. Untuk informasi lebih lanjut, lihat Istilah. 
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
         /**
         * Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT. Protokol dan port yang digunakan oleh klien ApsaraMQ for MQTT harus cocok. Jika enkripsi Secure Sockets Layer (SSL) digunakan, tentukan ssl://endpoint:8883 sebagai protokol dan port. 
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * Periode timeout saat klien ApsaraMQ for MQTT menunggu respons. Periode timeout mencegah klien ApsaraMQ for MQTT menunggu respons untuk periode waktu yang tidak terbatas. 
         */
        mqttClient.setTimeToWait(5000);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * Topik yang harus dilanggan konsumen sesegera mungkin setelah koneksi klien didirikan. 
                 */
                System.out.println("koneksi berhasil");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("pesan terkirim sukses topik adalah : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             * Topik ke mana pesan dikirim. Jika pesan yang dikirim adalah pesan normal, topik harus merupakan topik yang dilanggan konsumen atau topik yang dapat dicocokkan menggunakan wildcard. 
             */
            mqttClient.publish(mq4IotTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3. Verifikasi hasil

Anda dapat mengeksekusi fungsi utama di kelas MQ4IoTSendMessageToRocketMQ.java dan kemudian menggunakan salah satu metode berikut untuk memverifikasi pengiriman dan konsumsi pesan.

Gunakan kode

Jika kode serupa dengan kode pada gambar berikut ditampilkan, pesan dikirim oleh ApsaraMQ for MQTT dan dikonsumsi oleh ApsaraMQ for RocketMQ.

image

Gunakan konsol

  • Periksa Apakah Pesan Dikirim. Buka halaman detail instansi di konsol ApsaraMQ for MQTT dan klik Query Jejak Pesan di panel navigasi sebelah kiri. Pada halaman Query Jejak Pesan, periksa apakah pesan dikirim menggunakan ID grup dan ID perangkat.

    image

  • Periksa Apakah Pesan Dikonsumsi. Buka halaman detail instansi di konsol ApsaraMQ for RocketMQ dan klik Message Query di panel navigasi sebelah kiri. Pada halaman Query Pesan, periksa apakah pesan diteruskan ke ApsaraMQ for RocketMQ berdasarkan topik, seperti yang ditunjukkan pada gambar berikut.

    image

    Klik Message Trace di kolom Actions pesan yang diquery. Jika informasi serupa dengan informasi pada gambar berikut ditampilkan, pesan dikonsumsi.

    image

Referensi