This topic describes how to export data from Message Queue for MQTT to other Alibaba Cloud services. In this example, data is exported to Message Queue for Apache RocketMQ, which is the only Alibaba Cloud service that Message Queue for MQTT supports.

Prerequisites

  • The integrated development environment (IDE) is installed. For more information, see IDE. You can use IntelliJ IDEA or Eclipse. In the example, IntelliJ IDEA is used.
  • The Java Development Kit (JDK) is installed. For more information, see JDK.
  • A Message Queue for Apache RocketMQ instance, a topic, and a group ID are created in the Internet region. For more information, see Create resources.

Background information

This topic describes how to export data from Message Queue for MQTT to Message Queue for Apache RocketMQ by using Message Queue for MQTT SDK for Java that is designed for the Internet.

You can use third-party open source SDKs for multiple programming languages to send and subscribe to messages in this scenario. For more information, see Download the SDK.

quick_start_data_outflow

As shown in the preceding figure, a Message Queue for MQTT client that you deploy on the Internet needs to send data to a backend application that you deploy in the Internet region. The backend application and the Message Queue for MQTT client are developed in Java. You can configure a data outbound rule to export data from Message Queue for MQTT to Message Queue for Apache RocketMQ. The Message Queue for MQTT client and broker implement messaging by using Message Queue for MQTT SDK for Java. The Message Queue for Apache RocketMQ client and broker implement messaging by using Message Queue for Apache RocketMQ SDK for Java.

Note Messages cannot be sent between a Message Queue for Apache RocketMQ topic in one region and a Message Queue for MQTT topic in another region. Therefore, all resources that are involved in this topic must be created in the Internet region. For more information, see Region-specific topics.

Network access

Message Queue for MQTT provides the Public Endpoint and VPC Endpoint. We recommend that you use different types of endpoints based on scenarios:
  • In Internet of Things (IoT) and mobile Internet scenarios, we recommend that you use the Public Endpoint to connect your Message Queue for MQTT client to Message Queue for MQTT.
  • The VPC Endpoint is used only in specific special scenarios. If your applications are deployed on cloud servers, we recommend that you use a server-side messaging service, such as Message Queue for Apache RocketMQ.
Notice To connect a Message Queue for MQTT client to Message Queue for MQTT by using an endpoint, use the domain name instead of the IP address because the IP address can change at any time. The service team of Message Queue for MQTT is not liable for direct or indirect faults or losses that arise in the following scenarios:
  • Your Message Queue for MQTT client uses an IP address instead of a domain name to access the service. The original IP address becomes invalid after the service team of Message Queue for MQTT updates domain name resolution.
  • A firewall policy on IP addresses is set in the network where your Message Queue for MQTT client is running. New IP addresses are blocked due to the firewall policy after the product team of Message Queue for MQTT updates domain name resolution.
In the example, the public endpoint of Message Queue for MQTT is used. For more information about the comparison of scenarios and mappings of message attributes between Message Queue for MQTT and Message Queue for Apache RocketMQ, see the following two topics:

Procedure

The following figure shows the procedure of sending messages from a Message Queue for MQTT client to a backend application.

quick_start_mqtt_to_rocketmq

Step 1: Create a Message Queue for MQTT instance and obtain its endpoint

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. On the Instances page, click Create Instance in the upper-left corner.
  5. In the panel that appears, select a billing method as needed.
    Message Queue for MQTT supports the Pay-as-you-go and Subscription billing methods. For more information about the two billing methods, see Overview.
    • Create a pay-as-you-go instance:
      1. Set Billing Method to Pay-as-you-go and click OK.
      2. In the panel that appears, set the parameters as needed and click Buy Now.
    • Create a subscription instance:
      1. Set Billing Method to Subscription and click OK.
      2. In the panel that appears, set the parameters as needed and click Buy Now.
      3. In the panel that appears, click Purchase.
    After you complete the purchase, refresh the Instances page in the Message Queue for MQTT console. The instance that you created appears in the instance list.
  6. On the Instances page, click the name of the instance that you purchased or click Details in the Actions column to go to the Instance Details page.
  7. On the Instance Details page, click the Endpoints tab. On this tab, you can view the endpoint information. In the example, the public endpoint is used.

Step 2: Create a parent topic

The MQTT protocol supports multi-level topics. You must create a parent topic in the Message Queue for MQTT console or by calling an API operation. You do not need to create a subtopic. For more information about topics, see Terms. In this example, a parent topic is created in the Message Queue for MQTT console.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a topic in the instance list. In the Actions column, choose More > Topics.
  5. In the upper-left corner of the Topics page, click Create Topic.
  6. In the Create Topic panel, set the Name and Description parameters for the topic and click OK in the lower-left corner.
    You can view the topic that you create on the Topics page.

Step 3: Create a group ID

For more information about group IDs, see Terms.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a group ID in the instance list. In the Actions column, choose More > Groups.
  5. In the upper-left corner of the Groups page, click Create Group.
  6. In the Create Group panel, set the Group ID parameter and click OK.
    You can view the created group ID on the Groups page.

Step 4: Create a data outbound rule

The parameter values that you set in the rule must be the same as those that correspond to the resources you create.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a data outbound rule in the instance list. In the Actions column, choose More > Rules.
  5. In the upper-left corner of the Rules page, click Create Rule.
  6. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Information step, set the following parameters and click Next.
      Parameter Example Description
      Rule ID 111111 The global unique identifier of a rule.
      • It can contain only letters, digits, hyphens (-), and underscores (_) and must contain at least one letter or digit.
      • It must be 3 to 64 characters in length. If the value contains more than 64 characters, it is automatically truncated.
      • It cannot be updated after the rule is created.
      Description migrate from rocketmq The description of the rule.
      Status Enable Specifies whether to enable the current rule. Valid values:
      • Enable
      • Disable
      Rule Type Data Outbound The type of the rule. Valid values:
    2. In the Configure Rule Source step, specify the data source and click Next.
      Parameter Example Description
      Topic TpoicA The source topic from which you want to export data. Specify a topic in Message Queue for MQTT.
    3. In the Configure Rule Destination step, specify the destination to which the data is forwarded and click Create.
      Parameter Example Description
      Destination Service Type Message Queue for Apache RocketMQ The cloud service to which the data of the source topic is forwarded.
      Note Only Message Queue for Apache RocketMQ is supported.
      Message Queue for Apache RocketMQ Instance MQ_INST_13801563067*****_BbyOD2jQ The ID of the Message Queue for Apache RocketMQ instance to which the data is forwarded.
      Topic TopicB The Message Queue for Apache RocketMQ topic to which the data is forwarded. In this example, the data of the source topic is forwarded to TopicB.
    You can view the data outbound rule that you create on the Rules page.

Step 5: Send and subscribe to messages by using Message Queue for MQTT SDK for Java

  1. Download a third-party open source SDK for Java. Download address: Eclipse Paho Java Client.
  2. Download the demo of Alibaba Cloud Message Queue for MQTT SDK for Java for reference during code development. Download address: mqtt-java-demo.
  3. Decompress this demo project package to a specific folder.
  4. In IntelliJ IDEA, import the decompressed files to create a project and check whether the following dependencies are included in the pom.xml file.
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
    Note For more information about the ons-client version, see Release notes.
  5. In the MQ4IoTSendMessageToRocketMQ.java class, specify parameter values as instructed in comments. Most of these parameter values correspond to the Message Queue for MQTT resources that you create in Steps 1 to Step 3 and related resources that you create in Message Queue for Apache RocketMQ. Then, execute the main() function to send and subscribe to messages.
    The following code shows an example:
    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import java.util.Properties;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    /**
     * In the production environment of Alibaba Cloud, Message Queue for Apache RocketMQ instances in regions except the Internet region cannot be accessed by an on-premises server. Instead, the instances can be accessed by an Elastic Compute Service (ECS) server in the same region.
     */
    public class MQ4IoTSendMessageToRocketMQ {
        public static void main(String[] args) throws Exception {
            /**
             * Initialize a Message Queue for Apache RocketMQ client as a receiver. In most actual scenarios, the code is deployed in a backend application.
             */
            Properties properties = new Properties();
            /**
             * The group ID that you create in the Message Queue for Apache RocketMQ console.
             */
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
            /**
             // The AccessKey ID that you created in the Alibaba Cloud RAM console for identity verification.
             */
            properties.put(PropertyKeyConst.AccessKey,"XXXX");
             /**
             // The AccessKey secret that you created in the Alibaba Cloud RAM console for identity verification. The AccessKey secret is required only for signature authentication.
             */
            properties.put(PropertyKeyConst.SecretKey, "XXXX");
             /**
             * The endpoint of the Message Queue for Apache RocketMQ instance that is used in the TCP connection. You can obtain the endpoint on the Instance Details page in the Message Queue for Apache RocketMQ console.
             */
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
            /**
             * The topic that you create in the Message Queue for Apache RocketMQ console.
             * Only a parent topic can be used by the Message Queue for Apache RocketMQ client when you exchange data between Message Queue for Apache RocketMQ and Message Queue for MQTT.
             */
            final String parentTopic = "XXXXX";
            Consumer consumer = ONSFactory.createConsumer(properties);
            consumer.subscribe(parentTopic, "*", new MessageListener() {
                public Action consume(Message message, ConsumeContext consumeContext) {
                    System.out.println("recv msg:" + message);
                    return Action.CommitMessage;
                }
            });
            consumer.start();
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            /**
             * Initialize a Message Queue for MQTT client as a sender. In most actual scenarios, the code is deployed on a mobile terminal.
             */
    
            /**
             * The ID of the Message Queue for MQTT instance that you created in the console.
             */
            String instanceId = "XXXXX";
             /**
             // The endpoint of the Message Queue for MQTT instance. You can obtain the endpoint on the Instance Details page in the Message Queue for MQTT console.
             */
            String endPoint = "XXXXXX.mqtt.aliyuncs.com";
             /**
             // The AccessKey ID that you created in the Alibaba Cloud RAM console for identity verification.
             */
            String accessKey = "XXXX";
            /**
             // The AccessKey secret that you created in the Alibaba Cloud RAM console for identity verification. The AccessKey secret is required only for signature authentication.
             */
            String secretKey = "XXXX";
            /**
             * The globally unique ID that the system assigns to the Message Queue for MQTT client. The client ID must be different in each TCP connection. If different TCP connections use the same client ID, an exception occurs and the connections are closed.
             * The value of the clientId parameter is in the format of GroupID@@@DeviceID. GroupID is the group ID that you created in the Message Queue for MQTT console and DeviceID is the custom ID of the device. The value of the clientId parameter can be up to 64 characters in length.
             */
            String clientId = "GID_XXXX@@@XXXXX";
           /**
             * Message Queue for MQTT allows you to use a subtopic to filter messages. You can specify a string as a subtopic, as shown in the following code.
             * The value of the mq4IotTopic parameter can be up to 128 characters in length.
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * The quality of service (QoS) level in message transmission. Valid values: 0, 1, and 2. For more information, see Terms.
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
             /**
             * The protocol and port of the Message Queue for MQTT client. The protocol and port used by the Message Queue for MQTT client must match. If SSL encryption is used, the protocol and port must be specified by following the format of ssl://endpoint:8883.
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * The timeout period for the Message Queue for MQTT client to wait for a response. The timeout period can prevent the Message Queue for MQTT client from keeping waiting for a response.
             */
            mqttClient.setTimeToWait(5000);
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * After a connection is established, the Message Queue for MQTT client must subscribe to the required topic as soon as possible.
                     */
                    System.out.println("connect success");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 * The topic to which messages are sent. This topic must be the same as the topic that the receiver subscribes to or can be matched by using wildcard characters.
                 */
                mqttClient.publish(mq4IotTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
    
        }
    
    }

Verify the result

After the producer sends the message and the consumer subscribes to the message, you can query the trace of the message in the Message Queue for MQTT console to verify whether the message is sent and received. For more information, see Query the message traces.

References