When you connect an open-source MQTT client to ApsaraMQ for MQTT for the first time, the default SDK behavior does not retry a failed initial connection, which leaves your client silently offline. This guide walks through client initialization, connection configuration, and a retry pattern that handles first-connection failures with the Eclipse Paho Java SDK.
Quick start
Copy this self-contained program to connect, subscribe, and publish a test message. Replace the placeholder values with your own instance details.
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.logging.Logger;

public class MqttQuickStart {
    private static final Logger log = Logger.getLogger(MqttQuickStart.class.getName());

    public static void main(String[] args) throws Exception {
        // Connection parameters
        String brokerUrl = "<your-broker-url>"; // Example: tcp://xxxxx.mqtt.aliyuncs.com:1883
        String clientId = "<your-client-id>"; // Example: GID_test@@@client1
        String topic = "<your-topic>"; // Example: test/topic/1
        int qos = 1;

        // Create the client
        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient(brokerUrl, clientId, persistence);
        client.setTimeToWait(3000L);

        // Register callbacks
        client.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                log.info("Connected to " + serverURI + ", reconnect=" + reconnect);
                try {
                    // Re-subscribe after every (re)connection.
                    // Subscriptions are lost when cleanSession is true.
                    client.subscribe(topic, qos);
                    log.info("Subscribed to " + topic);
                } catch (MqttException e) {
                    log.severe("Subscribe failed: " + e.getMessage());
                }
            }

            @Override
            public void connectionLost(Throwable cause) {
                log.warning("Connection lost: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                // Do not block or call synchronous publish methods here.
                // Doing so causes a deadlock that stops heartbeats
                // and drops the connection.
                log.info("Message on " + topic + ": " + new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // No action needed for this example.
            }
        });

        // Configure connection options
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setKeepAliveInterval(60);
        options.setAutomaticReconnect(true);
        options.setMaxInflight(1000);

        // Connect with retry for initial connection
        for (;;) {
            try {
                client.connect(options);
                log.info("Initial connection successful");
                break;
            } catch (Throwable e) {
                log.severe("Connection failed: " + e.getMessage());
                Thread.sleep(5000L);
            }
        }

        // Publish a test message
        String payload = "Hello from ApsaraMQ for MQTT";
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(qos);
        client.publish(topic, message);
        log.info("Published message to " + topic);

        // Keep the process running to receive messages.
        Thread.sleep(Long.MAX_VALUE);
    }
}

The rest of this guide breaks down each step in detail.
Prerequisites
Before you begin, make sure you have:
An ApsaraMQ for MQTT instance with a valid broker URL (for example, tcp://<your-instance-id>.mqtt.aliyuncs.com:1883)
Maven or Gradle configured for dependency management
Add the SDK dependency
Add the Eclipse Paho MQTT v3 client to your pom.xml. Use the latest version available.
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

Source repository: eclipse/paho.mqtt.java
Initialize the client
Create an MqttClient instance
Create an MqttClient with the broker URL, client ID, and a persistence strategy. MemoryPersistence keeps in-flight messages in memory only, so they are lost if the process exits; this is acceptable for most use cases.
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
final String brokerUrl = "<your-broker-url>"; // Example: tcp://xxxxx.mqtt.aliyuncs.com:1883
final String clientId = "<your-client-id>";
final String topic = "<your-topic>";
final int qos = 1;
MemoryPersistence persistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(brokerUrl, clientId, persistence);
mqttClient.setTimeToWait(3000L);

Replace the placeholders with your instance details:
| Placeholder | Description | Example |
|---|---|---|
| <your-broker-url> | Endpoint of your ApsaraMQ for MQTT instance | tcp://xxxxx.mqtt.aliyuncs.com:1883 |
| <your-client-id> | Client ID registered in the console | GID_test@@@client1 |
| <your-topic> | Topic for publishing or subscribing | test/topic/1 |
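The example client ID in the table, GID_test@@@client1, appears to join a group ID and a device ID with the separator @@@. The following is a minimal sketch of a helper that assembles IDs in that shape, assuming that pattern holds for your instance. The class name ClientIds and the method build are hypothetical and are not part of the Paho SDK or of ApsaraMQ for MQTT.

```java
final class ClientIds {
    // Separator taken from the example ID "GID_test@@@client1" (assumed format).
    private static final String SEPARATOR = "@@@";

    private ClientIds() {}

    /** Joins a group ID and a device ID, rejecting blank or ambiguous parts. */
    static String build(String groupId, String deviceId) {
        requirePart(groupId, "groupId");
        requirePart(deviceId, "deviceId");
        return groupId + SEPARATOR + deviceId;
    }

    private static void requirePart(String value, String name) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be blank");
        }
        // A part that already contains the separator would make the ID ambiguous.
        if (value.contains(SEPARATOR)) {
            throw new IllegalArgumentException(name + " must not contain " + SEPARATOR);
        }
    }
}
```

With this helper, ClientIds.build("GID_test", "client1") produces the same string as the example in the table, and the result can be passed as the clientId argument of the MqttClient constructor.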
Register callbacks
Register an MqttCallbackExtended to handle connection events, incoming messages, and delivery confirmations.
// Assumes log is an SLF4J-style logger; the {} placeholders are not java.util.logging syntax.
mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // Called on every successful connection or reconnection.
        // Subscribe here so subscriptions are restored after reconnection.
        log.info("Connected to {}, reconnect={}", serverURI, reconnect);
        try {
            mqttClient.subscribe(topic, qos);
        } catch (MqttException e) {
            log.error("Failed to subscribe after connect", e);
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        // Called when the connection drops unexpectedly.
        log.error("Connection lost", cause);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Called when a message arrives on a subscribed topic.
        // IMPORTANT: Do not block this method or call synchronous publish
        // methods here. Doing so causes a deadlock that stops heartbeats
        // and drops the connection.
        log.info("Received message on {}: {}", topic, new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Called when a published message reaches the broker.
    }
});

Never block the messageArrived callback or call synchronous publish methods inside it. This causes a deadlock that prevents heartbeat packets from being sent and results in a disconnection.
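One way to keep messageArrived short is to hand each message to a separate worker and return immediately. The sketch below shows that hand-off using only the Java standard library. MessageDispatcher and its dispatch method are hypothetical names introduced for illustration, and the payload is assumed to be UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

final class MessageDispatcher {
    private final Executor worker;
    private final BiConsumer<String, String> handler;

    /**
     * @param worker  runs the handler, for example a single-thread executor
     * @param handler receives the topic and the payload decoded as UTF-8 (assumed encoding)
     */
    MessageDispatcher(Executor worker, BiConsumer<String, String> handler) {
        this.worker = worker;
        this.handler = handler;
    }

    /** Hands the message to the worker; slow processing belongs in the handler, not the caller. */
    void dispatch(String topic, byte[] payload) {
        // Copy the payload so the worker does not share the caller's array.
        byte[] copy = payload.clone();
        worker.execute(() -> handler.accept(topic, new String(copy, StandardCharsets.UTF_8)));
    }
}
```

Inside messageArrived, the call would be dispatcher.dispatch(topic, message.getPayload()), with the dispatcher built on something like Executors.newSingleThreadExecutor(). Note the trade-off: according to the Paho MqttCallback documentation, a message is acknowledged once messageArrived returns, so with this hand-off a message can be acknowledged before the handler has finished processing it.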
Configure connection options
Set up connection behavior with MqttConnectOptions.
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(60);
options.setAutomaticReconnect(true);
options.setMaxInflight(1000);

| Parameter | Value | What it controls |
|---|---|---|
| cleanSession | true or false | true: the broker discards previous session state on connect. false: the broker retains subscriptions and queued messages across disconnections. Set to false for durable subscriptions. |
| keepAliveInterval | 60 (seconds) | Maximum idle time before the client sends a PINGREQ heartbeat. If the broker receives no packets within 1.5x this interval, it closes the connection. Lower values detect failures faster but increase network traffic. |
| automaticReconnect | true | Enables the SDK's built-in reconnection logic with exponential backoff (1 s, 2 s, 4 s, ... up to 128 s). Takes effect only after a successful initial connection. |
| maxInflight | 1000 | Maximum number of QoS 1 and QoS 2 messages in transit simultaneously. Increase this value for high-throughput workloads. |
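To make the keepAliveInterval row concrete, the following sketch computes how long the broker waits before closing an idle connection, using the 1.5x rule from the table. KeepAliveMath is a hypothetical helper, not an SDK class.

```java
final class KeepAliveMath {
    private KeepAliveMath() {}

    /** Seconds of silence after which the broker closes the connection (1.5 x keep-alive). */
    static double brokerTimeoutSeconds(int keepAliveSeconds) {
        if (keepAliveSeconds < 0) {
            throw new IllegalArgumentException("keepAliveSeconds must not be negative");
        }
        return keepAliveSeconds * 1.5;
    }
}
```

With the value used in this guide, brokerTimeoutSeconds(60) gives 90.0, so a dead connection can go unnoticed by the broker for up to 90 seconds. Halving the interval to 30 halves that window to 45 seconds at the cost of more frequent heartbeats.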
Handle initial connection failures
The SDK's automatic reconnection activates only after the first successful connection. If the initial connection fails due to a network issue, the SDK does not retry, and the client stops silently.
This happens because the Paho SDK internally tracks whether the client has ever connected (wasConnected). When wasConnected is false, the connectionLost callback never fires, so the reconnection logic never triggers.
Wrap the first connection in a retry loop:
for (;;) {
    try {
        mqttClient.connect(options);
        log.info("Initial connection successful");
        break;
    } catch (Throwable e) {
        log.error("Initial connection failed, retrying in 5 seconds", e);
        Thread.sleep(5000L);
    }
}

Without this retry loop, a network failure during the initial connection throws the following exception and the client stops:
Caused by: java.net.ConnectException: Network is unreachable (connect failed)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)

Automatic reconnection after the initial connection
After the first successful connection, the SDK handles reconnection automatically when automaticReconnect is set to true. The sequence is:
The SDK detects a disconnection and fires connectionLost.
Reconnection attempts begin with exponential backoff: 1 s, 2 s, 4 s, 8 s, up to 128 seconds.
On success, connectComplete is called with reconnect=true.
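The backoff sequence in the steps above can be reproduced with a few lines of arithmetic. The sketch below only models the schedule described in this guide (start at 1 second, double each time, cap at 128 seconds); it is not the SDK's internal code, and BackoffSchedule is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    private BackoffSchedule() {}

    /**
     * Returns the first {@code attempts} delays in seconds:
     * start at 1, double after each attempt, never exceed {@code maxSeconds}.
     */
    static List<Integer> delays(int attempts, int maxSeconds) {
        List<Integer> result = new ArrayList<>();
        int delay = 1;
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            // Double the delay for the next attempt, capped at the maximum.
            delay = Math.min(delay * 2, maxSeconds);
        }
        return result;
    }
}
```

For example, delays(10, 128) yields 1, 2, 4, 8, 16, 32, 64, 128, 128, 128. The same schedule could replace the fixed 5-second sleep in the initial-connection retry loop if you prefer to back off while the broker is unreachable.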
Example log output for a disconnection and successful reconnection:
2023-06-20 17:10:26:972 connectionLost clientId=XXX
Disconnected (32109) - java.net.SocketException: Operation timed out (Read failed)
...
2023-06-20 17:12:36:764 connect success to: tcp://xxxxxx.mqtt.aliyuncs.com:1883,reconnect=true

Subscribe to your topics inside the connectComplete callback (as shown in the callback example above). Subscriptions are not automatically restored after reconnection when cleanSession is set to true.
What to do next
Send and subscribe to messages: Learn how to publish and subscribe with QoS levels.
Use MQTT over TLS: Secure the connection between your client and the broker.
Manage client permissions: Configure access control for topics and client IDs.