全部產品
Search
文件中心

ApsaraMQ for MQTT:MQTT資料流出規則的實現

更新時間:Aug 08, 2025

如果您的雲端應用需要使用雲訊息佇列 RocketMQ 版產品的某些功能,例如順序訊息特性、事務訊息特性等,您可以通過訊息流程入或流出規則將雲Message QueueTT 版雲訊息佇列 RocketMQ 版資料進行流轉。本文介紹如何將雲Message QueueTT 版的資料匯出至其他阿里雲產品。

背景資訊

雲Message QueueTT 版支援雲端SDK,雲上應用可直接通過雲端SDK接入雲Message QueueTT 版服務端進行訊息收發。雲端SDK使用,請參見雲端開發概述

同時雲Message QueueTT 版支援和其他雲產品進行互連,當前支援的雲產品有雲訊息佇列 RocketMQ 版

本文以公網環境中的Java SDK為例說明如何將雲Message QueueTT 版的資料匯出至雲訊息佇列 RocketMQ 版

此情境下可使用多語言的第三方開源SDK來實現訊息收發。更多資訊,請參見SDK下載

quick_start_data_outflow

網路訪問

雲Message QueueTT 版同時提供了公网接入点VPC 接入点
  • 公网接入点為本地公網環境訪問的IP地址,一般用於物聯網和移動互連網情境中;
  • VPC 接入点為雲上私網訪問的IP地址,一般用於雲端應用接入雲Message QueueTT 版
重要 用戶端使用存取點串連服務時務必使用網域名稱接入,不得直接使用網域名稱背後的IP地址直接連接,因為IP地址隨時會變化。在以下使用方式中出現的問題雲Message QueueTT 版產品方概不負責:
  • 用戶端不使用網域名稱接入而是使用IP地址接入,產品方更新了網域名稱解析導致原有IP地址失效。
  • 用戶端網路對IP地址設定網路防火牆策略,產品方更新了網域名稱解析後新IP地址被您的防火牆策略攔截。

前提條件

  • 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
  • 安裝Java 8或11。更多資訊,請參見安裝JDK

  • 已建立雲Message QueueTT 版執行個體、Topic和Group ID,具體操作,請參見建立資源

  • 已建立雲訊息佇列 RocketMQ 版執行個體、Topic和Group ID,具體操作,請參見步驟二:建立資源

重要
  • 雲Message QueueTT 版資料流出規則僅支援雲訊息佇列 RocketMQ 版4.x系列執行個體。

  • 雲Message QueueTT 版資料流出規則不能跨地區使用,因此,雲Message QueueTT 版雲訊息佇列 RocketMQ 版的資源都必須建立在同一地區。

1.建立資料流出規則

  1. 登入雲Message QueueTT 版控制台,並在左側導覽列單擊实例列表

  2. 在頂部功能表列選擇目標地區,然後在執行個體列表中單擊執行個體名稱進入实例详情頁面。

  3. 在左側導覽列單擊规则管理

  4. 规则管理頁面,單擊RocketMQ 规则頁簽,然後單擊创建规则

  5. 创建规则頁面完成以下操作。

    1. 配置基本資料。輸入規則ID,選擇資料流出的規則類型。

      image

    2. 配置規則源。選擇已經建立好的雲Message QueueTT 版的Topic。

      image

    3. 配置規則目標。選擇已經建立好的雲訊息佇列 RocketMQ 版的執行個體和Topic。

      image

2.準備測試代碼

2.1下載範例程式碼

  1. 下載mqtt-java-demo,並解壓該Demo工程包至您指定的檔案夾。

  2. 在解壓的Demo工程中找到lmq-java-demo檔案夾,將此檔案夾匯入IntelliJ IDEA,並確認pom.xml中已包含以下依賴。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置訪問憑證。

    • 擷取AccessKey資訊。擷取方式,請參見建立AccessKey

    • 配置環境變數。雲Message QueueTT 版的AccessKey ID和AccessKey Secret的環境變數名稱分別為MQTT_AK_ENVMQTT_SK_ENV。關於配置環境變數的方法,請參見配置訪問憑證

2.2收發訊息代碼

MQ4IoTSendMessageToRocketMQ.java類中包含了發送MQTT訊息和使用RocketMQ消費的代碼,按代碼注釋說明填寫雲Message QueueTT 版雲訊息佇列 RocketMQ 版資源的參數。範例程式碼如下。

收發送訊息程式碼範例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToRocketMQ {
    public static void main(String[] args) throws Exception {
        /**
         * 初始化雲訊息佇列 RocketMQ 版接收用戶端,實際業務中一般部署在服務端應用中。
         */
        Properties properties = new Properties();
        /**
         * 設定雲訊息佇列 RocketMQ 版Group ID,在雲訊息佇列 RocketMQ 版控制台建立。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        
        /**
         * AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
         * 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
         * 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
         * 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
      
         /**
         * 設定TCP存取點,該存取點為雲訊息佇列 RocketMQ 版執行個體的存取點。雲訊息佇列 RocketMQ 版控制台執行個體詳情頁面擷取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
        /**
         * 設定雲訊息佇列 RocketMQ 版的Topic,在雲訊息佇列 RocketMQ 版控制台建立。
         * 雲訊息佇列 RocketMQ 版和微Message QueueTT配合使用時,RocketMQ用戶端僅操作一級Topic。
         */
        final String parentTopic = "XXXXX";
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(parentTopic, "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext consumeContext) {
                System.out.println("recv msg:" + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * 初始化雲Message QueueTT 版發送用戶端,實際業務中雲Message QueueTT 版一般部署在移動終端環境。
         */

        /**
         * 您在控制台建立的微Message QueueTT的執行個體ID。
         */
        String instanceId = "XXXXX";
         /**
         * 設定存取點,進入微Message QueueTT版控制台執行個體詳情頁面擷取。
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
         * 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
         * 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
         * 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQTT用戶端ID,由業務系統分配,需要保證每個TCP串連都不一樣,保證全域唯一,如果不同的用戶端對象(TCP串連)使用了相同的clientId會導致串連異常斷開。
         * clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在雲Message QueueTT 版控制台建立,DeviceID由業務方自己設定,clientId總長度不得超過64個字元。
         */
        String clientId = "GID_XXXX@@@XXXXX";
       /**
         * 雲Message QueueTT 版支援子級Topic,用來做自訂的過濾,此處為樣本,可以填寫任何字串。
         * 需要注意的是,完整的Topic長度不得超過128個字元。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS參數代表傳輸品質,可選0,1,2。詳細資料,請參見名詞解釋。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
         /**
         * 用戶端協議和連接埠。用戶端使用的協議和連接埠必須匹配,如果是SSL加密則設定ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 設定用戶端發送逾時時間,防止無限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 用戶端串連成功後就需要儘快訂閱需要的Topic。
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  發送普通訊息時,Topic必須和接收方訂閱的Topic一致,或者符合萬用字元匹配規則。
             */
            mqttClient.publish(mq4IotTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3.結果驗證

執行MQ4IoTSendMessageToRocketMQ.java類中的Main函數運行代碼。可以根據下面操作驗證訊息的發送和消費情況。

代碼驗證

如下圖所示,MQTT訊息已經成功發送,RocketMQ用戶端也已經成功消費。

image

控制台驗證

  • 查詢訊息發送情況。在雲Message QueueTT 版控制台消息轨迹查询頁面,根據Group ID和Device ID查詢訊息已經成功發送,如下圖所示。

    image

  • 查詢訊息消費情況。在雲訊息佇列 RocketMQ 版控制台訊息查詢頁面,根據Topic查詢到訊息已經被轉寄到雲訊息佇列 RocketMQ 版,如下圖所示。

    image

    單擊操作列中的訊息軌跡查看,訊息已經被消費,如下圖所示。

    image

更多資訊