背景資訊
雲Message QueueTT 版支援雲端SDK,雲上應用可直接通過雲端SDK接入雲Message QueueTT 版服務端進行訊息收發。雲端SDK使用,請參見雲端開發概述。
同時雲Message QueueTT 版支援和其他雲產品進行互連,當前支援的雲產品有雲訊息佇列 RocketMQ 版。
本文以公網環境中的Java SDK為例說明如何將雲訊息佇列 RocketMQ 版資料流入至雲Message QueueTT 版。

網路訪問
雲Message QueueTT 版同時提供了
公网接入点和
VPC 接入点。
- 公网接入点為本地公網環境訪問的IP地址,一般用於物聯網和移動互連網情境中;
- VPC 接入点為雲上私網訪問的IP地址,一般用於雲端應用接入雲Message QueueTT 版。
重要 用戶端使用存取點串連服務時務必使用網域名稱接入,不得直接使用網域名稱背後的IP地址直接連接,因為IP地址隨時會變化。在以下使用方式中出現的問題
雲Message QueueTT 版產品方概不負責:
- 用戶端不使用網域名稱接入而是使用IP地址接入,產品方更新了網域名稱解析導致原有IP地址失效。
- 用戶端網路對IP地址設定網路防火牆策略,產品方更新了網域名稱解析後新IP地址被您的防火牆策略攔截。
前提條件
- 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
安裝Java 8或11。更多資訊,請參見安裝JDK。
已建立雲Message QueueTT 版執行個體、Topic和Group ID,具體操作,請參見建立資源。
已建立雲訊息佇列 RocketMQ 版執行個體、Topic和Group ID,具體操作,請參見步驟二:建立資源。
1.建立資料流入規則
登入雲Message QueueTT 版控制台,並在左側導覽列單擊实例列表。
在頂部功能表列選擇目標地區,然後在執行個體列表中單擊執行個體名稱進入实例详情頁面。
在左側導覽列單擊规则管理。
在规则管理頁面,單擊RocketMQ 规则頁簽,然後單擊创建规则。
在创建规则頁面完成以下操作。
配置基本資料。輸入規則ID,選擇資料流入的規則類型。

配置規則源。選擇已經建立好的雲訊息佇列 RocketMQ 版的執行個體和Topic。

配置規則目標。選擇已經建立好的雲Message QueueTT 版的Topic。

2.準備測試代碼
2.1下載範例程式碼
下載mqtt-java-demo,並解壓該Demo工程包至您指定的檔案夾。
在解壓的Demo工程中找到lmq-java-demo檔案夾,將此檔案夾匯入IntelliJ IDEA,並確認pom.xml中已包含以下依賴。
<dependencies>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.70</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.5.Final</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
配置訪問憑證。
2.2收發訊息代碼
在RocketMQSendMessageToMQ4IoT.java類中包含了發送RocketMQ訊息和使用MQTT消費的代碼,按代碼注釋說明填寫雲訊息佇列 RocketMQ 版和雲Message QueueTT 版資源的參數。
測試中可以將其中發送P2P訊息的相關代碼注釋掉,範例程式碼如下。
收發送訊息程式碼範例
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class RocketMQSendMessageToMQ4IoT {
public static void main(String[] args) throws Exception {
/**
* 初始化雲訊息佇列 RocketMQ 版發送用戶端,實際業務中一般部署在服務端應用中。
*/
Properties properties = new Properties();
/**
* 設定雲訊息佇列 RocketMQ 版Group ID,在雲訊息佇列 RocketMQ 版控制台建立。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
/**
* AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
* 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
* 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
* 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* 設定TCP存取點,該存取點為雲訊息佇列 RocketMQ 版執行個體的存取點。進入雲訊息佇列 RocketMQ 版控制台執行個體詳情頁面擷取。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
/**
* 設定雲訊息佇列 RocketMQ 版的Topic,在雲訊息佇列 RocketMQ 版控制台建立。
* 雲訊息佇列 RocketMQ 版和雲Message QueueTT 版配合使用時,RocketMQ用戶端僅操作一級Topic。
*/
final String parentTopic = "XXXXX";
Producer producer = ONSFactory.createProducer(properties);
producer.start();
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* 初始化雲Message QueueTT 版接收用戶端,實際業務中雲Message QueueTT 版一般部署在移動終端環境。
*/
/**
* 您在控制台建立的雲Message QueueTT 版的執行個體ID。
*/
String instanceId = "XXXXX";
/**
* 設定存取點,進入雲Message QueueTT 版控制台執行個體詳情頁面擷取。
*/
String endPoint = "XXXXXX.mqtt.aliyuncs.com";
/**
* AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
* 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
* 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
* 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* MQTT用戶端ID,由業務系統分配,需要保證每個TCP串連都不一樣,保證全域唯一,如果不同的用戶端對象(TCP串連)使用了相同的clientId會導致串連異常斷開。
* clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在雲Message QueueTT 版控制台建立,DeviceID由業務方自己設定,clientId總長度不得超過64個字元。
*/
String clientId = "GID_XXXX@@@XXXXX";
/**
* 雲Message QueueTT 版支援子級Topic,用來做自訂的過濾,此處為樣本,可以填寫任何字串。
* 需要注意的是,完整的Topic長度不得超過128個字元。
*/
final String subTopic = "/testMq4Iot";
final String mq4IotTopic = parentTopic + subTopic;
/**
* QoS參數代表傳輸品質,可選0,1,2。詳細資料,請參見名詞解釋。
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 用戶端協議和連接埠。用戶端使用的協議和連接埠必須匹配,如果是SSL加密則設定ssl://endpoint:8883。
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 設定用戶端發送逾時時間,防止無限阻塞。
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 用戶端串連成功後就需要儘快訂閱需要的Topic。
*/
System.out.println("connect success");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消費訊息的回調介面,需要確保該介面不拋異常,該介面運行返回即代表訊息消費成功。
* 消費訊息需要保證在規定時間內完成,如果消費耗時超過服務端約定的逾時時間,對於可靠傳輸的模式,服務端可能會重試推送,業務需要做好等冪去重處理。
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
/**
* 使用RocketMQ用戶端發訊息給MQTT用戶端時,Topic指定為一級父Topic,Tag指定為MQ2MQTT。
*/
Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
/**
* 使用RocketMQ用戶端發訊息給MQTT用戶端時,可以通過MqttSecondTopic屬性設定MQTT的子級Topic屬性。
*/
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
SendResult result = producer.send(msg);
System.out.println(result);
// /**
// * 發送P2P訊息,設定子級Topic。
// */
// msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
// result = producer.send(msg);
// System.out.println(result);
}
Thread.sleep(Long.MAX_VALUE);
}
}
3.結果驗證
執行RocketMQSendMessageToMQ4IoT.java類中的Main函數運行代碼。可以根據下面操作驗證訊息的發送和消費情況。
代碼驗證
如下圖所示,RocketMQ訊息已經成功發送,MQTT用戶端也已經成功消費。

控制台驗證