This tutorial walks you through sending messages from an ApsaraMQ for MQTT client and receiving them in a backend service application. The client uses the client SDK (Eclipse Paho) to publish messages, and the backend uses the cloud SDK to consume them.
How it works
An ApsaraMQ for MQTT client and a backend service application communicate through the ApsaraMQ for MQTT broker. Each side uses a different SDK:
| SDK | Library | Connects | Use case |
|---|---|---|---|
| Client SDK | Eclipse Paho Java Client | IoT devices, mobile apps | Publish and subscribe to messages over MQTT |
| Cloud SDK | ApsaraMQ for MQTT server SDK | Backend service applications | Consume or send messages at scale, subscribing at the parent topic level |

For complete sample projects, see Demo projects or Demo.
Prerequisites
Before you begin, make sure you have:
An ApsaraMQ for MQTT instance with the required resources (group ID and parent topic). See Create resources.
An AccessKey pair. See Obtain an AccessKey pair.
JDK installed
IntelliJ IDEA or Eclipse installed (this tutorial uses IntelliJ IDEA)
Endpoints
Both SDKs require an endpoint to connect to your ApsaraMQ for MQTT instance.
Client SDK endpoints
| Access type | Format | Typical use case |
|---|---|---|
| Public endpoint | <instance-id>.mqtt.aliyuncs.com | IoT devices, mobile apps |
| VPC endpoint | <instance-id>-internal-vpc.mqtt.aliyuncs.com | Clients inside a VPC |
You can find the client SDK endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for MQTT console.
Cloud SDK endpoints
| Access type | Format | Typical use case |
|---|---|---|
| Public endpoint | <instance-id>-server-internet.mqtt.aliyuncs.com | Backend apps over the Internet |
| VPC endpoint | <instance-id>-server-internal.mqtt.aliyuncs.com | Backend apps inside a VPC |
Not all regions support cloud SDK access. See Supported regions for details.
You can find the instance ID in the Basic Information section of the Instance Details page in the ApsaraMQ for MQTT console.
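The four endpoint formats in the tables above differ only in the suffix appended to the instance ID. The following is a minimal sketch of a helper that derives each host name from an instance ID. The class name `MqttEndpoints` and its method names are hypothetical and are not part of either SDK; the console remains the authoritative source for your actual endpoints.

```java
/** Hypothetical helper: builds endpoint host names from the formats listed in the tables above. */
final class MqttEndpoints {
    private static final String DOMAIN_SUFFIX = ".mqtt.aliyuncs.com";

    private MqttEndpoints() {
    }

    /** Client SDK, public endpoint: <instance-id>.mqtt.aliyuncs.com */
    static String clientPublic(String instanceId) {
        return require(instanceId) + DOMAIN_SUFFIX;
    }

    /** Client SDK, VPC endpoint: <instance-id>-internal-vpc.mqtt.aliyuncs.com */
    static String clientVpc(String instanceId) {
        return require(instanceId) + "-internal-vpc" + DOMAIN_SUFFIX;
    }

    /** Cloud SDK, public endpoint: <instance-id>-server-internet.mqtt.aliyuncs.com */
    static String cloudPublic(String instanceId) {
        return require(instanceId) + "-server-internet" + DOMAIN_SUFFIX;
    }

    /** Cloud SDK, VPC endpoint: <instance-id>-server-internal.mqtt.aliyuncs.com */
    static String cloudVpc(String instanceId) {
        return require(instanceId) + "-server-internal" + DOMAIN_SUFFIX;
    }

    // Rejects a missing instance ID early instead of producing a malformed host name.
    private static String require(String instanceId) {
        if (instanceId == null || instanceId.trim().isEmpty()) {
            throw new IllegalArgumentException("instanceId must not be empty");
        }
        return instanceId.trim();
    }

    public static void main(String[] args) {
        String instanceId = "post-cn-xxxxx"; // placeholder instance ID
        System.out.println(clientPublic(instanceId));
        System.out.println(cloudPublic(instanceId));
    }
}
```

Deriving the host name from the instance ID keeps the client and cloud endpoints from being mixed up, which is an easy mistake because they share the same domain.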
Always use the domain name, not the IP address. IP addresses may change without notice during domain resolution updates. ApsaraMQ for MQTT is not responsible for connection failures caused by:
Using a hardcoded IP address that becomes invalid after a DNS update
Firewall rules that block new IP addresses after a DNS update
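One way to enforce the domain-name rule is to reject IP literals when the application loads its configuration. The sketch below is an assumption about how you might do this; the class `EndpointCheck` is hypothetical, and the check expects a bare host without a scheme or port.

```java
import java.util.regex.Pattern;

/** Hypothetical startup check: refuses endpoints configured as IP literals. */
final class EndpointCheck {
    // Matches dotted-quad IPv4 literals such as 192.0.2.10.
    private static final Pattern IPV4_LITERAL = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}$");

    private EndpointCheck() {
    }

    /** Returns true if the bare host (no scheme, no port) looks like an IPv4 or IPv6 literal. */
    static boolean looksLikeIpLiteral(String host) {
        if (host == null) {
            return false;
        }
        String trimmed = host.trim();
        // A colon cannot appear in a DNS host name, so treat it as an IPv6 literal.
        return IPV4_LITERAL.matcher(trimmed).matches() || trimmed.contains(":");
    }

    /** Returns the trimmed host, or throws if it is empty or an IP literal. */
    static String requireDomainName(String host) {
        if (host == null || host.trim().isEmpty()) {
            throw new IllegalArgumentException("endpoint must not be empty");
        }
        if (looksLikeIpLiteral(host)) {
            throw new IllegalArgumentException(
                "endpoint must be a domain name, not an IP address: " + host.trim());
        }
        return host.trim();
    }

    public static void main(String[] args) {
        System.out.println(requireDomainName("post-cn-xxxxx.mqtt.aliyuncs.com"));
    }
}
```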
Step 1: Set up the client SDK project
Download the third-party open source SDK for Java: Eclipse Paho Java Client.
Download the demo project mqtt-java-demo, extract it, and import it into IntelliJ IDEA.
Verify that your pom.xml includes the following dependencies:
<dependencies>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
        <version>1.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.0</version>
    </dependency>
</dependencies>
Step 2: Configure credentials
Set the following environment variables for authentication. See Configure an access credential for details.
# AccessKey ID
export MQTT_AK_ENV=<your-access-key-id>
# AccessKey secret
export MQTT_SK_ENV=<your-access-key-secret>
Do not hardcode AccessKey pairs in your source code. Store them in environment variables to avoid credential leaks.
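If either variable is missing, `System.getenv` returns `null`, and the failure only surfaces later when the connection is rejected. The sketch below fails fast with a clear message instead. The class `MqttCredentials` is hypothetical and not part of the demo project; it takes the environment as a `Map` so that the lookup can be exercised without changing the real environment.

```java
import java.util.Map;

/** Hypothetical holder for the AccessKey pair read from MQTT_AK_ENV and MQTT_SK_ENV. */
final class MqttCredentials {
    final String accessKeyId;
    final String accessKeySecret;

    private MqttCredentials(String accessKeyId, String accessKeySecret) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
    }

    /** Reads both variables from the given environment map and fails fast if one is missing. */
    static MqttCredentials fromEnv(Map<String, String> env) {
        return new MqttCredentials(required(env, "MQTT_AK_ENV"), required(env, "MQTT_SK_ENV"));
    }

    private static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            MqttCredentials credentials = fromEnv(System.getenv());
            // Never print the secret itself; report only that it was found.
            System.out.println("Loaded AccessKey ID of length " + credentials.accessKeyId.length());
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```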
Step 3: Send messages with the client SDK
Open MQ4IoTProducerDemo.java and update the following parameters with values from your ApsaraMQ for MQTT instance. See Create resources for where to find these values.
| Parameter | Description | Example |
|---|---|---|
| instanceId | ApsaraMQ for MQTT instance ID | post-cn-xxxxx |
| endPoint | Client SDK endpoint (see the Endpoints section above) | post-cn-xxxxx.mqtt.aliyuncs.com |
| clientId | Globally unique client ID in {GroupID}@@@{DeviceID} format. Maximum 64 characters. Each TCP connection must use a different client ID. | GID_test@@@device_001 |
| parentTopic | Parent topic created in the console | testTopic |
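The client ID rules in the table (the `{GroupID}@@@{DeviceID}` format and the 64-character limit) can be checked locally before you connect. The following is a minimal sketch; the class `ClientIds` is hypothetical, and the broker may enforce additional rules that this tutorial does not list.

```java
/** Hypothetical helper: builds a client ID in {GroupID}@@@{DeviceID} format and checks its length. */
final class ClientIds {
    static final String SEPARATOR = "@@@";
    static final int MAX_LENGTH = 64;

    private ClientIds() {
    }

    static String build(String groupId, String deviceId) {
        requirePart(groupId, "groupId");
        requirePart(deviceId, "deviceId");
        String clientId = groupId + SEPARATOR + deviceId;
        if (clientId.length() > MAX_LENGTH) {
            throw new IllegalArgumentException(
                "client ID is " + clientId.length() + " characters; the maximum is " + MAX_LENGTH);
        }
        return clientId;
    }

    // Each part must be non-empty and must not contain the separator itself.
    private static void requirePart(String value, String name) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
        if (value.contains(SEPARATOR)) {
            throw new IllegalArgumentException(name + " must not contain " + SEPARATOR);
        }
    }

    public static void main(String[] args) {
        System.out.println(build("GID_test", "device_001"));
    }
}
```

Uniqueness per TCP connection cannot be verified locally. A common approach is to derive the device ID from something that is already unique, such as a device serial number.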
Connect to the broker
Create an MQTT client and establish a connection. Use tcp://endpoint:1883 for plaintext or ssl://endpoint:8883 for SSL/TLS.
String instanceId = "XXXXX";
String endPoint = "XXXXX.mqtt.aliyuncs.com";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String clientId = "GID_XXXXX@@@XXXXX";
ConnectionOptionWrapper connectionOptionWrapper =
    new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final MqttClient mqttClient =
    new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
// Set the response timeout (ms) to avoid indefinite waits
mqttClient.setTimeToWait(5000);
The client ID uses {GroupID}@@@{DeviceID} format and must be globally unique per TCP connection. Reusing a client ID across connections causes the broker to disconnect the existing session.
Set up callbacks
Set callbacks before connecting to avoid missing messages during session resumption.
mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        System.out.println("connect success");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        // Process messages within the broker's timeout to avoid redelivery.
        // Implement deduplication to ensure idempotent processing.
        System.out.println(
            "receive msg from topic " + s + " , body is "
                + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("send msg succeed topic is : "
            + iMqttDeliveryToken.getTopics()[0]);
    }
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
Publish messages
Publish messages to a subtopic. Subtopics are appended to the parent topic and can be up to 128 characters.
final String parentTopic = "XXXXX";
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
final int qosLevel = 0;
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);
QoS levels: 0 (at most once), 1 (at least once), or 2 (exactly once).
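With QoS 1 (at least once), a receiver can see the same message more than once, which is why the callback comments above recommend deduplication. The sketch below shows one possible approach: a bounded, in-memory set of recently seen message IDs. The class `RecentMessageIds` is hypothetical, and it assumes that your payload or message properties carry an application-level ID that is stable across redeliveries; this tutorial does not define such an ID.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical bounded set of recently seen message IDs, used to drop redelivered messages. */
final class RecentMessageIds {
    private final Map<String, Boolean> seen;

    RecentMessageIds(final int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        // Insertion order (accessOrder = false) means the oldest ID is evicted first.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is seen, and false while the ID is still remembered. */
    synchronized boolean markIfNew(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        RecentMessageIds recent = new RecentMessageIds(1000);
        String[] incoming = {"msg-1", "msg-2", "msg-1"};
        for (String id : incoming) {
            if (recent.markIfNew(id)) {
                System.out.println("process " + id);
            } else {
                System.out.println("skip duplicate " + id);
            }
        }
    }
}
```

An in-memory set only protects a single process and forgets IDs after a restart or once the capacity is exceeded. If duplicates must never be processed, make the handler itself idempotent or keep the IDs in durable storage.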
Send point-to-point (P2P) messages
P2P messaging sends a message directly to a specific client. The target client does not need to subscribe to the topic. Publish to {parentTopic}/p2p/{targetClientId}.
String receiverId = "GID_test@@@device_002";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
MqttMessage p2pMessage = new MqttMessage("hello mq4Iot p2p msg".getBytes());
p2pMessage.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, p2pMessage);
Complete producer code
The following code combines all the steps above. Open MQ4IoTProducerDemo.java and update the parameters.
package com.aliyun.openservices.lmq.example.demo;

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        // ApsaraMQ for MQTT instance ID
        String instanceId = "XXXXX";
        // Client SDK endpoint. Use the domain name, not the IP address.
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        // Load credentials from environment variables
        String accessKey = System.getenv("MQTT_AK_ENV");
        String secretKey = System.getenv("MQTT_SK_ENV");
        // Client ID format: {GroupID}@@@{DeviceID}
        // Must be unique per TCP connection. Max 64 characters.
        String clientId = "GID_XXXXX@@@XXXXX";
        // Parent topic created in the ApsaraMQ for MQTT console.
        // An invalid or unauthorized topic causes the broker to close the connection.
        final String parentTopic = "XXXXX";
        // Subtopic for message filtering. Max 128 characters.
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        // QoS level: 0, 1, or 2
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper =
            new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        // Connect over TCP on port 1883. For SSL, use ssl://endpoint:8883.
        final MqttClient mqttClient =
            new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        // Set the response timeout (ms) to avoid indefinite waits
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // Set callbacks before connecting to avoid missing messages
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                // Process messages within the broker's timeout to avoid redelivery.
                // Implement deduplication to ensure idempotent processing.
                System.out.println(
                    "receive msg from topic " + s + " , body is "
                        + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : "
                    + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            // Publish to a subtopic (standard pub/sub)
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(mq4IotTopic, message);
            // Publish a point-to-point (P2P) message directly to a specific client.
            // The target client does not need to subscribe to this topic.
            // Topic format: {parentTopic}/p2p/{targetClientId}
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        // Keep the process alive so that callbacks continue to run
        Thread.sleep(Long.MAX_VALUE);
    }
}
Verify the producer
Run MQ4IoTProducerDemo.java. If the connection succeeds, you see output similar to:
connect success
send msg succeed topic is : XXXXX/testMq4Iot
send msg succeed topic is : XXXXX/p2p/xxx
...
Step 4: Receive messages with the cloud SDK
The cloud SDK connects your backend application to the ApsaraMQ for MQTT broker. Unlike the client SDK, the cloud SDK subscribes to parent topics only (not subtopics), making it suitable for backend scenarios where applications collect and analyze all messages under a topic.
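Because the backend subscribes at the parent topic level, it may need to separate the parent topic from the subtopic in order to route messages. The sketch below splits a full topic string at the first `/`. The class `TopicParts` is hypothetical, and how the full topic reaches your listener (for example, through the message properties) is not shown in this tutorial, so the sketch takes it as a plain string.

```java
/** Hypothetical helper: splits a full topic into its parent topic and the remaining subtopic. */
final class TopicParts {
    final String parentTopic;
    /** Empty when the message was published to the parent topic itself. */
    final String subtopic;

    private TopicParts(String parentTopic, String subtopic) {
        this.parentTopic = parentTopic;
        this.subtopic = subtopic;
    }

    static TopicParts parse(String fullTopic) {
        if (fullTopic == null || fullTopic.isEmpty()) {
            throw new IllegalArgumentException("topic must not be empty");
        }
        int slash = fullTopic.indexOf('/');
        if (slash == 0) {
            throw new IllegalArgumentException("topic must start with a parent topic: " + fullTopic);
        }
        if (slash < 0) {
            // No separator: the whole string is the parent topic.
            return new TopicParts(fullTopic, "");
        }
        return new TopicParts(fullTopic.substring(0, slash), fullTopic.substring(slash + 1));
    }

    public static void main(String[] args) {
        TopicParts parts = parse("testTopic/testMq4Iot");
        System.out.println(parts.parentTopic + " -> " + parts.subtopic);
    }
}
```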
Set up the project
Download the cloud SDK. See Release notes for the latest version.
Download the demo project: mqtt-server-sdk-demo.
Extract the package and import it into IntelliJ IDEA.
Verify that your pom.xml includes the following dependencies:
<dependencies>
    <dependency>
        <groupId>com.alibaba.mqtt</groupId>
        <artifactId>server-sdk</artifactId>
        <version>1.0.0.Final</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
Configure and run the consumer
Open MQTTConsumerDemo.java and update the following parameters:
| Parameter | Description | Example |
|---|---|---|
| domain | Cloud SDK endpoint (see the Endpoints section above) | post-cn-xxxxx-server-internet.mqtt.aliyuncs.com |
| port | Cloud SDK port. Always 5672. | 5672 |
| instanceId | ApsaraMQ for MQTT instance ID | post-cn-xxxxx |
| firstTopic | Parent topic to subscribe to | testTopic |
Credentials are loaded from the same environment variables (MQTT_AK_ENV and MQTT_SK_ENV) configured in Step 2.
Connect and subscribe
// Cloud SDK endpoint in <instance-id>-server-internet.mqtt.aliyuncs.com format
String domain = "post-cn-jaj3h8i****-server-internet.mqtt.aliyuncs.com";
int port = 5672;
String instanceId = "post-cn-jaj3h8i****";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String firstTopic = "firstTopic";
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setDomain(domain);
channelConfig.setPort(port);
channelConfig.setInstanceId(instanceId);
channelConfig.setAccessKey(accessKey);
channelConfig.setSecretKey(secretKey);
ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
serverConsumer.start();
serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
    @Override
    public void process(String msgId, MessageProperties messageProperties,
            byte[] payload) {
        System.out.println("Receive:" + msgId + ","
            + JSONObject.toJSONString(messageProperties) + ","
            + new String(payload));
    }
});
Complete consumer code
package com.aliyun.openservices.lmq.example;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.mqtt.server.ServerConsumer;
import com.alibaba.mqtt.server.callback.MessageListener;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ConsumerConfig;
import com.alibaba.mqtt.server.model.MessageProperties;

public class MQTTConsumerDemo {
    public static void main(String[] args) throws Exception {
        // Cloud SDK endpoint in <instance-id>-server-internet.mqtt.aliyuncs.com format.
        // Use the domain name, not the IP address.
        String domain = "post-cn-jaj3h8i****-server-internet.mqtt.aliyuncs.com";
        // Cloud SDK port. Must be 5672.
        int port = 5672;
        // ApsaraMQ for MQTT instance ID
        String instanceId = "post-cn-jaj3h8i****";
        // Load credentials from environment variables
        String accessKey = System.getenv("MQTT_AK_ENV");
        String secretKey = System.getenv("MQTT_SK_ENV");
        // Parent topic to subscribe to.
        // The cloud SDK subscribes to parent topics only (no subtopics).
        // An invalid or unauthorized topic causes the broker to close the connection.
        String firstTopic = "firstTopic";
        ChannelConfig channelConfig = new ChannelConfig();
        channelConfig.setDomain(domain);
        channelConfig.setPort(port);
        channelConfig.setInstanceId(instanceId);
        channelConfig.setAccessKey(accessKey);
        channelConfig.setSecretKey(secretKey);
        ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
        serverConsumer.start();
        // Subscribe and process incoming messages
        serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
            @Override
            public void process(String msgId, MessageProperties messageProperties,
                    byte[] payload) {
                System.out.println("Receive:" + msgId + ","
                    + JSONObject.toJSONString(messageProperties) + ","
                    + new String(payload));
            }
        });
    }
}
Key concepts
| Concept | Description |
|---|---|
| Client ID | Format: {GroupID}@@@{DeviceID}. Must be globally unique per TCP connection. Maximum 64 characters. Reusing a client ID across connections causes the broker to disconnect the existing session. |
| Parent topic | Created in the ApsaraMQ for MQTT console. An invalid or unauthorized topic causes the broker to close the connection. |
| Subtopic | Appended to the parent topic (for example, parentTopic/testMq4Iot). Used for message filtering. Maximum 128 characters. |
| QoS | Quality of Service levels: 0 (at most once), 1 (at least once), 2 (exactly once). |
| P2P messaging | Publish to {parentTopic}/p2p/{targetClientId} to send directly to a specific client without requiring the client to subscribe. |
| Protocol and port | Client SDK: tcp://endpoint:1883 (plaintext) or ssl://endpoint:8883 (SSL/TLS). Cloud SDK: port 5672. |
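The Subtopic, P2P messaging, and Protocol and port rows above are all string conventions, so they can be gathered in one small helper. The following is a minimal sketch; the class `MqttNames` is hypothetical and not part of either SDK. It assumes that the 128-character limit applies to the subtopic portion appended to the parent topic, which this tutorial does not state explicitly.

```java
/** Hypothetical helper: builds topic names and broker URIs from the conventions in the table above. */
final class MqttNames {
    // Assumption: the limit applies to the part appended after the parent topic.
    static final int MAX_SUBTOPIC_LENGTH = 128;

    private MqttNames() {
    }

    /** Returns {parentTopic}/{subtopic}. */
    static String subtopic(String parentTopic, String subtopic) {
        requireNonEmpty(parentTopic, "parentTopic");
        requireNonEmpty(subtopic, "subtopic");
        if (subtopic.length() > MAX_SUBTOPIC_LENGTH) {
            throw new IllegalArgumentException(
                "subtopic is longer than " + MAX_SUBTOPIC_LENGTH + " characters");
        }
        return parentTopic + "/" + subtopic;
    }

    /** Returns {parentTopic}/p2p/{targetClientId}. */
    static String p2pTopic(String parentTopic, String targetClientId) {
        requireNonEmpty(parentTopic, "parentTopic");
        requireNonEmpty(targetClientId, "targetClientId");
        return parentTopic + "/p2p/" + targetClientId;
    }

    /** Returns ssl://endpoint:8883 when useTls is true, otherwise tcp://endpoint:1883. */
    static String brokerUri(String endPoint, boolean useTls) {
        requireNonEmpty(endPoint, "endPoint");
        return useTls ? "ssl://" + endPoint + ":8883" : "tcp://" + endPoint + ":1883";
    }

    private static void requireNonEmpty(String value, String name) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
    }

    public static void main(String[] args) {
        System.out.println(subtopic("testTopic", "testMq4Iot"));
        System.out.println(p2pTopic("testTopic", "GID_test@@@device_002"));
        System.out.println(brokerUri("post-cn-xxxxx.mqtt.aliyuncs.com", true));
    }
}
```

The values returned by `subtopic` and `p2pTopic` can be passed as the topic argument of `MqttClient.publish`, and the value returned by `brokerUri` as the server URI of the `MqttClient` constructor, as in the producer code.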
FAQ
How do I enable automatic reconnection?
Set setAutomaticReconnect(true) on MqttConnectOptions before connecting:
MqttConnectOptions options = connectionOptionWrapper.getMqttConnectOptions();
options.setAutomaticReconnect(true);
mqttClient.connect(options);
How do I connect over SSL/TLS?
Replace tcp:// with ssl:// and use port 8883:
final MqttClient mqttClient =
    new MqttClient("ssl://" + endPoint + ":8883", clientId, memoryPersistence);
Can I send messages from a backend application to an MQTT client?
Yes. Use MQTTProducerDemo.java in the cloud SDK demo project for the reverse messaging direction (backend to client).
What's next
Explore more messaging patterns in the Demo projects
Check supported regions for cloud SDK access