ApsaraMQ for MQTT:Export data from ApsaraMQ for MQTT to ApsaraMQ for RocketMQ

Last Updated:Mar 15, 2024

If you want to use specific features of ApsaraMQ for RocketMQ, such as ordered messages and transactional messages, in cloud applications, you can use message inbound or outbound rules to exchange data between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ. This topic describes how to export data from ApsaraMQ for MQTT to ApsaraMQ for RocketMQ.

Background information

ApsaraMQ for MQTT supports cloud SDKs. You can connect cloud applications to ApsaraMQ for MQTT brokers to send and receive messages by using cloud SDKs. For information about how to use cloud SDKs, see Overview.

ApsaraMQ for MQTT also supports data exchange between ApsaraMQ for MQTT and other Alibaba Cloud services. Currently, you can exchange data only between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ.

This topic describes how to export data from ApsaraMQ for MQTT to ApsaraMQ for RocketMQ over the Internet by using the SDK for Java.

In this scenario, you can use third-party open source SDKs for multiple programming languages to send and receive messages. For more information, see Download the SDK.

[Figure: data outflow from ApsaraMQ for MQTT to ApsaraMQ for RocketMQ]

Important

Messages cannot be sent between an ApsaraMQ for RocketMQ topic in one region and an ApsaraMQ for MQTT topic in another region. Therefore, all resources that are involved in this topic must be created in the Internet region.

Network access

ApsaraMQ for MQTT provides public endpoints and VPC endpoints.

  • A public endpoint is an IP address that is used to access ApsaraMQ for MQTT over the Internet. In most cases, public endpoints are used in IoT and mobile Internet scenarios.

  • A VPC endpoint is an IP address that is used to access ApsaraMQ for MQTT in a virtual private cloud (VPC). In most cases, VPC endpoints are used by cloud applications to connect to ApsaraMQ for MQTT.

Important

If you want to use an endpoint to connect a client to ApsaraMQ for MQTT, use the domain name instead of the IP address because the IP address dynamically changes. The ApsaraMQ for MQTT technical team is not liable for faults and direct or indirect losses in the following scenarios:

  • You use an IP address to connect your client to ApsaraMQ for MQTT. After the technical team of ApsaraMQ for MQTT updates the domain name resolution, the original IP address becomes invalid.

  • A firewall policy on IP addresses is set in the network in which your client is running. After the technical team of ApsaraMQ for MQTT updates the domain name resolution, new IP addresses are blocked due to the firewall policy.
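The advice above can be enforced in client code. The following is a minimal sketch, not part of the official demo: `BrokerUri` is a hypothetical helper that builds the broker URI from the endpoint domain name and rejects IPv4 literals. The ports (1883 for `tcp://`, 8883 for `ssl://`) are taken from the sample code in Step 5. IPv6 literals are not checked.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: builds a broker URI from an endpoint domain name. */
final class BrokerUri {
    // Matches dotted-quad IPv4 literals such as 203.0.113.10.
    private static final Pattern IPV4 = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}$");

    private BrokerUri() {
    }

    /** Returns tcp://domain:1883, or ssl://domain:8883 when useSsl is true. */
    static String of(String endpointDomain, boolean useSsl) {
        if (endpointDomain == null || endpointDomain.isEmpty()) {
            throw new IllegalArgumentException("endpoint must not be empty");
        }
        if (IPV4.matcher(endpointDomain).matches()) {
            // The resolved IP address can change, so only the domain name is accepted.
            throw new IllegalArgumentException(
                "use the endpoint domain name, not an IP address: " + endpointDomain);
        }
        return useSsl
            ? "ssl://" + endpointDomain + ":8883"
            : "tcp://" + endpointDomain + ":1883";
    }

    public static void main(String[] args) {
        // example.mqtt.aliyuncs.com is a placeholder, not a real endpoint.
        System.out.println(BrokerUri.of("example.mqtt.aliyuncs.com", false));
    }
}
```

The returned string can be passed as the server URI when the MQTT client is constructed.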

In the example of this topic, a public endpoint is used. For a comparison of the scenarios and the mappings of message attributes between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ, see the related topics in the ApsaraMQ for MQTT documentation.

Process

The following figure shows the process for sending messages from an ApsaraMQ for MQTT client to a backend application.

[Figure: message flow from an ApsaraMQ for MQTT client to a backend application through ApsaraMQ for RocketMQ]

Prerequisites

  • An integrated development environment (IDE) is installed. For more information, see IDE. You can use IntelliJ IDEA or Eclipse. In the example, IntelliJ IDEA is used.

  • The Java Development Kit (JDK) is installed. For more information, see JDK.

  • An ApsaraMQ for RocketMQ instance is created, and a topic and a group are created on the instance. For more information, see Step 2: Create resources.

Step 1: Create an ApsaraMQ for MQTT instance and obtain the endpoint of the instance

  1. Log on to the ApsaraMQ for MQTT console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar, select the region where the instance that you want to manage resides. Then, in the upper-left corner, click Create Instance.

  3. In the panel that appears, use the default value Subscription for the Billing Method parameter. Then, click OK.

  4. On the buy page that appears, select the instance specifications that you want to purchase based on your business requirements, select the terms of service, and then click Buy Now.

    For information about the instance editions provided by ApsaraMQ for MQTT and their feature differences, see Instance editions.

  5. On the order payment page, follow the on-screen instructions to complete the payment.

  6. On the page that appears after you complete the payment, click Console.

  7. Go back to the ApsaraMQ for MQTT console. In the left-side navigation pane, click Instances. In the top navigation bar, select the region in which your instance is deployed.

  8. On the Instances page, click the name of the instance or click Details in the Actions column of the instance to go to the Instance Details page.

  9. On the Instance Details page, click the Endpoints tab. On the tab, you can view the endpoint information. In the example, the public endpoint is used.

Step 2: Create a parent topic

The MQTT protocol supports multi-level topics. You must create a parent topic in the ApsaraMQ for MQTT console. You can specify subtopics in the code without the need to create them in the console. A parent topic and its subtopics are named in the following format: <Name of a parent topic>/<Name of a level-2 topic>/<Name of a level-3 topic>. A parent topic and its subtopics are separated by forward slashes (/). Example: SendMessage/demo/producer. The total length of the names of a parent topic and its subtopics cannot exceed 64 characters. For more information about topics, see Terms.
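The naming rule can be sketched in code as follows. `MqttTopics` is a hypothetical helper, not part of the SDK. It assumes that the 64-character limit applies to the full topic string, including the forward slashes. The source does not state whether separators are counted, so adjust the check if your instance behaves differently.

```java
/** Hypothetical helper for composing multi-level MQTT topic names. */
final class MqttTopics {
    // Limit stated in this topic; assumed to include the '/' separators.
    static final int MAX_TOTAL_LENGTH = 64;

    private MqttTopics() {
    }

    /** Joins a parent topic and optional subtopics with forward slashes. */
    static String join(String parentTopic, String... subtopics) {
        StringBuilder topic = new StringBuilder(requireLevel(parentTopic));
        for (String subtopic : subtopics) {
            topic.append('/').append(requireLevel(subtopic));
        }
        if (topic.length() > MAX_TOTAL_LENGTH) {
            throw new IllegalArgumentException(
                "topic exceeds " + MAX_TOTAL_LENGTH + " characters: " + topic);
        }
        return topic.toString();
    }

    /** Returns the parent topic, which is the part before the first slash. */
    static String parentOf(String fullTopic) {
        int slash = fullTopic.indexOf('/');
        return slash < 0 ? fullTopic : fullTopic.substring(0, slash);
    }

    private static String requireLevel(String level) {
        if (level == null || level.isEmpty() || level.indexOf('/') >= 0) {
            throw new IllegalArgumentException("invalid topic level: " + level);
        }
        return level;
    }

    public static void main(String[] args) {
        String topic = join("SendMessage", "demo", "producer");
        System.out.println(topic + " -> parent " + parentOf(topic));
    }
}
```

`parentOf` is useful on the ApsaraMQ for RocketMQ side, where only the parent topic is used for subscription.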

  1. Log on to the ApsaraMQ for MQTT console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar, select the region where the instance that you want to manage resides. On the Instances page, click the instance name to go to the Instance Details page.

  3. In the left-side navigation pane, click Topics. In the upper-left corner of the Topics page, click Create Topic.
  4. In the Create Topic panel, set the Name and Description parameters for the topic and click OK in the lower-left corner.

Step 3: Create a group

For more information about group IDs, see Terms.

  1. Log on to the ApsaraMQ for MQTT console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar, select the region where the instance that you want to manage resides. On the Instances page, click the instance name to go to the Instance Details page.

  3. In the left-side navigation pane, click Groups. In the upper-left corner of the Groups page, click Create Group.
  4. In the Create Group panel, set the Group ID parameter and click OK.
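The group ID that you create here becomes the first part of the MQTT client ID. According to the comments in the sample code in Step 5, a client ID uses the GroupID@@@DeviceID format and cannot exceed 64 characters in length. The sketch below shows one way to build and check such an ID. `MqttClientIds` and the sample IDs are hypothetical names used only for illustration.

```java
/** Hypothetical helper for building client IDs in the GroupID@@@DeviceID format. */
final class MqttClientIds {
    static final String SEPARATOR = "@@@";
    // Length limit taken from the comments in the Step 5 sample code.
    static final int MAX_LENGTH = 64;

    private MqttClientIds() {
    }

    /** Combines a group ID and a custom device ID into a client ID. */
    static String of(String groupId, String deviceId) {
        requirePart("group ID", groupId);
        requirePart("device ID", deviceId);
        String clientId = groupId + SEPARATOR + deviceId;
        if (clientId.length() > MAX_LENGTH) {
            throw new IllegalArgumentException(
                "client ID exceeds " + MAX_LENGTH + " characters: " + clientId);
        }
        return clientId;
    }

    /** Extracts the group ID from a client ID. */
    static String groupIdOf(String clientId) {
        int index = clientId.indexOf(SEPARATOR);
        if (index <= 0) {
            throw new IllegalArgumentException("not in GroupID@@@DeviceID format: " + clientId);
        }
        return clientId.substring(0, index);
    }

    private static void requirePart(String label, String value) {
        if (value == null || value.isEmpty() || value.contains(SEPARATOR)) {
            throw new IllegalArgumentException("invalid " + label + ": " + value);
        }
    }

    public static void main(String[] args) {
        // GID_demo and device001 are placeholders.
        System.out.println(of("GID_demo", "device001"));
    }
}
```

Because each TCP connection needs its own client ID, the device ID part should be unique per connection.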

Step 4: Create a data outbound rule

The parameter values that you set in the rule must match the resources that you created.

  1. Log on to the ApsaraMQ for MQTT console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar, select the region where the instance that you want to manage resides. On the Instances page, click the instance name to go to the Instance Details page.

  3. In the left-side navigation pane, click Rules. In the upper-left corner of the Rules page, click Create Rule.

  4. On the Create Rule page, perform the following steps:

    1. In the Configure Basic Information step, configure the following parameters and click Next.

      | Parameter | Example | Description |
      |---|---|---|
      | Rule ID | 111111 | The ID of the rule. The ID is globally unique. It can contain only letters, digits, hyphens (-), and underscores (_), and must contain at least one letter or digit. It must be 3 to 64 characters in length; a value longer than 64 characters is automatically truncated. It cannot be changed after the rule is created. |
      | Description | migrate from rocketmq | The description of the rule. |
      | Status | Enable | Specifies whether to enable the rule. Valid values: Enable and Disable. |
      | Rule Type | Data Outbound | The type of the rule. In this example, Data Outbound is selected. |

    2. In the Configure Rule Source step, specify the data source and click Next.

      | Parameter | Example | Description |
      |---|---|---|
      | Topic | TopicA | The source topic from which you want to export data. Specify a topic in ApsaraMQ for MQTT. |

    3. In the Configure Rule Destination step, specify the destination to which the data is forwarded and click Create.

      | Parameter | Example | Description |
      |---|---|---|
      | Destination Service Type | Message Queue for Apache RocketMQ | The cloud service to which the data of the source topic is forwarded. Note: Only ApsaraMQ for RocketMQ is supported. |
      | Message Queue for Apache RocketMQ Instance | MQ_INST_13801563067*****_BbyOD2jQ | The ID of the ApsaraMQ for RocketMQ instance to which the data is forwarded. Note: You can select only an instance that resides in the same region as the ApsaraMQ for MQTT instance. |
      | Topic | TopicB | The ApsaraMQ for RocketMQ topic to which the data is forwarded. In this example, the data of the source topic is forwarded to TopicB. |

    You can view the data outbound rule that you create on the Rules page.
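If you generate rule IDs in scripts, you can check them against the Rule ID constraints before you enter them in the console. The following sketch is not an official validator. `RuleIds` is a hypothetical class, and it assumes that "letters" means ASCII letters. The console truncates values longer than 64 characters, whereas this check rejects them so that the ID you record matches the ID that is stored.

```java
import java.util.regex.Pattern;

/** Hypothetical validator for the Rule ID constraints described in this topic. */
final class RuleIds {
    // Letters, digits, hyphens, and underscores only; 3 to 64 characters.
    private static final Pattern ALLOWED = Pattern.compile("[A-Za-z0-9_-]{3,64}");
    // At least one letter or digit must be present.
    private static final Pattern HAS_LETTER_OR_DIGIT = Pattern.compile(".*[A-Za-z0-9].*");

    private RuleIds() {
    }

    static boolean isValid(String ruleId) {
        return ruleId != null
            && ALLOWED.matcher(ruleId).matches()
            && HAS_LETTER_OR_DIGIT.matcher(ruleId).matches();
    }

    public static void main(String[] args) {
        System.out.println("111111 valid: " + isValid("111111"));
        System.out.println("--- valid: " + isValid("---"));
    }
}
```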

Step 5: Send and receive messages by using the SDK for Java

  1. Download a third-party open source SDK for Java. Download link: Eclipse Paho Java Client.

  2. Download the demo of the ApsaraMQ for MQTT SDK for Java for reference during code development. Download link: mqtt-java-demo.

  3. Decompress the demo project package to a specific folder.

  4. In IntelliJ IDEA, import the extracted files to create a project and check whether the pom.xml file contains the following dependencies:

    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>

    Note

    For information about the ons-client versions of the SDK for Java, see Release notes.

  5. In the MQ4IoTSendMessageToRocketMQ.java class, specify parameter values based on the instructions that are provided in the comments. Most parameter values correspond to the ApsaraMQ for MQTT resources that you created in Step 1 to Step 3 and the resources that you created in ApsaraMQ for RocketMQ. Then, run the main() method to send and receive messages.

    Sample code:

    Note

    Before you use the sample code to send and receive messages, you must configure environment variables to obtain the credentials that are used to access ApsaraMQ for MQTT. For information about how to configure environment variables, see Configure an access credential.

    The environment variable name of the AccessKey ID that is used to access ApsaraMQ for MQTT is MQTT_AK_ENV, and the environment variable name of the AccessKey secret that is used to access ApsaraMQ for MQTT is MQTT_SK_ENV.

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import java.util.Properties;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MQ4IoTSendMessageToRocketMQ {
        public static void main(String[] args) throws Exception {
            /**
             * Initialize the ApsaraMQ for RocketMQ client as a receiver. In most business scenarios, the receiver is deployed on a backend application. 
             */
            Properties properties = new Properties();
            /**
             * The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
             */
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
            
            /**
             * The AccessKey ID that you created in the Alibaba Cloud Resource Access Management (RAM) console for identity authentication. 
             * The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. To prevent security risks, we recommend that you use a RAM user to call API operations or perform routine O&M. 
             * We strongly recommend that you do not save an AccessKey pair in the project code. Otherwise, the AccessKey pair may be leaked and all resources that are contained in your account may be exposed to potential security risks. 
             * In this example, the AccessKey pair is saved in the environment variables. 
             */
            properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. The AccessKey secret is required only for signature authentication. 
             */
            properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
          
            /**
             * The TCP endpoint that you want to use to access the ApsaraMQ for RocketMQ instance. You can obtain the TCP endpoint on the Instance Details page in the ApsaraMQ for RocketMQ console. 
             */
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
            /**
             * The topic that you created in the ApsaraMQ for RocketMQ console. 
             * When you exchange data between ApsaraMQ for RocketMQ and ApsaraMQ for MQTT, only parent topics can be used by the ApsaraMQ for RocketMQ client. 
             */
            final String parentTopic = "XXXXX";
            Consumer consumer = ONSFactory.createConsumer(properties);
            consumer.subscribe(parentTopic, "*", new MessageListener() {
                public Action consume(Message message, ConsumeContext consumeContext) {
                    System.out.println("recv msg:" + message);
                    return Action.CommitMessage;
                }
            });
            consumer.start();
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            /**
             * Initialize the ApsaraMQ for MQTT client as a sender. In most business scenarios, the sender is deployed on a mobile terminal. 
             */
    
            /**
             * The ID of the ApsaraMQ for MQTT instance that you created in the console. 
             */
            String instanceId = "XXXXX";
            /**
             * The endpoint that you want to use to access the ApsaraMQ for MQTT instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for MQTT console. 
             */
            String endPoint = "XXXXXX.mqtt.aliyuncs.com";
            /**
             * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity authentication. 
             * The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. To prevent security risks, we recommend that you use a RAM user to call API operations or perform routine O&M. 
             * We strongly recommend that you do not save an AccessKey pair in the project code. Otherwise, the AccessKey pair may be leaked and all resources that are contained in your account may be exposed to potential security risks. 
             * In this example, the AccessKey pair is saved in the environment variables. 
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. The AccessKey secret is required only for signature authentication. 
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * The globally unique ID that the system assigns to the ApsaraMQ for MQTT client. The client ID must vary based on the TCP connection. If multiple TCP connections use the same client ID, exceptions occur and the connections are unexpectedly closed. 
             * The client ID consists of a group ID and a device ID and is in the GroupID@@@DeviceID format. The group ID is the ID of the group that you create in the ApsaraMQ for MQTT console. The device ID is a custom ID specified by you. The client ID cannot exceed 64 characters in length. 
             */
            String clientId = "GID_XXXX@@@XXXXX";
            /**
             * ApsaraMQ for MQTT allows you to use a subtopic to filter messages. You can specify a string as the name of the subtopic. 
             * The value of the mq4IotTopic parameter can be up to 128 characters in length. 
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * The quality of service (QoS) level for message transmission. Valid values: 0, 1, and 2. For more information, see Terms. 
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            /**
             * The protocol and port that are used by the ApsaraMQ for MQTT client. The protocol and port that are used by the ApsaraMQ for MQTT client must match. If Secure Sockets Layer (SSL) encryption is used, the protocol and port must be specified as ssl://endpoint:8883. 
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * The timeout period during which the ApsaraMQ for MQTT client waits for a response. The timeout period prevents the ApsaraMQ for MQTT client from waiting for a response for an indefinite period of time. 
             */
            mqttClient.setTimeToWait(5000);
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * Invoked after the client connection is established. A client that also consumes messages must subscribe to its topics here at the earliest opportunity. The sender in this example only logs the event. 
                     */
                    System.out.println("connect success");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 * The topic to which messages are sent. If the messages that are sent are normal messages, this topic must be the topic to which the consumer subscribes or the topic that can be matched by using wildcards. 
                 */
                mqttClient.publish(mq4IotTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
    
        }
    
    }
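In the sample code, `System.getenv` returns null when MQTT_AK_ENV or MQTT_SK_ENV is not set, and `Properties.put` then fails with a NullPointerException that does not name the missing variable. The sketch below shows one way to fail fast with a clearer message. `EnvCredentials` is a hypothetical helper and is not part of the demo project. It takes the environment as a map so that it can be exercised without changing real environment variables.

```java
import java.util.Map;

/** Hypothetical helper that reads required credentials from environment variables. */
final class EnvCredentials {
    private EnvCredentials() {
    }

    /** Returns the value of the named variable, or throws if it is missing or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            // Variable names are the ones used by the sample code in this topic.
            String accessKey = require(System.getenv(), "MQTT_AK_ENV");
            String secretKey = require(System.getenv(), "MQTT_SK_ENV");
            // Never print the secret; report only that both values were found.
            System.out.println("credentials loaded (AccessKey ID length: "
                + accessKey.length() + ", secret length: " + secretKey.length() + ")");
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

In the sample, a call such as `EnvCredentials.require(System.getenv(), "MQTT_AK_ENV")` could replace the direct `System.getenv("MQTT_AK_ENV")` lookups.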

Verify the results

After the producer sends messages and the consumer receives them, you can query the message traces in the ApsaraMQ for MQTT console to verify that the messages were sent and received. For more information, see Query message traces.
