paho mqtt不定时抛出Connection reset异常导致客户端掉线
paho mqtt不定时抛出Connection reset异常导致客户端掉线现象分析优化方案现象我们的项目采用paho的mqttv3库作客户端,起初运行状况良好,随着终端设备和用户量的增加,发现服务端经常自动抛出异常导致mqtt client掉线且无法reconnect。异常信息如下:org.eclipse.paho.client.mqttv3.MqttException: 已断开连接at or
现象
我们的项目采用paho的mqttv3库作客户端,起初运行状况良好,随着终端设备和用户量的增加,发现服务端经常自动抛出异常导致mqtt client掉线且无法reconnect。异常信息如下:
org.eclipse.paho.client.mqttv3.MqttException: 已断开连接
at org.eclipse.paho.client.mqttv3.internal.CommsSender.handleRunException(CommsSender.java:194)
at org.eclipse.paho.client.mqttv3.internal.CommsSender.run(CommsSender.java:171)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttOutputStream.flush(MqttOutputStream.java:49)
at org.eclipse.paho.client.mqttv3.internal.CommsSender.run(CommsSender.java:149)
... 1 common frames omitted
分析
我们知道,Connection reset的原因是服务端关闭了connection,而客户端依然在读写数据,此时服务器返回复位标识RST,此时客户端就会提示java.net.SocketException: Connection reset。
我们在排除了服务器、网络、EMQX的嫌疑后重新分析,发现问题在随着设备和用量的增加而出现得愈加频繁,怀疑是mqtt消息量太大造成了积压,于是进行了模拟测试——我们频繁发送mqtt数据到服务,而每次接收到mqtt消息后让线程sleep 30秒,人为造成消息的积压。果然,几分钟后又抛出了java.net.SocketException异常。
可见,该异常的产生是由于我们处理mqtt消息速度较慢,导致了后面消息的积压。在处理最后这条消息的时候,由于发送消息的connection已经关闭,读取该消息就会出错。
优化方案
知道了问题产生的原因,自然就可以进行修改优化了。我们提出了以下几个方案,小伙伴们可以根据自己的实际情况考虑:
- 采用缓存技术处理数据,减少消费消息时的耗时;
- 如果mqtt消息是设备端定时发送的数据,考虑抛弃掉一些不关键的消息;
- 采用多线程技术异步处理消息,降低消息处理的延迟;
- 启用多个客户端处理来自不同设备的消息;
- 考虑引入流处理计算引擎等。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)