This topic describes how to implement messaging between Message Queue for MQTT clients and backend service applications by using Message Queue for MQTT SDKs for Java.

Prerequisites

  • Resources are created. For more information, see Create resources.
  • An AccessKey pair is obtained.
  • The integrated development environment (IDE) is installed. For more information, see IDE. You can use IntelliJ IDEA or Eclipse. In the example, IntelliJ IDEA is used.
  • The Java Development Kit (JDK) is installed. For more information, see JDK.

Background information

The following figure shows the messaging process between a Message Queue for MQTT client and a backend service application. The Message Queue for MQTT client can use the client SDK and the backend service application can use the cloud SDK to access Message Queue for MQTT and implement two-way communication.

Figure: Interactions between clients and backend service applications

This topic describes how to use Message Queue for MQTT SDKs for Java to implement messaging between a Message Queue for MQTT client and a backend service application over the Internet. For more information about the sample code used to send and receive messages, see Client demos and Demo.

Usage notes

Before you use the cloud SDK to connect to Message Queue for MQTT for messaging, make sure that this feature is enabled.

Endpoints

You must specify an endpoint of your Message Queue for MQTT instance in the code when you call the client SDK or cloud SDK. Then, the Message Queue for MQTT client or backend service application can use the endpoint to connect to Message Queue for MQTT. For more information about how to obtain endpoints, see Create an instance and obtain an endpoint of the instance.

Message Queue for MQTT provides two types of endpoints: Public Endpoint and VPC Endpoint. Public Endpoint is accessible over the Internet and usually applies to IoT and mobile Internet scenarios. VPC Endpoint is the private endpoint, which is used by backend service applications to connect to Message Queue for MQTT.
  • Endpoints for access by using the client SDK
    Formats of the endpoints:
    • Public Endpoint: <ID of the Message Queue for MQTT instance>.mqtt.aliyuncs.com
    • VPC Endpoint: <ID of the Message Queue for MQTT instance>-internal-vpc.mqtt.aliyuncs.com
    You can view the ID of the Message Queue for MQTT instance in the Basic Information section of the Instance Details page in the Message Queue for MQTT console.
    Notice To connect a client to Message Queue for MQTT by using an endpoint, use the domain name instead of the IP address. The IP address is dynamic and can change without notice. The Message Queue for MQTT technical team is not liable for faults and direct or indirect losses in the following scenarios:
    • The client uses an IP address instead of a domain name to access the Message Queue for MQTT service, and the IP address becomes invalid when DNS records are changed.
    • A firewall policy is set for IP addresses on the network where your Message Queue for MQTT client resides, and the access from a new IP address is blocked when related DNS records are changed.
  • Endpoints for access by using the cloud SDK

    To use the cloud SDK to access Message Queue for MQTT, you must obtain an endpoint.

    Formats of the endpoints:
    • Public Endpoint: <ID of the Message Queue for MQTT instance>-server-internet.mqtt.aliyuncs.com
    • VPC Endpoint: <ID of the Message Queue for MQTT instance>-server-internal.mqtt.aliyuncs.com
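
The four endpoint formats above differ only in the suffix that follows the instance ID. The following is a minimal sketch that builds each host name from an instance ID. The class MqttEndpoints and its method names are hypothetical and are not part of the client SDK or cloud SDK. Treat the endpoint shown in the Message Queue for MQTT console as authoritative.

```java
/**
 * Hypothetical helper (not part of any SDK) that builds endpoint host names
 * from an instance ID, following the formats listed in this topic.
 */
final class MqttEndpoints {
    private static final String SUFFIX = ".mqtt.aliyuncs.com";

    private MqttEndpoints() { }

    /** Public endpoint for access by using the client SDK. */
    static String clientPublic(String instanceId) {
        return check(instanceId) + SUFFIX;
    }

    /** VPC endpoint for access by using the client SDK. */
    static String clientVpc(String instanceId) {
        return check(instanceId) + "-internal-vpc" + SUFFIX;
    }

    /** Public endpoint for access by using the cloud SDK. */
    static String serverPublic(String instanceId) {
        return check(instanceId) + "-server-internet" + SUFFIX;
    }

    /** VPC endpoint for access by using the cloud SDK. */
    static String serverVpc(String instanceId) {
        return check(instanceId) + "-server-internal" + SUFFIX;
    }

    /** Rejects a missing or blank instance ID and trims surrounding whitespace. */
    private static String check(String instanceId) {
        if (instanceId == null || instanceId.trim().isEmpty()) {
            throw new IllegalArgumentException("instanceId must not be empty");
        }
        return instanceId.trim();
    }

    public static void main(String[] args) {
        // "XXXXX" is a placeholder. Pass your own instance ID as the first argument.
        String id = args.length > 0 ? args[0] : "XXXXX";
        System.out.println("client SDK, public: " + clientPublic(id));
        System.out.println("client SDK, VPC:    " + clientVpc(id));
        System.out.println("cloud SDK, public:  " + serverPublic(id));
        System.out.println("cloud SDK, VPC:     " + serverVpc(id));
    }
}
```

The helper returns domain names only, which is consistent with the notice above: connect by domain name and do not use the resolved IP address.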

Call the client SDK to send messages

  1. Download a third-party open source SDK for Java. Download address: Eclipse Paho Java Client.
  2. Download the demo of the client SDK as a reference for code development. To download the demo, visit mqtt-java-demo.
  3. Decompress this demo project package to a specific folder.
  4. In IntelliJ IDEA, import the decompressed files to create a project and check whether the following dependencies are included in the pom.xml file:
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. In the MQ4IoTProducerDemo.java class, set the parameters as instructed in the code annotations. Most of these parameters can be set based on the information of Message Queue for MQTT resources that you have created. For more information, see Create resources. Use the main() function to run the sample code to send messages.
    You can use the following sample code:
    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    
    public class MQ4IoTProducerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * The ID of the Message Queue for MQTT instance that you created. 
             */
            String instanceId = "XXXXX";
            /**
             * The endpoint for accessing the Message Queue for MQTT instance by using the client SDK. You can obtain the endpoint on the Instance Details page in the Message Queue for MQTT console. 
             * Use a domain name instead of an IP address as the endpoint. Otherwise, a client exception may occur. 
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity authentication. For information about how to obtain your AccessKey ID, see Obtain an AccessKey pair. 
             */
            String accessKey = "XXXXX";
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. The AccessKey secret is required only for signature authentication. For information about how to obtain your AccessKey secret, see Obtain an AccessKey pair. 
             */
            String secretKey = "XXXXX";
            /**
             * The globally unique ID that the system assigns to the Message Queue for MQTT client. The client ID must vary with TCP connections. If multiple TCP connections use the same client ID, an exception occurs and the connections are closed. 
             * The value of the clientId parameter is in the format of GroupID@@@DeviceID. GroupID is the group ID that you created in the Message Queue for MQTT console, and DeviceID is the custom ID of the device. The value of the clientId parameter can be up to 64 characters in length. 
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * The parent topic that you created in the Message Queue for MQTT console. 
             * If you specify a topic that does not exist or a topic that the Message Queue for MQTT client is not authorized to access, the Message Queue for MQTT broker closes the connection. 
             */
            final String parentTopic = "XXXXX";
            /**
             * Message Queue for MQTT allows you to use a subtopic to filter messages. You can specify a string as a subtopic, as shown in the following code. 
             * The value of the mq4IotTopic parameter can be 128 characters at most in length. 
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * The quality of service (QoS) level in message transmission. Valid values: 0, 1, and 2. For more information, see Terms. 
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            /**
             * The protocol and port that are used by the Message Queue for MQTT client. The protocol and port that are used by the Message Queue for MQTT client must match. If SSL encryption is used, the protocol and port must be specified in the format of ssl://endpoint:8883. 
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * The timeout period during which the Message Queue for MQTT client waits for a response. The timeout period can prevent the Message Queue for MQTT client from waiting for a response indefinitely. 
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * After a connection is established, the Message Queue for MQTT client must subscribe to the required topic as soon as possible. 
                     */
                    System.out.println("connect success");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    /**
                     * The callback that is invoked to consume the messages that are sent. Make sure that the callback does not throw exceptions. If a response is returned for the callback, the messages are consumed. 
                     * The messages must be consumed in a specified period. If a message is not consumed within the timeout period specified by the Message Queue for MQTT broker, the Message Queue for MQTT broker may attempt to send the message again in reliable transmission. Make sure that the business data is idempotent. 
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 * The topic to which messages are sent. This topic must be the same as the topic to which the consumer subscribes or can be matched by using wildcards. 
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * Message Queue for MQTT supports point-to-point (P2P) messaging. If a producer knows that only one consumer needs a message and knows the clientId value of that consumer, the producer can send a P2P message to the consumer. 
                 * In P2P messaging, the consumer does not need to subscribe to the topic to which the producer sends messages. The logic is simplified at the consumer side. In P2P messaging, specify a topic in the format of {{parentTopic}}/p2p/{{targetClientId}}. 
                 */
                String receiverId = "xxx";
                final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }
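
The comments in the preceding sample code state several format rules: the clientId value uses the GroupID@@@DeviceID format and can be up to 64 characters in length, a subtopic is appended to the parent topic and the resulting topic can be up to 128 characters in length, a P2P topic uses the {{parentTopic}}/p2p/{{targetClientId}} format, and the protocol must match the port (tcp with 1883, ssl with 8883). The following is a minimal sketch that checks these rules before a connection is attempted. The class MqttClientParams and its method names are hypothetical and are not part of the Eclipse Paho client or of any Message Queue for MQTT SDK.

```java
/**
 * Hypothetical helper (not part of any SDK) that builds and checks the
 * string parameters used by the producer sample in this topic.
 */
final class MqttClientParams {
    static final int MAX_CLIENT_ID_LENGTH = 64;
    static final int MAX_TOPIC_LENGTH = 128;

    private MqttClientParams() { }

    /** Builds a client ID in the GroupID@@@DeviceID format and enforces the 64-character limit. */
    static String clientId(String groupId, String deviceId) {
        String id = groupId + "@@@" + deviceId;
        if (id.length() > MAX_CLIENT_ID_LENGTH) {
            throw new IllegalArgumentException(
                "clientId exceeds " + MAX_CLIENT_ID_LENGTH + " characters: " + id);
        }
        return id;
    }

    /** Appends a subtopic to the parent topic and enforces the 128-character limit. */
    static String topic(String parentTopic, String subTopic) {
        String full = parentTopic + "/" + subTopic;
        if (full.length() > MAX_TOPIC_LENGTH) {
            throw new IllegalArgumentException(
                "topic exceeds " + MAX_TOPIC_LENGTH + " characters: " + full);
        }
        return full;
    }

    /**
     * Builds a P2P topic in the parentTopic/p2p/targetClientId format.
     * No length check is applied here because this topic states the limit
     * only for the subtopic case.
     */
    static String p2pTopic(String parentTopic, String targetClientId) {
        return parentTopic + "/p2p/" + targetClientId;
    }

    /** Pairs the protocol with its port: ssl with 8883, tcp with 1883. */
    static String serverUri(String endPoint, boolean useSsl) {
        return useSsl ? "ssl://" + endPoint + ":8883" : "tcp://" + endPoint + ":1883";
    }

    public static void main(String[] args) {
        // All values below are placeholders.
        System.out.println(clientId("GID_XXXXX", "device001"));
        System.out.println(topic("XXXXX", "testMq4Iot"));
        System.out.println(p2pTopic("XXXXX", "GID_XXXXX@@@device002"));
        System.out.println(serverUri("XXXXX.mqtt.aliyuncs.com", true));
    }
}
```

Failing early on an oversized clientId or a mismatched protocol and port is usually easier to diagnose than a connection that the broker closes.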

Call the cloud SDK to receive messages

  1. Download the cloud SDK provided by Message Queue for MQTT. For more information about the download URL, see Release notes of the cloud SDK.
  2. Download the demo of the cloud SDK as a reference for code development. To download the demo, visit mqtt-server-sdk-demo.
  3. Decompress this demo project package to a specific folder.
  4. In IntelliJ IDEA, import the decompressed files to create a project and check whether the following dependencies are included in the pom.xml file:
    <dependencies>
            <dependency>
                <groupId>com.alibaba.mqtt</groupId>
                <artifactId>server-sdk</artifactId>
                <version>1.0.0.Final</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
    </dependencies>
  5. In the MQTTConsumerDemo.java class, set the parameters as instructed in the code annotations. Most of these parameters can be set based on the information of Message Queue for MQTT resources that you have created. For more information, see Create resources. Use the main() function to run the sample code to receive messages.
    You can use the following sample code:
    package com.aliyun.openservices.lmq.example;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.mqtt.server.ServerConsumer;
    import com.alibaba.mqtt.server.callback.MessageListener;
    import com.alibaba.mqtt.server.config.ChannelConfig;
    import com.alibaba.mqtt.server.config.ConsumerConfig;
    import com.alibaba.mqtt.server.model.MessageProperties;
    
    public class MQTTConsumerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * The endpoint of the Message Queue for MQTT instance that you created. 
             * Specify an endpoint for access by using the cloud SDK. For the endpoint formats, see the Endpoints section of this topic. 
             * Use a domain name instead of an IP address as the endpoint. Otherwise, a broker exception may occur. 
             */
            String domain = "domain";
    
            /**
             * The port that is used by the cloud SDK. The protocol and port that are used by the cloud SDK must match. Set the port number to 5672. 
             */
            int port = 5672;
    
            /**
             * The ID of the Message Queue for MQTT instance that you created. 
             */
            String instanceId = "instanceId";
    
            /**
             * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity authentication. For information about how to obtain your AccessKey ID, see Obtain an AccessKey pair. 
             */
            String accessKey = "accessKey";
    
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. The AccessKey secret is required only for signature authentication. For information about how to obtain your AccessKey secret, see Obtain an AccessKey pair. 
             */
            String secretKey = "secretKey";
    
            /**
             * The parent topic that you created in the Message Queue for MQTT console. 
             * If you specify a topic that does not exist or a topic that the Message Queue for MQTT client is not authorized to access, the Message Queue for MQTT broker closes the connection. 
             */
            String firstTopic = "firstTopic";
    
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.setDomain(domain);
            channelConfig.setPort(port);
            channelConfig.setInstanceId(instanceId);
            channelConfig.setAccessKey(accessKey);
            channelConfig.setSecretKey(secretKey);
    
            ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
            serverConsumer.start();
            serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
                @Override
                public void process(String msgId, MessageProperties messageProperties, byte[] payload) {
                    System.out.println("Receive:" + msgId + "," + JSONObject.toJSONString(messageProperties) + "," + new String(payload));
                }
            });
        }
    
    }
    Note For more information about the sample code used to send messages by using the cloud SDK, go to the page for MQTTProducerDemo.java in the Alibaba Cloud Management console.
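
The comments in the client SDK sample note that the broker may send a message again in reliable transmission and that business processing should be idempotent. The following is a minimal sketch of one way to skip duplicates on the consumer side by remembering recently seen message IDs in a bounded set. The class SeenMessageIds is hypothetical and is not part of the cloud SDK. The sketch also assumes that the msgId value passed to MessageListener.process identifies a message uniquely, which this topic does not confirm.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not part of the cloud SDK) that remembers the most
 * recently seen message IDs so that a redelivered message can be skipped.
 */
final class SeenMessageIds {
    private final Set<String> seen;

    SeenMessageIds(final int capacity) {
        // A LinkedHashMap in insertion order evicts the oldest ID once the
        // capacity is exceeded, so memory use stays bounded.
        this.seen = Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > capacity;
                }
            });
    }

    /** Returns true if the ID was not in the set, and records it. */
    synchronized boolean firstTime(String msgId) {
        return seen.add(msgId);
    }

    public static void main(String[] args) {
        SeenMessageIds ids = new SeenMessageIds(1000);
        // Simulated deliveries: the second "m-1" stands for a redelivery.
        for (String msgId : new String[] {"m-1", "m-2", "m-1"}) {
            if (ids.firstTime(msgId)) {
                System.out.println("process " + msgId);
            } else {
                System.out.println("skip duplicate " + msgId);
            }
        }
    }
}
```

Inside the process callback, the business logic would run only when firstTime(msgId) returns true. An in-memory set does not survive a restart and is not shared across multiple consumer processes, so this approach covers only duplicates seen by a single running instance.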