This topic describes how to initialize an open source MQTT client that connects to an ApsaraMQ for MQTT broker for the first time, and how to handle a connection failure on that first attempt. The open source client SDK for Java is used as an example.
Download URL of the open source client SDK for Java
SDK version
The following snippet shows the Maven dependency for the SDK. We recommend that you use the latest version.
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
Initialize the client
Create MqttClient
final String brokerUrl = properties.getProperty("brokerUrl");
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final String topic = properties.getProperty("topic");
final int qosLevel = Integer.parseInt(properties.getProperty("qos"));
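// recvClientId is the client ID of this client and must be defined before this line.
final MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
mqttClient.setTimeToWait(3000L); // The maximum time to wait for an action to complete. Unit: milliseconds.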
mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // The callback that is fired when a connection is established, including after an automatic reconnection. You can subscribe to topics here.
    }
    @Override
    public void connectionLost(Throwable throwable) {
        // The callback that is fired when an established connection is lost. We recommend that you print logs to facilitate troubleshooting.
    }
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // The callback that is fired when a message is received. Do not block this callback, and do not send messages synchronously here. Otherwise, a deadlock or a heartbeat sending failure may occur, which disconnects the client.
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // The callback that is fired when delivery of a published message to the broker is complete.
    }
});
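The advice not to block messageArrived can be sketched without the SDK: the callback only queues the message, and a separate worker thread does the slow processing. The class MessageDispatcher and its handler parameter are hypothetical names introduced for illustration and are not part of the Paho SDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical helper (not part of the Paho SDK): moves message handling
// off the thread that invokes messageArrived.
final class MessageDispatcher {
    // A single worker thread keeps messages in arrival order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final BiConsumer<String, byte[]> handler;

    MessageDispatcher(BiConsumer<String, byte[]> handler) {
        this.handler = handler;
    }

    // Intended to be called from messageArrived: queues the work and returns immediately.
    void dispatch(String topic, byte[] payload) {
        worker.execute(() -> handler.accept(topic, payload));
    }

    // Stops accepting new work and waits for queued messages to be handled.
    // Returns true if all queued messages were handled within the timeout.
    boolean shutdownAndWait(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Inside messageArrived, the call would then be dispatcher.dispatch(topic, mqttMessage.getPayload()). Note that the executor queue in this sketch is unbounded, so a handler that is consistently slower than the incoming message rate needs a bounded queue or another form of back pressure.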
Initialize configuration items for connection
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
.....
mqttConnectOptions.setCleanSession(cleanSession);
mqttConnectOptions.setKeepAliveInterval(60); // The heartbeat interval. Unit: seconds.
mqttConnectOptions.setAutomaticReconnect(true); // Make sure that you enable automatic reconnection.
mqttConnectOptions.setMaxInflight(1000);
.....
Connect for the first time
After mqttClient is initialized, connect to the broker.
mqttClient.connect(mqttConnectOptions);
Troubleshoot reconnection failure
Problem description
If the first connection attempt fails because of network jitter or latency, the client does not automatically try to reconnect to the broker. The following exception is thrown:
Caused by: java.net.ConnectException: Network is unreachable (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)
Possible cause
The client has never connected successfully, so the internal wasConnected flag of the SDK is false. In this state, the SDK does not invoke the connectionLost callback, and automatic reconnection is not triggered.
The following snippet is from the SDK source code and is shown for reference only. You do not need to add it to your code.
if (wasConnected && callback != null) {
    // Let the user know client has disconnected either normally or abnormally.
    callback.connectionLost(reason);
}
Solution
We recommend that you retry the first connection in a loop until it succeeds.
for (;;) {
    try {
        mqttClient.connect(mqttConnectOptions);
        break;
    } catch (Throwable e) {
        log.error("Failed to connect to the broker", e); // We recommend that you print logs to facilitate troubleshooting.
        Thread.sleep(5000L); // Wait 5 seconds before the next attempt. The enclosing method must handle InterruptedException.
    }
}
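The same retry-until-success idea can be written as a small reusable helper that also restores the interrupt flag if the waiting thread is interrupted. This is a minimal sketch that uses only the Java standard library. The class ConnectRetry and the method retryUntilSuccess are hypothetical names, and the connect call is passed in as a Callable so that the helper does not depend on the SDK.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of the Paho SDK): repeats an action until it succeeds.
final class ConnectRetry {
    // Runs the action until it completes without throwing an exception,
    // sleeping for sleepMillis after each failed attempt.
    // Returns the number of attempts that were made.
    static int retryUntilSuccess(Callable<?> action, long sleepMillis) {
        int attempts = 0;
        for (;;) {
            attempts++;
            try {
                action.call();
                return attempts;
            } catch (Exception e) {
                // Log every failure to facilitate troubleshooting.
                System.err.println("Connect attempt " + attempts + " failed: " + e);
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to reconnect", ie);
                }
            }
        }
    }
}
```

With the client from this topic, the action would be a lambda such as () -> { mqttClient.connect(mqttConnectOptions); return null; } with a wait time of 5000L.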
Automatic client reconnection
Sample exception
If the client is disconnected from the broker after a successful connection, the connectionLost callback is invoked. The following log shows an example:
2023-06-20 17:10:26:972 connectionLost clientId=XXX
Disconnected (32109) - java.net.SocketException: Operation timed out (Read failed)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)
Source code analysis
The following snippet is from the SDK source code and is shown for reference only. You do not need to add it to your code.
public void connectionLost(MqttException cause) {
    final String methodName = "connectionLost";
    // If there was a problem and a client callback has been set inform
    // the connection lost listener of the problem.
    try {
        if (mqttCallback != null && cause != null) {
            // @TRACE 708=call connectionLost
            log.fine(CLASS_NAME, methodName, "708", new Object[] { cause });
            mqttCallback.connectionLost(cause);
        }
        if(reconnectInternalCallback != null && cause != null){
            reconnectInternalCallback.connectionLost(cause);
        }
    } catch (java.lang.Throwable t) {
        // Just log the fact that a throwable has caught connection lost
        // is called during shutdown processing so no need to do anything else
        // @TRACE 720=exception from connectionLost {0}
        log.fine(CLASS_NAME, methodName, "720", new Object[] { t });
    }
}
Trigger a reconnection
The following snippets are from the SDK source code and are shown for reference only. You do not need to add them to your code.
MqttReconnectCallback
public void connectionLost(Throwable cause) {
    if (automaticReconnect) {
        // Automatic reconnect is set so make sure comms is in resting
        // state
        comms.setRestingState(true);
        reconnecting = true;
        startReconnectCycle();
    }
}
MqttReconnectActionListener
The retry interval doubles after each failed attempt: 1s, 2s, 4s, 8s, and so on up to 128s. The default maximum interval is 128s.
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    // @Trace 502=Automatic Reconnect failed, rescheduling: {0}
    log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() });
    if (reconnectDelay < connOpts.getMaxReconnectDelay()) {
        reconnectDelay = reconnectDelay * 2;
    }
    rescheduleReconnectCycle(reconnectDelay);
}
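The doubling rule in onFailure can be modeled in a few lines to see which delays it produces. This is a minimal sketch, not SDK code. The class ReconnectBackoff and the method schedule are hypothetical names, and the initial delay of 1000 ms and the maximum of 128000 ms are assumptions taken from the 1s to 128s pattern described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not part of the Paho SDK) of the doubling rule shown in onFailure.
final class ReconnectBackoff {
    // Returns the delay, in milliseconds, that precedes each of the given
    // number of consecutive failed reconnection attempts.
    static List<Long> schedule(long initialDelayMs, long maxDelayMs, int attempts) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMs;
        for (int i = 0; i < attempts; i++) {
            delays.add(delay);
            // Same check as in onFailure: double only while below the maximum.
            if (delay < maxDelayMs) {
                delay = delay * 2;
            }
        }
        return delays;
    }
}
```

For example, ReconnectBackoff.schedule(1000L, 128000L, 10) returns 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 128000, 128000. Because the check runs before the doubling, a maximum that the sequence does not hit exactly can be exceeded once in this model: with a maximum of 5000, the delays settle at 8000.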
Successful reconnection
After the client reconnects, the connectComplete callback is invoked with reconnect=true:
2023-06-20 17:12:36:764 connect success to: tcp://xxxxxx.mqtt.aliyuncs.com:1883,reconnect=true