This topic describes how messaging is implemented between Message Queue for MQTT clients by using Message Queue for MQTT SDK for Java.

Prerequisites

  • Create resources
  • An AccessKey pair is obtained.
  • The integrated development environment (IDE) is installed. For more information, see IDE. You can use IntelliJ IDEA or Eclipse. In the example, IntelliJ IDEA is used.
  • The Java Development Kit (JDK) is installed. For more information, see JDK.

Background information

Message Queue for MQTT applies to scenarios in which Message Queue for MQTT clients interact with each other. In these scenarios, both producers and consumers are Message Queue for MQTT clients. Each Message Queue for MQTT client uses the client SDK of Message Queue for MQTT to connect to the Message Queue for MQTT broker for messaging.

Messaging between clients

This topic describes how to use Message Queue for MQTT SDK for Java to implement messaging between Message Queue for MQTT clients over the Internet.

Endpoints

You must specify an endpoint of your Message Queue for MQTT instance in the code when you call the client SDK or cloud SDK. Then, the Message Queue for MQTT client or backend service application can use the endpoint to connect to Message Queue for MQTT.

  • Formats of endpoints for the client SDK

    When you use the client SDK to connect to Message Queue for MQTT, specify the endpoint in the following format:

    • Public Endpoint: ID of the Message Queue for MQTT instance.mqtt.aliyuncs.com
    • VPC Endpoint: ID of the Message Queue for MQTT instance-internal-vpc.mqtt.aliyuncs.com

    You can also view the endpoint for the client SDK on the Endpoints tab of the Instance Details page in the Message Queue for MQTT console.

  • Formats of endpoints for the cloud SDK

    When you use the cloud SDK to connect to Message Queue for MQTT, specify the endpoint in the following format:

    • Public Endpoint: ID of the Message Queue for MQTT instance-server-internet.mqtt.aliyuncs.com
    • VPC Endpoint: ID of the Message Queue for MQTT instance-server-internal.mqtt.aliyuncs.com
Note You can view the ID of the Message Queue for MQTT instance in the Basic Information section of the Instance Details page in the Message Queue for MQTT console.

An endpoint for the client SDK or cloud SDK can be a Public Endpoint or a VPC Endpoint. A Public Endpoint is an IP address for access from the Internet and is commonly used in IoT and mobile Internet scenarios. A VPC Endpoint is an IP address for access from a private network in the cloud and is usually used by backend applications to connect to Message Queue for MQTT.

Notice When the SDK uses an endpoint to connect to Message Queue for MQTT, use the domain name instead of the IP address. The IP address unexpectedly changes. The technical team of Message Queue for MQTT is not responsible for faults and direct losses or indirect losses in the following scenarios:
  • The client or backend service application uses an IP address instead of a domain name to access the service. The original IP address becomes invalid after the technical team of Message Queue for MQTT updates the domain name resolution.
  • A firewall policy on IP addresses is set in the network on which your Message Queue for MQTT client or backend service application is running. New IP addresses are blocked due to the firewall policy after the technical team of Message Queue for MQTT updates domain name resolution.

Call the SDK for Java to send and subscribe to messages

  1. Download a third-party open source SDK for Java. Download address: Eclipse Paho Java Client.
  2. Download the demo of Alibaba Cloud Message Queue for MQTT SDK for Java for reference during code development. Download address: mqtt-java-demo.
  3. Decompress this demo project package to a specific folder.
  4. In IntelliJ IDEA, import the extracted files to create a project and check whether the pom.xml file contains the following dependencies:
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. In the MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java class, set the parameters as instructed in the code annotations. Most of these parameters can be set based on the information of the Message Queue for MQTT resources that you have created. For more information, see Create resources. Use the main() function to run the sample code to send and receive messages.
    Sample code:
    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
        public static void main(String[] args) throws Exception {
            /**
             * The ID of the Message Queue for MQTT instance that you created in the Message Queue for MQTT console. 
             */
            String instanceId = "XXXXX";
            /**
             * The endpoint of the Message Queue for MQTT instance. You can obtain the endpoint on the Instance Details page in the Message Queue for MQTT console. 
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * The AccessKey ID that you created in the Alibaba Cloud Resource Access Management (RAM) console for identity authentication. 
             */
            String accessKey = "XXXXX";
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. The AccessKey secret is required only for signature authentication. 
             */
            String secretKey = "XXXXX";
            /**
             * The globally unique ID that the system assigns to the Message Queue for MQTT client. The client ID must vary based on TCP connections. If multiple TCP connections use the same client ID, an exception occurs and the connections are closed. 
             * The value of the clientId parameter is in the GroupID@@@DeviceID format. GroupID indicates the group ID that you created in the Message Queue for MQTT console and DeviceID indicates the custom ID of the device. The value of the clientId parameter can be up to 64 characters in length. 
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * The parent topic that you created in the Message Queue for MQTT console. 
             * If you specify a topic that does not exist or a topic that the Message Queue for MQTT client is not authorized to access, the Message Queue for MQTT broker closes the connection. 
             */
            final String parentTopic = "XXXXX";
            /**
             * Message Queue for MQTT allows you to use a subtopic to filter messages. You can specify a string as a subtopic, as shown in the following code. 
             * The value of the mq4IotTopic parameter can be 128 characters at most in length. 
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * The quality of service (QoS) level in message transmission. Valid values: 0, 1, and 2. For more information, see Terms. 
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            /**
             * The protocol and port that are used by the Message Queue for MQTT client. The protocol and port that are used by the Message Queue for MQTT client must match. If SSL encryption is used, the protocol and port must be specified in the ssl://endpoint:8883 format. 
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * The timeout period during which the Message Queue for MQTT client waits for a response. The timeout period prevents the Message Queue for MQTT client from waiting for a response for an indefinite period of time. 
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * After a connection is established, the Message Queue for MQTT client must subscribe to the required topic at the earliest opportunity. 
                     */
                    System.out.println("connect success");
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                final String topicFilter[] = {mq4IotTopic};
                                final int[] qos = {qosLevel};
                                mqttClient.subscribe(topicFilter, qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    /**
                     * The callback that is invoked to consume the messages that are sent. Make sure that the callback does not throw exceptions. If a response is returned for the callback, the messages are consumed. 
                     * The messages must be consumed in a specified period. If a message is not consumed in the timeout period that you specified for the Message Queue for MQTT broker, the Message Queue for MQTT broker may attempt to resend the message in reliable transmission mode. Make sure that the business data is idempotent. 
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 * The topic to which messages are sent. This topic must be the same as the topic to which the consumer subscribes or can be matched by using wildcards. 
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * Message Queue for MQTT supports P2P messaging. When a producer confirms that only a consumer requires the message and has the clientId parameter value, the producer sends a P2P message to the consumer. 
                 * In P2P messaging, the consumer does not need to subscribe to the topic to which the producer sends messages. The logic is simplified at the consumer side. In P2P messaging, specify a topic in the format of {{parentTopic}}/p2p/{{targetClientId}}. 
                 */
                final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

Verify the results

After the producer sends a message and the consumer subscribes to the message, you can query the trace of the message in the Message Queue for MQTT console to verify whether the message is sent and received. For more information, see Query the message traces.

References

Use Message Queue for MQTT SDKs for Java to implement messaging between Message Queue for MQTT clients and backend service applications