This topic describes how to import data from other Alibaba Cloud services to Message Queue for MQTT. In this example, data is imported from Message Queue for Apache RocketMQ, which is the only Alibaba Cloud service that Message Queue for MQTT supports as a data source.

Prerequisites

  • An integrated development environment (IDE) is installed. For more information, see IDE. You can use IntelliJ IDEA or Eclipse. In this example, IntelliJ IDEA is used.
  • The Java Development Kit (JDK) is installed. For more information, see JDK.
  • A Message Queue for Apache RocketMQ instance, a topic, and a group ID are created in the Internet region. For more information, see Create resources.

Background information

This topic describes how to import data from Message Queue for Apache RocketMQ to Message Queue for MQTT by using Message Queue for MQTT SDK for Java that is designed for the Internet.

You can use third-party open source SDKs for multiple programming languages to send and subscribe to messages in this scenario. For more information, see Download the SDK.

[Figure: quick_start_data_inflow]

As shown in the preceding figure, you want to transmit data of a backend application that you deploy in the Internet region to a Message Queue for MQTT client on the Internet. The backend application and the Message Queue for MQTT client are developed in Java. You can configure a data inbound rule to import data from Message Queue for Apache RocketMQ to Message Queue for MQTT. The Message Queue for MQTT client and broker implement messaging by using Message Queue for MQTT SDK for Java. The Message Queue for Apache RocketMQ client and broker implement messaging by using Message Queue for Apache RocketMQ SDK for Java.

Note Messages cannot be sent between a Message Queue for Apache RocketMQ topic in one region and a Message Queue for MQTT topic in another region. Therefore, all resources that are involved in this topic must be created in the Internet region. For more information, see Region-specific topics.

Network access

Message Queue for MQTT provides the Public Endpoint and VPC Endpoint. We recommend that you use different types of endpoints based on scenarios:
  • In Internet of Things (IoT) and mobile Internet scenarios, we recommend that you use the Public Endpoint to connect your Message Queue for MQTT client to Message Queue for MQTT.
  • The VPC Endpoint is intended only for special scenarios. If your applications are deployed on cloud servers, we recommend that you use a server-side messaging service, such as Message Queue for Apache RocketMQ.
Notice To connect a Message Queue for MQTT client to Message Queue for MQTT by using an endpoint, use the domain name instead of the IP address because the IP address can change at any time. The service team of Message Queue for MQTT is not liable for direct or indirect faults or losses that arise in the following scenarios:
  • Your Message Queue for MQTT client uses an IP address instead of a domain name to access the service. The original IP address becomes invalid after the service team of Message Queue for MQTT updates domain name resolution.
  • A firewall policy based on IP addresses is set in the network where your Message Queue for MQTT client is running. New IP addresses are blocked by the firewall policy after the service team of Message Queue for MQTT updates domain name resolution.
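A client-side guard along the following lines can catch a hard-coded IP address before a connection is attempted. This is a minimal sketch and not part of any Message Queue for MQTT SDK: the class name EndpointCheck and the method isDomainName are hypothetical, and the check only recognizes dotted IPv4 literals and strings that contain a colon.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of any SDK): rejects endpoints written as IP literals. */
final class EndpointCheck {
    // Matches dotted-quad IPv4 literals such as 203.0.113.10.
    private static final Pattern IPV4 = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}$");

    /** Returns true if the host looks like a domain name rather than an IP literal. */
    static boolean isDomainName(String host) {
        if (host == null || host.isEmpty()) {
            return false;
        }
        if (IPV4.matcher(host).matches()) {
            return false; // IPv4 literal
        }
        if (host.contains(":")) {
            return false; // IPv6 literal, or a host:port string instead of a bare host
        }
        return host.contains(".");
    }
}
```

For example, EndpointCheck.isDomainName("XXXXXX.mqtt.aliyuncs.com") returns true, whereas an input such as "203.0.113.10" returns false.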
In the example, the public endpoint of Message Queue for MQTT is used. For more information about the comparison of scenarios and mappings of message attributes between Message Queue for MQTT and Message Queue for Apache RocketMQ, see the following two topics:

Procedure

The following figure shows the procedure of sending messages from a backend application to a Message Queue for MQTT client.

[Figure: rocketmq_send_message_to_mqtt]

Step 1: Create a Message Queue for MQTT instance and obtain its endpoint

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. On the Instances page, click Create Instance in the upper-left corner.
  5. On the buy page, select the desired instance type and click Buy Now.
    You can purchase subscription or pay-as-you-go instances. For more information about the two billing methods, see Overview. In this example, a pay-as-you-go instance is created.
  6. On the Confirm Order page, select I have read and agree to Alibaba Cloud Message Queue for MQTT Agreement of Service and click Activate Now.
  7. Go back to the Message Queue for MQTT console. In the left-side navigation pane, click Instances. In the top navigation bar, select the region where your instance resides.
  8. On the Instances page, click the name of the instance that you purchased or click Details in the Actions column to go to the Instance Details page.
  9. On the Instance Details page, click the Endpoints tab. On this tab, you can view the endpoint information. In the example, the public endpoint is used.
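The endpoint obtained here is later combined with a protocol and a port to form the broker address. The sample code in Step 5 uses tcp:// with port 1883 and notes that SSL connections use the format ssl://endpoint:8883. The following sketch shows one way to build that address; MqttServerUri and build are hypothetical names, not part of the SDK.

```java
/** Hypothetical helper (not part of any SDK): builds the broker URI for an MQTT client. */
final class MqttServerUri {
    static final int TCP_PORT = 1883; // plain TCP, as used by the sample code in Step 5
    static final int SSL_PORT = 8883; // SSL, following the ssl://endpoint:8883 format

    /** Combines the endpoint domain name with the matching protocol and port. */
    static String build(String endpoint, boolean useSsl) {
        if (endpoint == null || endpoint.trim().isEmpty()) {
            throw new IllegalArgumentException("endpoint must not be empty");
        }
        String host = endpoint.trim();
        return (useSsl ? "ssl://" : "tcp://") + host + ":" + (useSsl ? SSL_PORT : TCP_PORT);
    }
}
```

Keeping the protocol and the port in one place helps avoid a mismatch between them, which the sample code comments warn against.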

Step 2: Create a parent topic

The MQTT protocol supports multi-level topics. You must create a parent topic in the Message Queue for MQTT console or by calling an API operation. You do not need to create a subtopic. For more information about topics, see Terms. In this example, a parent topic is created in the Message Queue for MQTT console.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a topic in the instance list. In the Actions column, choose More > Topics.
  5. In the upper-left corner of the Topics page, click Create Topic.
  6. In the Create Topic panel, set the Name and Description parameters for the topic and click OK in the lower-left corner.
    You can view the topic that you create on the Topics page.
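Because only the parent topic is created in the console, the full topic that a client subscribes to is assembled in code by appending a subtopic to the parent topic, as the sample in Step 5 does with parentTopic + subTopic. The following is a minimal sketch of that composition. The names MqttTopics and compose are hypothetical, and the 128-character limit is taken from the comment on mq4IotTopic in the sample code.

```java
/** Hypothetical helper (not part of any SDK): composes a multi-level MQTT topic. */
final class MqttTopics {
    // Limit taken from the mq4IotTopic comment in the sample code of this topic.
    static final int MAX_TOPIC_LENGTH = 128;

    /** Returns parentTopic followed by subTopic, adding a leading "/" to subTopic if needed. */
    static String compose(String parentTopic, String subTopic) {
        if (parentTopic == null || parentTopic.isEmpty()) {
            throw new IllegalArgumentException("parentTopic must not be empty");
        }
        String full = parentTopic;
        if (subTopic != null && !subTopic.isEmpty()) {
            full += subTopic.startsWith("/") ? subTopic : "/" + subTopic;
        }
        if (full.length() > MAX_TOPIC_LENGTH) {
            throw new IllegalArgumentException("topic exceeds " + MAX_TOPIC_LENGTH + " characters");
        }
        return full;
    }
}
```

For example, compose("TopicB", "/testMq4Iot") and compose("TopicB", "testMq4Iot") both yield TopicB/testMq4Iot.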

Step 3: Create a group ID

For more information about group IDs, see Terms.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a group ID in the instance list. In the Actions column, choose More > Groups.
  5. In the upper-left corner of the Groups page, click Create Group.
  6. In the Create Group panel, set the Group ID parameter and click OK.
    You can view the created group ID on the Groups page.
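The group ID becomes the prefix of each MQTT client ID. According to the comments in the sample code in Step 5, a client ID has the format GroupID@@@DeviceID and can be up to 64 characters in length, and a point-to-point (P2P) subtopic is formed as /p2p/ followed by the client ID. The following sketch illustrates both rules; MqttClientIds, build, and p2pSubTopic are hypothetical names, not SDK methods.

```java
/** Hypothetical helper (not part of any SDK): builds MQTT client IDs and P2P subtopics. */
final class MqttClientIds {
    static final String SEPARATOR = "@@@";
    static final int MAX_LENGTH = 64; // limit taken from the clientId comment in the sample code

    /** Returns a client ID in the format GroupID@@@DeviceID. */
    static String build(String groupId, String deviceId) {
        if (groupId == null || groupId.isEmpty() || deviceId == null || deviceId.isEmpty()) {
            throw new IllegalArgumentException("groupId and deviceId must not be empty");
        }
        String clientId = groupId + SEPARATOR + deviceId;
        if (clientId.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("clientId exceeds " + MAX_LENGTH + " characters");
        }
        return clientId;
    }

    /** Returns the subtopic that addresses a single client, as in the P2P part of the sample. */
    static String p2pSubTopic(String clientId) {
        return "/p2p/" + clientId;
    }
}
```

Because each TCP connection needs a distinct client ID, the device ID part should differ for every device that connects.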

Step 4: Create a data inbound rule

The parameter values that you set in the rule must match the resources that you created.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a data inbound rule in the instance list. In the Actions column, choose More > Rules.
  5. In the upper-left corner of the Rules page, click Create Rule.
  6. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Information step, set the following parameters and click Next.
      Parameter Example Description
      Rule ID 111111 The globally unique identifier of the rule.
      • It can contain only letters, digits, hyphens (-), and underscores (_) and must contain at least one letter or digit.
      • It must be 3 to 64 characters in length. If the value contains more than 64 characters, it is automatically truncated.
      • It cannot be updated after the rule is created.
      Description migrate from rocketmq The description of the rule.
      Status Enable Specifies whether to enable the current rule. Valid values:
      • Enable
      • Disable
      Rule Type Data Inbound The type of the rule. In this example, Data Inbound is selected.
    2. In the Configure Rule Source step, specify the data source and click Next.
      Parameter Example Description
      Source Service Type Message Queue for Apache RocketMQ The cloud service from which the data is forwarded to Message Queue for MQTT.
      Note Only Message Queue for Apache RocketMQ is supported.
      Message Queue for Apache RocketMQ Instance MQ_INST_13801563067*****_BbyOD2jQ The ID of the Message Queue for Apache RocketMQ instance from which the data is forwarded.
      Topic TopicA The Message Queue for Apache RocketMQ topic from which the data is forwarded. In this example, the messages of TopicA are forwarded to a topic in Message Queue for MQTT.
    3. In the Configure Rule Destination step, specify the destination to which the data is forwarded and click Create.
      Parameter Example Description
      Topic TopicB The topic in Message Queue for MQTT to which the data is forwarded from other Alibaba Cloud services.
    You can view the data inbound rule that you create on the Rules page.
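If rule IDs are generated by a script rather than typed in the console, the Rule ID constraints listed in the Configure Basic Information step can be checked up front. The following is a minimal sketch with hypothetical names (RuleIds, isValid). It assumes that "letters" means ASCII letters, and it rejects values longer than 64 characters instead of truncating them the way the console is described to do.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of any SDK): checks a rule ID against the documented constraints. */
final class RuleIds {
    // Only letters, digits, hyphens, and underscores; 3 to 64 characters in length.
    // Assumption: "letters" is interpreted as ASCII letters.
    private static final Pattern ALLOWED = Pattern.compile("^[A-Za-z0-9_-]{3,64}$");
    // At least one letter or digit must be present.
    private static final Pattern HAS_LETTER_OR_DIGIT = Pattern.compile("[A-Za-z0-9]");

    static boolean isValid(String ruleId) {
        return ruleId != null
            && ALLOWED.matcher(ruleId).matches()
            && HAS_LETTER_OR_DIGIT.matcher(ruleId).find();
    }
}
```

The example value 111111 passes this check, whereas a value made up only of hyphens, or one shorter than three characters, does not.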

Step 5: Send and subscribe to messages by using Message Queue for MQTT SDK for Java

  1. Download a third-party open source SDK for Java. Download address: Eclipse Paho Java Client.
  2. Download the demo of Alibaba Cloud Message Queue for MQTT SDK for Java for reference during code development. Download address: mqtt-java-demo.
  3. Decompress this demo project package to a specific folder.
  4. In IntelliJ IDEA, import the decompressed files to create a project and check whether the following dependencies are included in the pom.xml file.
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
    Note For more information about the ons-client version, see Release notes.
  5. In the RocketMQSendMessageToMQ4IoT.java class, specify parameter values as instructed in comments. Most of these parameter values correspond to the Message Queue for MQTT resources that you create in Step 1 to Step 3 and related resources that you create in Message Queue for Apache RocketMQ. Then, execute the main() function to send and subscribe to messages.
    The following code shows an example:
    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.SendResult;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    /**
     * In the production environment of Alibaba Cloud, Message Queue for Apache RocketMQ instances in regions except the Internet region cannot be accessed by an on-premises server. Instead, the instances can be accessed by an Elastic Compute Service (ECS) server in the same region.
     */
    public class RocketMQSendMessageToMQ4IoT {
        public static void main(String[] args) throws Exception {
            /**
             * Initialize a Message Queue for Apache RocketMQ client as a sender. In most actual scenarios, the code is deployed in a backend application.
             */
            Properties properties = new Properties();
            /**
             * The group ID that you create in the Message Queue for Apache RocketMQ console.
             */
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
            /**
             * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity verification.
             */
            properties.put(PropertyKeyConst.AccessKey, "XXXX");
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity verification. The AccessKey secret is required only for signature authentication.
             */
            properties.put(PropertyKeyConst.SecretKey, "XXXX");
            /**
             * The endpoint of the Message Queue for Apache RocketMQ instance that is used in the TCP connection. You can obtain the endpoint on the Instance Details page in the Message Queue for Apache RocketMQ console.
             */
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
            /**
             * The topic that you create in the Message Queue for Apache RocketMQ console.
             * Only a parent topic can be used by the Message Queue for Apache RocketMQ client when you exchange data between Message Queue for Apache RocketMQ and Message Queue for MQTT.
             */
            final String parentTopic = "XXXXX";
            Producer producer = ONSFactory.createProducer(properties);
            producer.start();
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            /**
             * Initialize a Message Queue for MQTT client as a receiver. In most actual scenarios, the code is deployed on a mobile terminal.
             */
    
            /**
             * The ID of the Message Queue for MQTT instance that you created in the console.
             */
            String instanceId = "XXXXX";
            /**
             * The endpoint of the Message Queue for MQTT instance. You can obtain the endpoint on the Instance Details page in the Message Queue for MQTT console.
             */
            String endPoint = "XXXXXX.mqtt.aliyuncs.com";
            /**
             * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity verification.
             */
            String accessKey = "XXXX";
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity verification. The AccessKey secret is required only for signature authentication.
             */
            String secretKey = "XXXX";
            /**
             * The globally unique ID that the system assigns to the Message Queue for MQTT client. The client ID must be different in each TCP connection. If different TCP connections use the same client ID, an exception occurs and the connections are closed.
             * The value of the clientId parameter is in the format of GroupID@@@DeviceID. GroupID is the group ID that you created in the Message Queue for MQTT console and DeviceID is the custom ID of the device. The value of the clientId parameter can be up to 64 characters in length.
             */
            String clientId = "GID_XXXX@@@XXXXX";
            /**
             * Message Queue for MQTT allows you to use a subtopic to filter messages. You can specify a string as a subtopic, as shown in the following code.
             * The value of the mq4IotTopic parameter can be up to 128 characters in length.
             */
            final String subTopic = "/testMq4Iot";
            final String mq4IotTopic = parentTopic + subTopic;
            /**
             * The quality of service (QoS) level in message transmission. Valid values: 0, 1, and 2. For more information, see Terms.
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            /**
             * The protocol and port of the Message Queue for MQTT client. The protocol and port used by the Message Queue for MQTT client must match. If SSL encryption is used, the protocol and port must be specified by following the format of ssl://endpoint:8883.
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * The timeout period for the Message Queue for MQTT client to wait for a response. The timeout period can prevent the Message Queue for MQTT client from keeping waiting for a response.
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * After a connection is established, the Message Queue for MQTT client must subscribe to the required topic as soon as possible.
                     */
                    System.out.println("connect success");
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                final String[] topicFilter = {mq4IotTopic};
                                final int[] qos = {qosLevel};
                                mqttClient.subscribe(topicFilter, qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    /**
                     * The callback that is invoked to consume a message. Make sure that the callback does not throw an exception. The message is considered consumed after the callback returns.
                     * Messages must be consumed within a limited period of time. If consumption takes longer than the timeout period that is specified by the Message Queue for MQTT broker, the broker resends the message. Make sure that your business logic is idempotent and de-duplicates messages.
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                /**
                 * Specify the parent topic as the topic and MQ2MQTT as the tag when you use the Message Queue for Apache RocketMQ client to send messages to the Message Queue for MQTT client.
                 */
                Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
                /**
                 * You can use the MqttSecondTopic parameter to specify a subtopic when you use the Message Queue for Apache RocketMQ client to send messages to the Message Queue for MQTT client.
                 */
                msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
                SendResult result = producer.send(msg);
                System.out.println(result);
                /**
                 * Send a point-to-point (P2P) message and specify a subtopic. 
                 */
                msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
                result = producer.send(msg);
                System.out.println(result);
            }
            Thread.sleep(Long.MAX_VALUE);
    
        }
    
    }
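The comment in messageArrived asks for idempotent, de-duplicated processing because the broker may resend a message that is not consumed in time. One possible approach is to remember recently seen message keys in a bounded set and skip repeats. The following is a minimal sketch that uses only the JDK. It assumes that each message carries a business key of your own design (hypothetical; the demo does not define one), and the class name RecentMessageDeduplicator is also hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not part of any SDK): remembers the most recent message keys. */
final class RecentMessageDeduplicator {
    private final Set<String> seen;

    RecentMessageDeduplicator(final int capacity) {
        // An insertion-ordered map that evicts its oldest entry once capacity is exceeded,
        // wrapped as a thread-safe set because callbacks may run on another thread.
        this.seen = Collections.synchronizedSet(Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > capacity;
                }
            }));
    }

    /** Returns true the first time a key is seen, false if it is still remembered. */
    boolean firstSeen(String businessKey) {
        return seen.add(businessKey);
    }
}
```

In messageArrived, the business logic would run only when firstSeen returns true. A bounded in-memory set forgets old keys and is lost on restart, so processing that must never repeat would still need a durable check, for example in the data store that the business logic writes to.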

Verify the result

After the producer sends the message and the consumer subscribes to the message, you can query the trace of the message in the Message Queue for MQTT console to verify whether the message is sent and received. For more information, see Query the message traces.
