全部產品
Search
文件中心

ApsaraMQ for MQTT:MQTT資料流入規則的實現

更新時間:Aug 08, 2025

如果您的雲端應用需要使用雲訊息佇列 RocketMQ 版產品的某些功能,例如順序訊息特性、事務訊息特性等,您可以通過訊息流程入或流出規則將雲Message QueueTT 版雲訊息佇列 RocketMQ 版資料進行流轉。本文介紹如何將雲訊息佇列 RocketMQ 版產品的資料匯入雲Message QueueTT 版

背景資訊

雲Message QueueTT 版支援雲端SDK,雲上應用可直接通過雲端SDK接入雲Message QueueTT 版服務端進行訊息收發。雲端SDK使用,請參見雲端開發概述

同時雲Message QueueTT 版支援和其他雲產品進行互連,當前支援的雲產品有雲訊息佇列 RocketMQ 版

本文以公網環境中的Java SDK為例說明如何將雲訊息佇列 RocketMQ 版資料流入至雲Message QueueTT 版

quick_start_data_inflow

網路訪問

雲Message QueueTT 版同時提供了公网接入点VPC 接入点
  • 公网接入点為本地公網環境訪問的IP地址,一般用於物聯網和移動互連網情境中;
  • VPC 接入点為雲上私網訪問的IP地址,一般用於雲端應用接入雲Message QueueTT 版
重要 用戶端使用存取點串連服務時務必使用網域名稱接入,不得直接使用網域名稱背後的IP地址直接連接,因為IP地址隨時會變化。在以下使用方式中出現的問題雲Message QueueTT 版產品方概不負責:
  • 用戶端不使用網域名稱接入而是使用IP地址接入,產品方更新了網域名稱解析導致原有IP地址失效。
  • 用戶端網路對IP地址設定網路防火牆策略,產品方更新了網域名稱解析後新IP地址被您的防火牆策略攔截。

前提條件

  • 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
  • 安裝Java 8或11。更多資訊,請參見安裝JDK

  • 已建立雲Message QueueTT 版執行個體、Topic和Group ID,具體操作,請參見建立資源

  • 已建立雲訊息佇列 RocketMQ 版執行個體、Topic和Group ID,具體操作,請參見步驟二:建立資源

重要
  • 雲Message QueueTT 版資料流入規則僅支援雲訊息佇列 RocketMQ 版4.x系列執行個體。

  • 雲Message QueueTT 版資料流入規則不能跨地區使用,因此,雲Message QueueTT 版雲訊息佇列 RocketMQ 版的資源都必須建立在同一地區。

1.建立資料流入規則

  1. 登入雲Message QueueTT 版控制台,並在左側導覽列單擊实例列表

  2. 在頂部功能表列選擇目標地區,然後在執行個體列表中單擊執行個體名稱進入实例详情頁面。

  3. 在左側導覽列單擊规则管理

  4. 规则管理頁面,單擊RocketMQ 规则頁簽,然後單擊创建规则

  5. 创建规则頁面完成以下操作。

    1. 配置基本資料。輸入規則ID,選擇資料流入的規則類型。

      image

    2. 配置規則源。選擇已經建立好的雲訊息佇列 RocketMQ 版的執行個體和Topic。

      image

    3. 配置規則目標。選擇已經建立好的雲Message QueueTT 版的Topic。

      image

2.準備測試代碼

2.1下載範例程式碼

  1. 下載mqtt-java-demo,並解壓該Demo工程包至您指定的檔案夾。

  2. 在解壓的Demo工程中找到lmq-java-demo檔案夾,將此檔案夾匯入IntelliJ IDEA,並確認pom.xml中已包含以下依賴。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置訪問憑證。

    • 擷取AccessKey資訊。擷取方式,請參見建立AccessKey

    • 配置環境變數。雲Message QueueTT 版的AccessKey ID和AccessKey Secret的環境變數名稱分別為MQTT_AK_ENVMQTT_SK_ENV。關於配置環境變數的方法,請參見配置訪問憑證

2.2收發訊息代碼

RocketMQSendMessageToMQ4IoT.java類中包含了發送RocketMQ訊息和使用MQTT消費的代碼,按代碼注釋說明填寫雲訊息佇列 RocketMQ 版雲Message QueueTT 版資源的參數。

測試中可以將其中發送P2P訊息的相關代碼注釋掉,範例程式碼如下。

收發送訊息程式碼範例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class RocketMQSendMessageToMQ4IoT {
    public static void main(String[] args) throws Exception {
        /**
         * 初始化雲訊息佇列 RocketMQ 版發送用戶端,實際業務中一般部署在服務端應用中。
         */
        Properties properties = new Properties();
        /**
         * 設定雲訊息佇列 RocketMQ 版Group ID,在雲訊息佇列 RocketMQ 版控制台建立。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        /**
         * AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
         * 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
         * 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
         * 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * 設定TCP存取點,該存取點為雲訊息佇列 RocketMQ 版執行個體的存取點。進入雲訊息佇列 RocketMQ 版控制台執行個體詳情頁面擷取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * 設定雲訊息佇列 RocketMQ 版的Topic,在雲訊息佇列 RocketMQ 版控制台建立。
         * 雲訊息佇列 RocketMQ 版和雲Message QueueTT 版配合使用時,RocketMQ用戶端僅操作一級Topic。
         */
        final String parentTopic = "XXXXX";
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * 初始化雲Message QueueTT 版接收用戶端,實際業務中雲Message QueueTT 版一般部署在移動終端環境。
         */

        /**
         * 您在控制台建立的雲Message QueueTT 版的執行個體ID。
         */
        String instanceId = "XXXXX";
        /**
         * 設定存取點,進入雲Message QueueTT 版控制台執行個體詳情頁面擷取。
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
         * 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
         * 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
         * 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQTT用戶端ID,由業務系統分配,需要保證每個TCP串連都不一樣,保證全域唯一,如果不同的用戶端對象(TCP串連)使用了相同的clientId會導致串連異常斷開。
         * clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在雲Message QueueTT 版控制台建立,DeviceID由業務方自己設定,clientId總長度不得超過64個字元。
         */
        String clientId = "GID_XXXX@@@XXXXX";
        /**
         * 雲Message QueueTT 版支援子級Topic,用來做自訂的過濾,此處為樣本,可以填寫任何字串。
         * 需要注意的是,完整的Topic長度不得超過128個字元。
         */
        final String subTopic = "/testMq4Iot";
        final String mq4IotTopic = parentTopic + subTopic;
        /**
         * QoS參數代表傳輸品質,可選0,1,2。詳細資料,請參見名詞解釋。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 用戶端協議和連接埠。用戶端使用的協議和連接埠必須匹配,如果是SSL加密則設定ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 設定用戶端發送逾時時間,防止無限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 用戶端串連成功後就需要儘快訂閱需要的Topic。
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消費訊息的回調介面,需要確保該介面不拋異常,該介面運行返回即代表訊息消費成功。
                 * 消費訊息需要保證在規定時間內完成,如果消費耗時超過服務端約定的逾時時間,對於可靠傳輸的模式,服務端可能會重試推送,業務需要做好等冪去重處理。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            /**
             * 使用RocketMQ用戶端發訊息給MQTT用戶端時,Topic指定為一級父Topic,Tag指定為MQ2MQTT。
             */
            Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
            /**
             * 使用RocketMQ用戶端發訊息給MQTT用戶端時,可以通過MqttSecondTopic屬性設定MQTT的子級Topic屬性。
             */
            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
            SendResult result = producer.send(msg);
            System.out.println(result);
//            /**
//             * 發送P2P訊息,設定子級Topic。
//             */
//            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
//            result = producer.send(msg);
//            System.out.println(result);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3.結果驗證

執行RocketMQSendMessageToMQ4IoT.java類中的Main函數運行代碼。可以根據下面操作驗證訊息的發送和消費情況。

代碼驗證

如下圖所示,RocketMQ訊息已經成功發送,MQTT用戶端也已經成功消費。

image

控制台驗證

  • 查詢訊息發送情況。在雲訊息佇列 RocketMQ 版控制台訊息查詢頁面,根據Topic和Message ID查詢到訊息已經成功發送,如下圖所示。

    image

  • 查詢訊息消費情況。在雲Message QueueTT 版控制台消息轨迹查询頁面,根據Message ID查詢訊息已經被消費,如下圖所示。

    image

更多資訊