本文以Java SDK為例介紹開源MQTT用戶端首次串連服務端時如何初始化用戶端和配置自動重連功能。
開源用戶端Java SDK下載地址
SDK版本
SDK依賴如下,建議使用最新版本。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>初始化用戶端
建立MqttClient
final String brokerUrl = properties.getProperty("brokerUrl");
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final String topic = properties.getProperty("topic");
final int qosLevel = Integer.parseInt(properties.getProperty("qos"));
final MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
mqttClient.setTimeToWait(3000L);
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
//串連成功回調,可以開始訂閱Topic。
}
@Override
public void connectionLost(Throwable throwable) {
// 串連斷開回調,建議列印日誌方便定位問題。
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// 收到訊息的回調,此次不要阻塞,且不能在這裡同步發送訊息,否則可能會導致死結卡住,以及心跳無法發送導致斷鏈。
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// 成功發送訊息到服務端。
}
});初始化串連項
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
.....
connOpts.setCleanSession(cleanSession);
connOpts.setKeepAliveInterval(60); // 心跳間隔時間,單位:秒。
connOpts.setAutomaticReconnect(true); // 務必開啟自動重連。
connOpts.setMaxInflight(1000);
.....首次串連
初始化mqttClient,開始串連服務端。
mqttClient.connect(mqttConnectOptions);首次串連不重連問題
問題現象
如果由於網路抖動或延時等其他原因導致用戶端串連服務端失敗,則首次串連不能自動重連。具體報錯如下:
Caused by: java.net.ConnectException: Network is unreachable (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)可能原因
用戶端首次串連,wasConnected=false,所以進不了connectionLost的重連處理。
說明
以下代碼為SDK源碼,您無需在代碼中添加該內容。
if (wasConnected && callback != null) {
// Let the user know client has disconnected either normally or abnormally.
callback.connectionLost(reason);
}解決方案
首次串連建議儘可能進行嘗試,直至串連成功。
for(;;) {
try {
mqttClient.connect(mqttConnectOptions);
break;
} catch (Throwable e) {
log.error("",e); // 建議列印日誌,方便定位問題。
Thread.sleep(5000L);
}
}用戶端自動重連
異常樣本
用戶端串連服務端後,如果後續由於其他原因導致串連斷開,會觸發connectionLost。
2023-06-20 17:10:26:972 connectionLost clientId=XXX
已中斷連線 (32109) - java.net.SocketException: Operation timed out (Read failed)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)異常原因源碼分析
說明
以下代碼為SDK源碼,您無需在代碼中添加該內容。
public void connectionLost(MqttException cause) {
final String methodName = "connectionLost";
// If there was a problem and a client callback has been set inform
// the connection lost listener of the problem.
try {
if (mqttCallback != null && cause != null) {
// @TRACE 708=call connectionLost
log.fine(CLASS_NAME, methodName, "708", new Object[] { cause });
mqttCallback.connectionLost(cause);
}
if(reconnectInternalCallback != null && cause != null){
reconnectInternalCallback.connectionLost(cause);
}
} catch (java.lang.Throwable t) {
// Just log the fact that a throwable has caught connection lost
// is called during shutdown processing so no need to do anything else
// @TRACE 720=exception from connectionLost {0}
log.fine(CLASS_NAME, methodName, "720", new Object[] { t });
}
}觸發重連
說明
以下代碼為SDK源碼,您無需在代碼中添加該內容。
MqttReconnectCallback
public void connectionLost(Throwable cause) {
if (automaticReconnect) {
// Automatic reconnect is set so make sure comms is in resting
// state
comms.setRestingState(true);
reconnecting = true;
startReconnectCycle();
}
}MqttReconnectActionListener
稍候再試為1 s、2 s、4 s、8 s......128 s,預設最大間隔時間為128 s。
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// @Trace 502=Automatic Reconnect failed, rescheduling: {0}
log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() });
if (reconnectDelay < connOpts.getMaxReconnectDelay()) {
reconnectDelay = reconnectDelay * 2;
}
rescheduleReconnectCycle(reconnectDelay);
}重連成功
connectComplete回調
2023-06-20 17:12:36:764 connect success to: tcp://xxxxxx.mqtt.aliyuncs.com:1883,reconnect=true