ApsaraMQ for MQTT: Send and receive messages between MQTT clients and backend applications

Last Updated: Mar 11, 2026

This tutorial walks you through sending messages from an ApsaraMQ for MQTT client and receiving them in a backend service application. The client uses the client SDK (Eclipse Paho) to publish messages, and the backend uses the cloud SDK to consume them.

How it works

An ApsaraMQ for MQTT client and a backend service application communicate through the ApsaraMQ for MQTT broker. Each side uses a different SDK:

SDK | Library | Connects | Use case
Client SDK | Eclipse Paho Java Client | IoT devices, mobile apps | Publish and subscribe to messages over MQTT
Cloud SDK | ApsaraMQ for MQTT server SDK | Backend service applications | Consume or send messages at scale, subscribing at the parent topic level

[Figure: Messaging between an ApsaraMQ for MQTT client and a backend application]

For complete sample projects, see Demo projects or Demo.

Prerequisites

Before you begin, make sure you have:

Endpoints

Both SDKs require an endpoint to connect to your ApsaraMQ for MQTT instance.

Client SDK endpoints

Access type | Format | Typical use case
Public endpoint | <instance-id>.mqtt.aliyuncs.com | IoT devices, mobile apps
VPC endpoint | <instance-id>-internal-vpc.mqtt.aliyuncs.com | Clients inside a VPC

You can find the client SDK endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for MQTT console.

Cloud SDK endpoints

Access type | Format | Typical use case
Public endpoint | <instance-id>-server-internet.mqtt.aliyuncs.com | Backend apps over the Internet
VPC endpoint | <instance-id>-server-internal.mqtt.aliyuncs.com | Backend apps inside a VPC

Important

Not all regions support cloud SDK access. See Supported regions for details.

You can find the instance ID in the Basic Information section of the Instance Details page in the ApsaraMQ for MQTT console.

Important

Always use the domain name, not the IP address. IP addresses may change without notice during domain resolution updates. ApsaraMQ for MQTT is not responsible for connection failures caused by:

  • Using a hardcoded IP address that becomes invalid after a DNS update

  • Firewall rules that block new IP addresses after a DNS update
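
The endpoint formats listed above can be assembled from the instance ID alone. The following is a minimal sketch, not part of either SDK: the class name MqttEndpoints and its method names are hypothetical, and the host name patterns are copied from the two endpoint tables in this section.

```java
/** Illustrative helper (hypothetical, not part of any SDK) that derives endpoint host names. */
final class MqttEndpoints {
    private static final String SUFFIX = ".mqtt.aliyuncs.com";

    private MqttEndpoints() {
    }

    /** Client SDK, public endpoint: <instance-id>.mqtt.aliyuncs.com */
    static String clientPublic(String instanceId) {
        return require(instanceId) + SUFFIX;
    }

    /** Client SDK, VPC endpoint: <instance-id>-internal-vpc.mqtt.aliyuncs.com */
    static String clientVpc(String instanceId) {
        return require(instanceId) + "-internal-vpc" + SUFFIX;
    }

    /** Cloud SDK, public endpoint: <instance-id>-server-internet.mqtt.aliyuncs.com */
    static String cloudPublic(String instanceId) {
        return require(instanceId) + "-server-internet" + SUFFIX;
    }

    /** Cloud SDK, VPC endpoint: <instance-id>-server-internal.mqtt.aliyuncs.com */
    static String cloudVpc(String instanceId) {
        return require(instanceId) + "-server-internal" + SUFFIX;
    }

    // Reject null or blank instance IDs early so a bad host name never reaches the SDK.
    private static String require(String instanceId) {
        if (instanceId == null || instanceId.trim().isEmpty()) {
            throw new IllegalArgumentException("instanceId must not be empty");
        }
        return instanceId.trim();
    }
}
```

Building the host name from the instance ID keeps the domain name, rather than a resolved IP address, in your configuration. The endpoint shown in the console remains the authoritative value.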

Step 1: Set up the client SDK project

  1. Download the third-party open source SDK for Java: Eclipse Paho Java Client.

  2. Download the demo project mqtt-java-demo, extract it, and import it into IntelliJ IDEA.

Verify that your pom.xml includes the following dependencies:

<dependencies>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
        <version>1.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.0</version>
    </dependency>
</dependencies>

Step 2: Configure credentials

Set the following environment variables for authentication. See Configure an access credential for details.

# AccessKey ID
export MQTT_AK_ENV=<your-access-key-id>
# AccessKey secret
export MQTT_SK_ENV=<your-access-key-secret>

Important

Do not hardcode AccessKey pairs in your source code. Store them in environment variables to avoid credential leaks.
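
If either variable is unset, System.getenv returns null and the failure typically surfaces later as an authentication error. The sketch below fails fast instead. The class MqttCredentials is a hypothetical helper, not part of the demo project; only the variable names MQTT_AK_ENV and MQTT_SK_ENV come from this tutorial.

```java
import java.util.Map;

/** Hypothetical holder for the AccessKey pair read from the environment. */
final class MqttCredentials {
    final String accessKey;
    final String secretKey;

    private MqttCredentials(String accessKey, String secretKey) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    /** Reads both values from the process environment. */
    static MqttCredentials fromEnv() {
        return fromEnv(System.getenv());
    }

    /** Reads both values from the given map, which makes the lookup easy to test. */
    static MqttCredentials fromEnv(Map<String, String> env) {
        return new MqttCredentials(required(env, "MQTT_AK_ENV"), required(env, "MQTT_SK_ENV"));
    }

    // Fail with a clear message instead of passing null on to the SDK.
    private static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value.trim();
    }
}
```

In the demos, the two System.getenv calls could then be replaced by a single MqttCredentials.fromEnv() call, with the accessKey and secretKey fields passed on unchanged.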

Step 3: Send messages with the client SDK

Open MQ4IoTProducerDemo.java and update the following parameters with values from your ApsaraMQ for MQTT instance. See Create resources for where to find these values.

Parameter | Description | Example
instanceId | ApsaraMQ for MQTT instance ID | post-cn-xxxxx
endPoint | Client SDK endpoint (see the Endpoints section above) | post-cn-xxxxx.mqtt.aliyuncs.com
clientId | Globally unique client ID in {GroupID}@@@{DeviceID} format. Maximum 64 characters. Each TCP connection must use a different client ID. | GID_test@@@device_001
parentTopic | Parent topic created in the console | testTopic
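
The client ID rules in the table (the {GroupID}@@@{DeviceID} format and the 64-character limit) can be checked locally before connecting. This is a minimal sketch under those two rules only. The class name ClientIds is hypothetical, and the broker may enforce additional constraints that are not covered here.

```java
/** Hypothetical helper for building and checking client IDs. */
final class ClientIds {
    static final String SEPARATOR = "@@@";
    static final int MAX_LENGTH = 64;

    private ClientIds() {
    }

    /** Joins a group ID and a device ID and enforces the documented length limit. */
    static String build(String groupId, String deviceId) {
        if (groupId == null || groupId.isEmpty() || deviceId == null || deviceId.isEmpty()) {
            throw new IllegalArgumentException("groupId and deviceId must not be empty");
        }
        String clientId = groupId + SEPARATOR + deviceId;
        if (clientId.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("client ID exceeds " + MAX_LENGTH + " characters");
        }
        return clientId;
    }

    /** Returns true if the ID has a non-empty part on each side of "@@@" and fits the limit. */
    static boolean isValid(String clientId) {
        if (clientId == null || clientId.length() > MAX_LENGTH) {
            return false;
        }
        int idx = clientId.indexOf(SEPARATOR);
        return idx > 0 && idx + SEPARATOR.length() < clientId.length();
    }
}
```

For example, ClientIds.build("GID_test", "device_001") yields the sample value from the table. Uniqueness per TCP connection cannot be checked locally and remains the caller's responsibility.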

Connect to the broker

Create an MQTT client and establish a connection. Use tcp://endpoint:1883 for plaintext or ssl://endpoint:8883 for SSL/TLS.

String instanceId = "XXXXX";
String endPoint = "XXXXX.mqtt.aliyuncs.com";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String clientId = "GID_XXXXX@@@XXXXX";

ConnectionOptionWrapper connectionOptionWrapper =
    new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();

final MqttClient mqttClient =
    new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);

// Set the response timeout (ms) to avoid indefinite waits
mqttClient.setTimeToWait(5000);

The client ID uses {GroupID}@@@{DeviceID} format and must be globally unique per TCP connection. Reusing a client ID across connections causes the broker to disconnect the existing session.

Set up callbacks

Set callbacks before connecting to avoid missing messages during session resumption.

mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        System.out.println("connect success");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        // Process messages within the broker's timeout to avoid redelivery.
        // Implement deduplication to ensure idempotent processing.
        System.out.println(
            "receive msg from topic " + s + " , body is "
            + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("send msg succeed topic is : "
            + iMqttDeliveryToken.getTopics()[0]);
    }
});

mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
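
The comment in messageArrived recommends deduplication, because a redelivered message can reach the callback more than once. One possible approach is a bounded set of recently seen keys, sketched below. The class RecentMessageFilter is hypothetical, and the choice of key is an assumption: this tutorial does not define a unique message identifier for the client SDK, so the sketch expects an application-level ID that your producer places in the payload.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical bounded filter that remembers the most recently seen message keys. */
final class RecentMessageFilter {
    private final Set<String> seen;

    RecentMessageFilter(final int capacity) {
        // Insertion-ordered map that drops its oldest entry once capacity is exceeded.
        this.seen = Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > capacity;
                }
            });
    }

    /** Returns true the first time a key is seen, false while a duplicate is still remembered. */
    synchronized boolean firstTime(String messageKey) {
        return seen.add(messageKey);
    }
}
```

Inside messageArrived, a call such as `if (!filter.firstTime(key)) return;` would skip a duplicate before any business logic runs. The filter only remembers the last capacity keys and lives in memory, so it does not survive a restart. Stronger guarantees need a persistent store.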

Publish messages

Publish messages to a subtopic. Subtopics are appended to the parent topic and can be up to 128 characters.

final String parentTopic = "XXXXX";
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
final int qosLevel = 0;

MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);

QoS levels: 0 (at most once), 1 (at least once), or 2 (exactly once).
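
Topic strings can be assembled and checked before publishing. The sketch below is illustrative only: the class Topics is hypothetical, and it assumes the 128-character limit applies to the full topic string, which this tutorial does not state explicitly. It also rejects the MQTT wildcard characters + and #, which the MQTT specification does not allow in a topic used for publishing.

```java
/** Hypothetical helper that joins a parent topic and subtopic levels into a publish topic. */
final class Topics {
    // Assumption: the documented 128-character limit covers the whole topic string.
    static final int MAX_TOPIC_LENGTH = 128;

    private Topics() {
    }

    static String subtopic(String parentTopic, String... levels) {
        if (parentTopic == null || parentTopic.isEmpty()) {
            throw new IllegalArgumentException("parentTopic must not be empty");
        }
        StringBuilder topic = new StringBuilder(parentTopic);
        for (String level : levels) {
            // Each level is a single path segment without separators or wildcards.
            if (level == null || level.isEmpty() || level.contains("/")
                    || level.contains("+") || level.contains("#")) {
                throw new IllegalArgumentException("invalid topic level: " + level);
            }
            topic.append('/').append(level);
        }
        if (topic.length() > MAX_TOPIC_LENGTH) {
            throw new IllegalArgumentException("topic exceeds " + MAX_TOPIC_LENGTH + " characters");
        }
        return topic.toString();
    }
}
```

With this helper, Topics.subtopic(parentTopic, "testMq4Iot") produces the same string as the concatenation in the snippet above.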

Send point-to-point (P2P) messages

P2P messaging sends a message directly to a specific client. The target client does not need to subscribe to the topic. Publish to {parentTopic}/p2p/{targetClientId}.

String receiverId = "GID_test@@@device_002";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
MqttMessage p2pMessage = new MqttMessage("hello mq4Iot p2p msg".getBytes());
p2pMessage.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, p2pMessage);

Complete producer code

The following code combines all the steps above. Open MQ4IoTProducerDemo.java and update the parameters.

package com.aliyun.openservices.lmq.example.demo;

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        // ApsaraMQ for MQTT instance ID
        String instanceId = "XXXXX";

        // Client SDK endpoint. Use the domain name, not the IP address.
        String endPoint = "XXXXX.mqtt.aliyuncs.com";

        // Load credentials from environment variables
        String accessKey = System.getenv("MQTT_AK_ENV");
        String secretKey = System.getenv("MQTT_SK_ENV");

        // Client ID format: {GroupID}@@@{DeviceID}
        // Must be unique per TCP connection. Max 64 characters.
        String clientId = "GID_XXXXX@@@XXXXX";

        // Parent topic created in the ApsaraMQ for MQTT console.
        // An invalid or unauthorized topic causes the broker to close the connection.
        final String parentTopic = "XXXXX";

        // Subtopic for message filtering. Max 128 characters.
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";

        // QoS level: 0, 1, or 2
        final int qosLevel = 0;

        ConnectionOptionWrapper connectionOptionWrapper =
            new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();

        // Connect over TCP on port 1883. For SSL, use ssl://endpoint:8883.
        final MqttClient mqttClient =
            new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);

        // Set the response timeout (ms) to avoid indefinite waits
        mqttClient.setTimeToWait(5000);

        final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // Set callbacks before connecting to avoid missing messages
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                // Process messages within the broker's timeout to avoid redelivery.
                // Implement deduplication to ensure idempotent processing.
                System.out.println(
                    "receive msg from topic " + s + " , body is "
                    + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : "
                    + iMqttDeliveryToken.getTopics()[0]);
            }
        });

        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());

        for (int i = 0; i < 10; i++) {
            // Publish to a subtopic (standard pub/sub)
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(mq4IotTopic, message);

            // Publish a point-to-point (P2P) message directly to a specific client.
            // The target client does not need to subscribe to this topic.
            // Topic format: {parentTopic}/p2p/{targetClientId}
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

Verify the producer

Run MQ4IoTProducerDemo.java. If the connection succeeds, you see output similar to:

connect success
send msg succeed topic is : XXXXX/testMq4Iot
send msg succeed topic is : XXXXX/p2p/xxx
...

Step 4: Receive messages with the cloud SDK

The cloud SDK connects your backend application to the ApsaraMQ for MQTT broker. Unlike the client SDK, the cloud SDK subscribes to parent topics only (not subtopics), making it suitable for backend scenarios where applications collect and analyze all messages under a topic.
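
Because the subscription is at the parent topic level, a backend that needs per-subtopic handling has to filter the messages itself. The sketch below splits a full topic string into its parent topic and the remaining subtopic path. The class TopicPaths is hypothetical, and how the cloud SDK exposes the full topic of a received message is not shown in this tutorial, so the input string is an assumption.

```java
/** Hypothetical helper that splits a full topic into parent topic and subtopic path. */
final class TopicPaths {
    private TopicPaths() {
    }

    /** Returns the first topic level, for example "testTopic" for "testTopic/testMq4Iot". */
    static String parentOf(String fullTopic) {
        int slash = fullTopic.indexOf('/');
        return slash < 0 ? fullTopic : fullTopic.substring(0, slash);
    }

    /** Returns everything after the first separator, or an empty string if there is none. */
    static String subtopicOf(String fullTopic) {
        int slash = fullTopic.indexOf('/');
        return slash < 0 ? "" : fullTopic.substring(slash + 1);
    }
}
```

A listener could then branch on TopicPaths.subtopicOf(topic), for example to handle paths that start with "p2p/" separately from ordinary subtopics.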

Set up the project

  1. Download the cloud SDK. See Release notes for the latest version.

  2. Download the demo project: mqtt-server-sdk-demo.

  3. Extract the package and import it into IntelliJ IDEA.

  4. Verify that your pom.xml includes the following dependencies:

       <dependencies>
           <dependency>
               <groupId>com.alibaba.mqtt</groupId>
               <artifactId>server-sdk</artifactId>
               <version>1.0.0.Final</version>
           </dependency>
           <dependency>
               <groupId>com.alibaba</groupId>
               <artifactId>fastjson</artifactId>
               <version>1.2.83</version>
           </dependency>
       </dependencies>

Configure and run the consumer

Open MQTTConsumerDemo.java and update the following parameters:

Parameter | Description | Example
domain | Cloud SDK endpoint (see the Endpoints section above) | post-cn-xxxxx-server-internet.mqtt.aliyuncs.com
port | Cloud SDK port. Always 5672. | 5672
instanceId | ApsaraMQ for MQTT instance ID | post-cn-xxxxx
firstTopic | Parent topic to subscribe to | testTopic

Credentials are loaded from the same environment variables (MQTT_AK_ENV and MQTT_SK_ENV) configured in Step 2.

Connect and subscribe

// Cloud SDK endpoint format: <instance-id>-server-internet.mqtt.aliyuncs.com
String domain = "post-cn-jaj3h8i****-server-internet.mqtt.aliyuncs.com";
int port = 5672;
String instanceId = "post-cn-jaj3h8i****";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String firstTopic = "firstTopic";

ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setDomain(domain);
channelConfig.setPort(port);
channelConfig.setInstanceId(instanceId);
channelConfig.setAccessKey(accessKey);
channelConfig.setSecretKey(secretKey);

ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
serverConsumer.start();

serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
    @Override
    public void process(String msgId, MessageProperties messageProperties,
                        byte[] payload) {
        System.out.println("Receive:" + msgId + ","
            + JSONObject.toJSONString(messageProperties) + ","
            + new String(payload));
    }
});

Complete consumer code

package com.aliyun.openservices.lmq.example;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.mqtt.server.ServerConsumer;
import com.alibaba.mqtt.server.callback.MessageListener;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ConsumerConfig;
import com.alibaba.mqtt.server.model.MessageProperties;

public class MQTTConsumerDemo {
    public static void main(String[] args) throws Exception {
        // Cloud SDK endpoint. Use the domain name, not the IP address.
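        // Format: <instance-id>-server-internet.mqtt.aliyuncs.com (public) or
        // <instance-id>-server-internal.mqtt.aliyuncs.com (VPC).
        String domain = "post-cn-jaj3h8i****-server-internet.mqtt.aliyuncs.com";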

        // Cloud SDK port. Must be 5672.
        int port = 5672;

        // ApsaraMQ for MQTT instance ID
        String instanceId = "post-cn-jaj3h8i****";

        // Load credentials from environment variables
        String accessKey = System.getenv("MQTT_AK_ENV");
        String secretKey = System.getenv("MQTT_SK_ENV");

        // Parent topic to subscribe to.
        // The cloud SDK subscribes to parent topics only (no subtopics).
        // An invalid or unauthorized topic causes the broker to close the connection.
        String firstTopic = "firstTopic";

        ChannelConfig channelConfig = new ChannelConfig();
        channelConfig.setDomain(domain);
        channelConfig.setPort(port);
        channelConfig.setInstanceId(instanceId);
        channelConfig.setAccessKey(accessKey);
        channelConfig.setSecretKey(secretKey);

        ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
        serverConsumer.start();

        // Subscribe and process incoming messages
        serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
            @Override
            public void process(String msgId, MessageProperties messageProperties,
                                byte[] payload) {
                System.out.println("Receive:" + msgId + ","
                    + JSONObject.toJSONString(messageProperties) + ","
                    + new String(payload));
            }
        });
    }
}

Key concepts

Concept | Description
Client ID | Format: {GroupID}@@@{DeviceID}. Must be globally unique per TCP connection. Maximum 64 characters. Reusing a client ID across connections causes the broker to disconnect the existing session.
Parent topic | Created in the ApsaraMQ for MQTT console. An invalid or unauthorized topic causes the broker to close the connection.
Subtopic | Appended to the parent topic (for example, parentTopic/testMq4Iot). Used for message filtering. Maximum 128 characters.
QoS | Quality of Service levels: 0 (at most once), 1 (at least once), 2 (exactly once).
P2P messaging | Publish to {parentTopic}/p2p/{targetClientId} to send directly to a specific client without requiring the client to subscribe.
Protocol and port | Client SDK: tcp://endpoint:1883 (plaintext) or ssl://endpoint:8883 (SSL/TLS). Cloud SDK: port 5672.

FAQ

How do I enable automatic reconnection?

Set setAutomaticReconnect(true) on MqttConnectOptions before connecting:

MqttConnectOptions options = connectionOptionWrapper.getMqttConnectOptions();
options.setAutomaticReconnect(true);
mqttClient.connect(options);

How do I connect over SSL/TLS?

Replace tcp:// with ssl:// and use port 8883:

final MqttClient mqttClient =
    new MqttClient("ssl://" + endPoint + ":8883", clientId, memoryPersistence);
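
To switch between plaintext and SSL/TLS through configuration rather than code changes, the scheme and port can be chosen together. This is a small sketch: the class BrokerUris and the useTls flag are hypothetical, while the schemes and ports are the ones listed in this tutorial.

```java
/** Hypothetical helper that picks the URI scheme and port as a matching pair. */
final class BrokerUris {
    private BrokerUris() {
    }

    /** Returns ssl://endPoint:8883 when useTls is true, otherwise tcp://endPoint:1883. */
    static String of(String endPoint, boolean useTls) {
        String scheme = useTls ? "ssl://" : "tcp://";
        int port = useTls ? 8883 : 1883;
        return scheme + endPoint + ":" + port;
    }
}
```

The result can be passed as the first argument of the MqttClient constructor in place of the concatenated string.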

Can I send messages from a backend application to an MQTT client?

Yes. Use MQTTProducerDemo.java in the cloud SDK demo project for the reverse messaging direction (backend to client).

What's next

  • Explore more messaging patterns in the Demo projects

  • Check supported regions for cloud SDK access