This topic describes how Message Queue for MQTT clients send messages to and subscribe to messages from each other by using Message Queue for MQTT SDK for Java. This scenario does not involve data exchanges across cloud services.

Prerequisites

  • The integrated development environment (IDE) is installed. For more information, see IDE. You can use IntelliJ IDEA or Eclipse. In the example, IntelliJ IDEA is used.
  • The Java Development Kit (JDK) is installed. For more information, see JDK.

Background information

The scenario where Message Queue for MQTT clients send messages to and subscribe to messages from each other is the simplest scenario of Message Queue for MQTT. You can use third-party open source SDKs for multiple programming languages to send and subscribe to messages in this scenario. For more information about supported languages, see Download the SDK.

This topic describes how Message Queue for MQTT clients send messages to and subscribe to messages from each other by using Message Queue for MQTT SDK for Java that is designed for the Internet.

mqtt_client_send_receive

As shown in the preceding figure, in IoT and mobile Internet scenarios, your Message Queue for MQTT clients are developed in Java and deployed on the Internet. The clients need to send messages to and subscribe to messages from each other by using a Message Queue for MQTT broker. You must embed the code of Message Queue for MQTT SDK for Java into the code of the Message Queue for MQTT clients, and set relevant parameters in the SDK to communicate with the Message Queue for MQTT broker.

Network access

Message Queue for MQTT provides a public endpoint and a VPC endpoint. Choose the endpoint type based on your scenario:
  • In Internet of Things (IoT) and mobile Internet scenarios, we recommend that you use the public endpoint to connect your Message Queue for MQTT client to Message Queue for MQTT.
  • The VPC endpoint is intended only for special scenarios. If your applications are deployed on cloud servers, we recommend that you use a server-side messaging service, such as Message Queue for Apache RocketMQ.
Notice To connect a Message Queue for MQTT client to Message Queue for MQTT by using an endpoint, use the domain name instead of the IP address because the IP address can change at any time. The service team of Message Queue for MQTT is not liable for direct or indirect faults or losses that arise in the following scenarios:
  • Your Message Queue for MQTT client uses an IP address instead of a domain name to access the service. The original IP address becomes invalid after the service team of Message Queue for MQTT updates domain name resolution.
  • A firewall policy on IP addresses is set in the network where your Message Queue for MQTT client is running. New IP addresses are blocked by the firewall policy after the service team of Message Queue for MQTT updates domain name resolution.
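
The rule about domain names can be enforced in client code before a connection is attempted. The following is a minimal sketch, not part of the demo project: the BrokerUri class and its of() method are hypothetical names. It assumes the tcp://endpoint:1883 and ssl://endpoint:8883 formats that appear in the example code in Step 4, and it rejects only dotted-decimal IPv4 literals.

```java
/** Builds the server URI that an MQTT client connects to (illustrative sketch, hypothetical helper). */
final class BrokerUri {
    private static final int TCP_PORT = 1883; // plain TCP port used in the Step 4 example
    private static final int SSL_PORT = 8883; // SSL port named in the Step 4 example comments

    private BrokerUri() {
    }

    /**
     * @param endpoint the instance domain name, for example "XXXXX.mqtt.aliyuncs.com"
     * @param useSsl   true for ssl://endpoint:8883, false for tcp://endpoint:1883
     */
    static String of(String endpoint, boolean useSsl) {
        if (endpoint == null || endpoint.isEmpty()) {
            throw new IllegalArgumentException("endpoint must not be empty");
        }
        // Reject dotted-decimal IPv4 literals: the IP address behind the domain name can change.
        if (endpoint.matches("\\d{1,3}(\\.\\d{1,3}){3}")) {
            throw new IllegalArgumentException("use the domain name, not an IP address: " + endpoint);
        }
        return useSsl
            ? "ssl://" + endpoint + ":" + SSL_PORT
            : "tcp://" + endpoint + ":" + TCP_PORT;
    }
}
```

The returned string can be passed as the first argument of the MqttClient constructor. The protocol and port are kept together in one place so that they cannot be mismatched.
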
In the example, the public endpoint of Message Queue for MQTT is used. For more information about the comparison of scenarios and mappings of message attributes between Message Queue for MQTT and Message Queue for Apache RocketMQ, see the following two topics:

Procedure

The following figure shows the procedure of using Message Queue for MQTT SDK for Java to send and subscribe to messages.

quick_start_no_cross_product

Step 1: Create a Message Queue for MQTT instance and obtain its endpoint

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. On the Instances page, click Create Instance in the upper-left corner.
  5. On the buy page, select the desired instance type and click Buy Now.
    You can purchase subscription or pay-as-you-go instances. For more information about the two billing methods, see Overview. In this example, a pay-as-you-go instance is created.
  6. On the Confirm Order page, select I have read and agree to Alibaba Cloud Message Queue for MQTT Agreement of Service and click Activate Now.
  7. Go back to the Message Queue for MQTT console. In the left-side navigation pane, click Instances. In the top navigation bar, select the region where your instance resides.
  8. On the Instances page, click the name of the instance that you purchased or click Details in the Actions column to go to the Instance Details page.
  9. On the Instance Details page, click the Endpoints tab. On this tab, you can view the endpoint information. In the example, the public endpoint is used.

Step 2: Create a parent topic

The MQTT protocol supports multi-level topics. You must create a parent topic in the Message Queue for MQTT console or by calling an API operation. You do not need to create a subtopic. For more information about topics, see Terms. In this example, a parent topic is created in the Message Queue for MQTT console.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a topic in the instance list. In the Actions column, choose More > Topics.
  5. In the upper-left corner of the Topics page, click Create Topic.
  6. In the Create Topic panel, set the Name and Description parameters for the topic and click OK in the lower-left corner.
    You can view the topic that you create on the Topics page.
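
Because only the parent topic is created in the console, subtopics exist only as strings that the client composes. The following sketch shows one way to compose such a multi-level topic. The TopicNames class and its subtopic() method are hypothetical names. The 128-character limit is taken from the comments in the Step 4 example, and the sketch assumes that each level is a single segment that contains no forward slash (/).

```java
/** Composes multi-level topic names from a parent topic (illustrative sketch, hypothetical helper). */
final class TopicNames {
    // Length limit stated for the full topic in the Step 4 example comments.
    private static final int MAX_TOPIC_LENGTH = 128;

    private TopicNames() {
    }

    /** Joins the parent topic and the given sublevels with "/", for example "XXXXX/testMq4Iot". */
    static String subtopic(String parentTopic, String... levels) {
        StringBuilder topic = new StringBuilder(requireLevel(parentTopic));
        for (String level : levels) {
            topic.append('/').append(requireLevel(level));
        }
        if (topic.length() > MAX_TOPIC_LENGTH) {
            throw new IllegalArgumentException("topic is longer than " + MAX_TOPIC_LENGTH + " characters");
        }
        return topic.toString();
    }

    // Assumption of this sketch: a level is non-empty and does not itself contain "/".
    private static String requireLevel(String level) {
        if (level == null || level.isEmpty() || level.contains("/")) {
            throw new IllegalArgumentException("invalid topic level: " + level);
        }
        return level;
    }
}
```

For example, TopicNames.subtopic("XXXXX", "testMq4Iot") produces the same string as the concatenation used for the mq4IotTopic variable in Step 4.
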

Step 3: Create a group ID

For more information about group IDs, see Terms.

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. Find the instance for which you want to create a group ID in the instance list. In the Actions column, choose More > Groups.
  5. In the upper-left corner of the Groups page, click Create Group.
  6. In the Create Group panel, set the Group ID parameter and click OK.
    You can view the created group ID on the Groups page.
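
The group ID is used as the prefix of the client ID. According to the comments in the Step 4 example, a client ID is in the format of GroupID@@@DeviceID and can be up to 64 characters in length. The following sketch builds and checks such a value. The ClientIds class and its of() method are hypothetical names and are not part of the SDK or the demo project.

```java
/** Builds client IDs in the GroupID@@@DeviceID format (illustrative sketch, hypothetical helper). */
final class ClientIds {
    private static final String SEPARATOR = "@@@";
    // Length limit stated for clientId in the Step 4 example comments.
    private static final int MAX_LENGTH = 64;

    private ClientIds() {
    }

    /**
     * @param groupId  the group ID created in the console, for example "GID_XXXXX"
     * @param deviceId the custom ID of the device
     */
    static String of(String groupId, String deviceId) {
        if (groupId == null || groupId.isEmpty() || deviceId == null || deviceId.isEmpty()) {
            throw new IllegalArgumentException("groupId and deviceId must not be empty");
        }
        String clientId = groupId + SEPARATOR + deviceId;
        if (clientId.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("clientId is longer than " + MAX_LENGTH + " characters");
        }
        return clientId;
    }
}
```

Because each TCP connection must use a different client ID, the device ID is the part that must be unique within a group.
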

Step 4: Send and subscribe to messages by using Message Queue for MQTT SDK for Java

  1. Download a third-party open source SDK for Java. Download address: Eclipse Paho Java Client.
  2. Download the demo of Alibaba Cloud Message Queue for MQTT SDK for Java for reference during code development. Download address: mqtt-java-demo.
  3. Decompress this demo project package to a specific folder.
  4. In IntelliJ IDEA, import the decompressed files to create a project and check whether the following dependencies are included in the pom.xml file.
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. In the MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java class, specify parameter values as instructed in the comments. Most of these parameter values correspond to the Message Queue for MQTT resources that you created in Step 1 to Step 3. Then, run the main() method to send and subscribe to messages.
    The following code shows an example:
    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
        public static void main(String[] args) throws Exception {
            /**
             * The ID of the Message Queue for MQTT instance that you created in the console.
             */
            String instanceId = "XXXXX";
            /**
             * The endpoint of the Message Queue for MQTT instance. You can obtain the endpoint on the Instance Details page in the Message Queue for MQTT console.
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity verification.
             */
            String accessKey = "XXXXX";
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity verification. The AccessKey secret is required only for signature authentication.
             */
            String secretKey = "XXXXX";
            /**
             * The globally unique ID that the system assigns to the Message Queue for MQTT client. The client ID must be different in each TCP connection. If different TCP connections use the same client ID, an exception occurs and the connections are closed.
             * The value of the clientId parameter is in the format of GroupID@@@DeviceID. GroupID is the group ID that you created in the Message Queue for MQTT console and DeviceID is the custom ID of the device. The value of the clientId parameter can be up to 64 characters in length.
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * The parent topic that you created in the Message Queue for MQTT console.
             * If you specify a topic that is not created or a topic that the Message Queue for MQTT client is not authorized to access, the Message Queue for MQTT broker closes the connection.
             */
            final String parentTopic = "XXXXX";
            /**
             * Message Queue for MQTT allows you to use a subtopic to filter messages. You can specify a string as a subtopic, as shown in the following code.
             * The value of the mq4IotTopic parameter can be up to 128 characters in length.
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * The quality of service (QoS) level in message transmission. Valid values: 0, 1, and 2. For more information, see Terms.
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            /**
             * The protocol and port of the Message Queue for MQTT client. The protocol and port used by the Message Queue for MQTT client must match. If SSL encryption is used, the protocol and port must be specified by following the format of ssl://endpoint:8883.
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * The timeout period, in milliseconds, that the Message Queue for MQTT client waits for a response. The timeout period prevents the Message Queue for MQTT client from waiting indefinitely for a response.
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * After a connection is established, the Message Queue for MQTT client must subscribe to the required topic as soon as possible.
                     */
                    System.out.println("connect success");
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                final String[] topicFilter = {mq4IotTopic};
                                final int[] qos = {qosLevel};
                                mqttClient.subscribe(topicFilter, qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    /**
                     * The callback that is triggered to consume messages. Make sure that the callback does not throw an exception. After the callback returns normally, the message is considered consumed.
                     * Messages must be consumed within a specific period of time. If the time used to consume a message exceeds the timeout period that is specified by the Message Queue for MQTT broker, the broker may send the message again. Make sure that your business logic is idempotent and de-duplicates messages.
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 * The topic to which messages are sent. This topic must be the same as the topic that the receiver subscribes to or can be matched by using wildcard characters.
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * Message Queue for MQTT supports point-to-point (P2P) messaging. If the sender needs to send a message to only one receiver and knows the clientId of that receiver, the sender can send a P2P message.
                 * In P2P messaging, the receiver does not need to subscribe to the topic to which the sender sends messages, which simplifies the logic on the receiver side. To send a P2P message, specify a topic in the format of {{parentTopic}}/p2p/{{targetClientId}}.
                 */
                final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }
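
The comments in messageArrived() ask for idempotent processing because the broker can send a message again. The example does not show how to de-duplicate, so the following is only a sketch of one possible approach: a bounded in-memory set of keys that were already processed. The SeenMessages class and the idea of a business key (for example, an order ID carried in the message body) are assumptions of this sketch and are not part of the SDK or the demo project.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Remembers the most recent business keys so that redelivered messages can be skipped (sketch). */
final class SeenMessages {
    private final Set<String> seen;

    /** @param capacity the maximum number of keys to remember before the oldest key is evicted */
    SeenMessages(final int capacity) {
        // An insertion-ordered map that drops its eldest entry once the capacity is exceeded.
        this.seen = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        });
    }

    /** Returns true the first time a key is seen and false for a key that is still remembered. */
    synchronized boolean markIfNew(String businessKey) {
        return seen.add(businessKey);
    }
}
```

In a callback, the business logic would run only when markIfNew() returns true. An in-memory set is lost when the client restarts, so a persistent store may be needed if duplicates across restarts matter.
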

Verify the result

After the producer sends the message and the consumer subscribes to the message, you can query the trace of the message in the Message Queue for MQTT console to verify whether the message is sent and received. For more information, see Query the message traces.

References