ApsaraMQ for MQTT: Connect an Eclipse Paho Java client to ApsaraMQ for MQTT

Last Updated: Mar 11, 2026

By default, the Eclipse Paho Java SDK does not retry a failed initial connection, so an open-source MQTT client that cannot reach ApsaraMQ for MQTT on its first attempt stays offline. This guide walks through client initialization, connection configuration, and a retry pattern that handles first-connection failures with the Eclipse Paho Java SDK.

Quick start

Copy this self-contained program to connect, subscribe, and publish a test message. Replace the placeholder values with your own instance details.

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.logging.Logger;

public class MqttQuickStart {
    private static final Logger log = Logger.getLogger(MqttQuickStart.class.getName());

    public static void main(String[] args) throws Exception {
        // Connection parameters
        String brokerUrl = "<your-broker-url>";   // Example: tcp://xxxxx.mqtt.aliyuncs.com:1883
        String clientId  = "<your-client-id>";    // Example: GID_test@@@client1
        String topic     = "<your-topic>";        // Example: test/topic/1
        int qos          = 1;

        // Create the client
        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient(brokerUrl, clientId, persistence);
        client.setTimeToWait(3000L);

        // Register callbacks
        client.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                log.info("Connected to " + serverURI + ", reconnect=" + reconnect);
                try {
                    // Re-subscribe after every (re)connection.
                    // Subscriptions are lost when cleanSession is true.
                    client.subscribe(topic, qos);
                    log.info("Subscribed to " + topic);
                } catch (MqttException e) {
                    log.severe("Subscribe failed: " + e.getMessage());
                }
            }

            @Override
            public void connectionLost(Throwable cause) {
                log.warning("Connection lost: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                // Do not block or call synchronous publish methods here.
                // Doing so causes a deadlock that stops heartbeats
                // and drops the connection.
                log.info("Message on " + topic + ": " + new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // No action needed for this example.
            }
        });

        // Configure connection options
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setKeepAliveInterval(60);
        options.setAutomaticReconnect(true);
        options.setMaxInflight(1000);

        // Connect with retry for initial connection
        for (;;) {
            try {
                client.connect(options);
                log.info("Initial connection successful");
                break;
            } catch (MqttException e) {
                log.severe("Connection failed: " + e.getMessage());
                Thread.sleep(5000L);
            }
        }

        // Publish a test message
        String payload = "Hello from ApsaraMQ for MQTT";
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(qos);
        client.publish(topic, message);
        log.info("Published message to " + topic);

        // Keep the process running to receive messages.
        Thread.sleep(Long.MAX_VALUE);
    }
}

The rest of this guide breaks down each step in detail.

Prerequisites

Before you begin, make sure you have:

  • An ApsaraMQ for MQTT instance with a valid broker URL (for example, tcp://<your-instance-id>.mqtt.aliyuncs.com:1883)

  • Maven or Gradle configured for dependency management

Add the SDK dependency

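Add the Eclipse Paho MQTT v3 client to your pom.xml. The example pins version 1.2.5; use the latest release available. For Gradle, the same coordinates are org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5.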

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

Source repository: eclipse/paho.mqtt.java

Initialize the client

Create an MqttClient instance

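Create an MqttClient with the broker URL, client ID, and a persistence strategy. MemoryPersistence keeps in-flight messages in memory only, so they do not survive a process restart; this is sufficient for most use cases. setTimeToWait sets the maximum time, in milliseconds, that blocking calls such as connect, subscribe, and publish wait for completion.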

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

final String brokerUrl = "<your-broker-url>";       // Example: tcp://xxxxx.mqtt.aliyuncs.com:1883
final String clientId = "<your-client-id>";
final String topic = "<your-topic>";
final int qos = 1;

MemoryPersistence persistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(brokerUrl, clientId, persistence);
mqttClient.setTimeToWait(3000L);

Replace the placeholders with your instance details:

Placeholder | Description | Example
<your-broker-url> | Endpoint of your ApsaraMQ for MQTT instance | tcp://xxxxx.mqtt.aliyuncs.com:1883
<your-client-id> | Client ID registered in the console | GID_test@@@client1
<your-topic> | Topic for publishing or subscribing | test/topic/1

Register callbacks

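Register an MqttCallbackExtended to handle connection events, incoming messages, and delivery confirmations. The snippets in the rest of this guide assume an SLF4J-style logger named log (placeholder-based messages and an error method); the quick start uses java.util.logging instead.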

mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // Called on every successful connection or reconnection.
        // Subscribe here so subscriptions are restored after reconnection.
        log.info("Connected to {}, reconnect={}", serverURI, reconnect);
        try {
            mqttClient.subscribe(topic, qos);
        } catch (MqttException e) {
            log.error("Failed to subscribe after connect", e);
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        // Called when the connection drops unexpectedly.
        log.error("Connection lost", cause);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Called when a message arrives on a subscribed topic.
        // IMPORTANT: Do not block this method or call synchronous publish
        // methods here. Doing so causes a deadlock that stops heartbeats
        // and drops the connection.
        log.info("Received message on {}: {}", topic, new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Called when a published message reaches the broker.
    }
});

Important

Never block the messageArrived callback or call synchronous publish methods inside it. This causes a deadlock that prevents heartbeat packets from being sent and results in a disconnection.
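
One way to keep messageArrived short is to hand each message to a worker pool and return immediately. The following is a minimal sketch of that idea using only the JDK; MessageDispatcher and its methods are hypothetical names and are not part of the Paho SDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Paho class): moves message handling off the callback thread.
final class MessageDispatcher {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final List<String> processed = new CopyOnWriteArrayList<>();

    // Intended to be called from messageArrived: enqueues the work and returns at once.
    void dispatch(String topic, byte[] payload) {
        byte[] copy = payload.clone(); // defensive copy before leaving the callback
        workers.submit(() -> handle(topic, copy));
    }

    // Slow work (database writes, synchronous publishes) belongs here, on a worker thread.
    private void handle(String topic, byte[] payload) {
        processed.add(topic + ": " + new String(payload, StandardCharsets.UTF_8));
    }

    List<String> processed() {
        return processed;
    }

    // Stops accepting new work and waits for queued tasks; returns false on timeout or interrupt.
    boolean shutdown(long timeoutMillis) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Inside the callback, the body of messageArrived would then reduce to a single call such as dispatcher.dispatch(topic, message.getPayload()). With more than one worker thread, messages may be processed out of arrival order; a single-thread executor preserves order at the cost of throughput.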

Configure connection options

Set up connection behavior with MqttConnectOptions.

MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(60);
options.setAutomaticReconnect(true);
options.setMaxInflight(1000);

Parameter | Value | What it controls
cleanSession | true or false | true: the broker discards previous session state on connect. false: the broker retains subscriptions and queued messages across disconnections. Set to false for durable subscriptions.
keepAliveInterval | 60 (seconds) | Maximum idle time before the client sends a PINGREQ heartbeat. If the broker receives no packets within 1.5x this interval, it closes the connection. Lower values detect failures faster but increase network traffic.
automaticReconnect | true | Enables the SDK's built-in reconnection logic with exponential backoff (1 s, 2 s, 4 s, ... up to 128 s). Takes effect only after a successful initial connection.
maxInflight | 1000 | Maximum number of QoS 1 and QoS 2 messages in transit simultaneously. Increase this value for high-throughput workloads.
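
As a worked example of the 1.5x keep-alive rule, the sketch below computes how long the broker waits before it closes an idle connection. KeepAliveMath is a hypothetical helper written for this illustration; it does not call the SDK.

```java
// Hypothetical helper: arithmetic for the 1.5x keep-alive rule described above.
final class KeepAliveMath {
    // Broker-side idle timeout in milliseconds for a given keep-alive interval in seconds.
    static long brokerTimeoutMillis(int keepAliveSeconds) {
        return keepAliveSeconds * 1500L; // 1.5 x interval, converted to milliseconds
    }

    public static void main(String[] args) {
        System.out.println(brokerTimeoutMillis(60)); // prints 90000
        System.out.println(brokerTimeoutMillis(30)); // prints 45000
    }
}
```

With keepAliveInterval set to 60, a silent connection is closed after about 90 seconds; halving the interval to 30 shortens detection to about 45 seconds and doubles the heartbeat traffic.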

Handle initial connection failures

The SDK's automatic reconnection activates only after the first successful connection. If the initial connection fails, for example because of a network issue, the SDK does not retry: connect() throws an exception and the client stays disconnected.

This happens because the Paho SDK checks an internal flag, wasConnected, when a connection shuts down. When wasConnected is false, the connectionLost callback never fires, so the reconnection logic never triggers.

Wrap the first connection in a retry loop:

for (;;) {
    try {
        mqttClient.connect(options);
        log.info("Initial connection successful");
        break;
    } catch (MqttException e) {
        log.error("Initial connection failed, retrying in 5 seconds", e);
        Thread.sleep(5000L);
    }
}

Without this retry loop, a network failure during the initial connection throws the following exception and the client stops:

Caused by: java.net.ConnectException: Network is unreachable (connect failed)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)
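
The loop above retries forever at a fixed interval. If the application should give up after a number of attempts, a bounded variant may be preferable. The following is a minimal JDK-only sketch; ConnectRetry and retry are hypothetical names, and the action passed in stands for the connect call.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not a Paho class): bounded retry with a fixed delay between attempts.
final class ConnectRetry {
    // Runs the action until it succeeds or maxAttempts is reached.
    // Returns the number of attempts used, or -1 if every attempt failed
    // or the waiting thread was interrupted.
    static int retry(Callable<?> action, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.call();
                return attempt;
            } catch (Exception e) {
                System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

With the client from this guide, the call would look like ConnectRetry.retry(() -> { mqttClient.connect(options); return null; }, 10, 5000L), and a result of -1 tells the caller that the broker was never reached.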

Automatic reconnection after the initial connection

After the first successful connection, the SDK handles reconnection automatically when automaticReconnect is set to true. The sequence is:

  1. The SDK detects a disconnection and fires connectionLost.

  2. Reconnection attempts begin with exponential backoff: 1 s, 2 s, 4 s, 8 s, up to 128 seconds.

  3. On success, connectComplete is called with reconnect=true.
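
The backoff schedule in step 2 can be illustrated with a few lines of arithmetic. This sketch reproduces the delays as described in this guide (doubling from 1 second, capped at 128 seconds); it is not the SDK's internal implementation, and BackoffSchedule is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reproduces the documented reconnect delays, not the SDK's own code.
final class BackoffSchedule {
    // Delay in seconds before the given reconnection attempt (1-based).
    static int delaySeconds(int attempt) {
        int exponent = Math.min(Math.max(attempt, 1) - 1, 7); // 2^7 = 128 is the cap
        return 1 << exponent;
    }

    // Delays for the first `count` attempts, in order.
    static List<Integer> firstDelays(int count) {
        List<Integer> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= count; attempt++) {
            delays.add(delaySeconds(attempt));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(firstDelays(10)); // prints [1, 2, 4, 8, 16, 32, 64, 128, 128, 128]
    }
}
```

Under this schedule the delay reaches its 128-second ceiling on the eighth attempt, so a long outage settles into one attempt roughly every two minutes.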

Example log output for a disconnection and successful reconnection:

2023-06-20 17:10:26:972  connectionLost clientId=XXX
Disconnected (32109) - java.net.SocketException: Operation timed out (Read failed)
...
2023-06-20 17:12:36:764  connect success to: tcp://xxxxxx.mqtt.aliyuncs.com:1883,reconnect=true

Note

Subscribe to your topics inside the connectComplete callback (as shown in the callback example above). Subscriptions are not automatically restored after reconnection when cleanSession is set to true.
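
When a client subscribes to more than one topic, it can help to record each subscription once and replay the full set on every connection. The following is a minimal JDK-only sketch of that bookkeeping; SubscriptionRegistry is a hypothetical class, and the function passed to restore stands for the client's subscribe call.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical helper (not a Paho class): remembers subscriptions so they can be replayed.
final class SubscriptionRegistry {
    // Topic filter -> QoS, kept in insertion order.
    private final Map<String, Integer> topics = new LinkedHashMap<>();

    // Records a subscription; adding the same topic again overwrites its QoS.
    synchronized void add(String topicFilter, int qos) {
        topics.put(topicFilter, qos);
    }

    // Replays every remembered subscription through the supplied subscribe function
    // and returns how many were replayed.
    synchronized int restore(BiConsumer<String, Integer> subscribe) {
        topics.forEach(subscribe);
        return topics.size();
    }
}
```

In connectComplete, the replay would be a single restore call whose function wraps mqttClient.subscribe and handles MqttException, since a BiConsumer cannot throw checked exceptions.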

What to do next