springboot webscoket示例:增加定时心跳逻辑
【代码】springboot webscoket示例:增加定时心跳逻辑。
·
websocket服务端增加定时发送心跳机制
@ServerEndpoint(value = "/websocket/{uuid}")
@Component
public class DevMessageHandleController {
private static final Logger logger = LoggerFactory.getLogger(DevMessageHandleController.class);
//concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketController对象。
public static CopyOnWriteArraySet<DevMessageHandleController> webSocketSet = new CopyOnWriteArraySet<>();
// 使用ConcurrentHashMap来存储用户ID和WebSocket会话对象的映射。
private static ConcurrentHashMap<String, DevMessageHandleController> webSocketMap = new ConcurrentHashMap<>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
private String uuid;
// 心跳尝试次数
private AtomicInteger heartbeatAttempts;
@OnOpen
public void onOpen(@PathParam("uuid") String uuid, Session session) {
logger.info("uuid: {}, sessionId: {}", uuid, session.getId());
// 将新建立的会话添加到webSocketMap中
try {
if (webSocketMap.containsKey(uuid)) {
//如果有旧连接就先断开
webSocketMap.get(uuid).session.close();
webSocketSet.remove(webSocketMap.get(uuid));
}
this.session = session;
this.uuid = uuid;
heartbeatAttempts = new AtomicInteger(0);
webSocketSet.add(this); //加入set中
webSocketMap.put(uuid, this); //加入map中
// 其他代码...
} catch (Exception e) {
logger.error("onOpen error:" + e.getMessage());
}
}
@OnClose
public void onClose(@PathParam("uuid") String uuid, Session session) {
logger.info("会话关闭");
// 当会话关闭时,从webSocketSet中移除该会话
webSocketSet.remove(this);
// 当会话关闭时,从webSocketMap中移除该会话
webSocketMap.remove(uuid);
}
// 收到客户端消息后调用的方法
@OnMessage
public void onMessage(String message, Session session) {
logger.info("Message from client: " + message);
if ("pong".equals(message)) {
// 重置为0
this.heartbeatAttempts.set(0);
System.out.println("Received pong from: " + session.getId());
}
}
// 发生错误时调用
@OnError
public void onError(Session session, Throwable error) {
logger.error("发生错误 session:" + session.getId() + ",error:" + error);
try {
session.close();
// 当会话关闭时,从webSocketSet中移除该会话
webSocketSet.remove(this);
// 当会话关闭时,从webSocketMap中移除该会话
webSocketMap.remove(this.uuid);
} catch (IOException e) {
logger.error("onError error:" + e.getMessage());
}
}
public void sendMessage(Session session, String msg) {
logger.info("发送消息");
try {
//判断当前人员是否连接websocket
if (session.isOpen()) {
session.getAsyncRemote().sendText(msg);
} else {
session.close();
webSocketSet.remove(this); //从set中删除
webSocketMap.remove(this.uuid); //从map中删除
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static CopyOnWriteArraySet<DevMessageHandleController> getWebSocketSet() {
return webSocketSet;
}
public static void setWebSocketSet(CopyOnWriteArraySet<DevMessageHandleController> webSocketSet) {
DevMessageHandleController.webSocketSet = webSocketSet;
}
public static ConcurrentHashMap<String, DevMessageHandleController> getWebSocketMap() {
return webSocketMap;
}
public static void setWebSocketMap(ConcurrentHashMap<String, DevMessageHandleController> webSocketMap) {
DevMessageHandleController.webSocketMap = webSocketMap;
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public AtomicInteger getHeartbeatAttempts() {
return heartbeatAttempts;
}
public void setHeartbeatAttempts(AtomicInteger heartbeatAttempts) {
this.heartbeatAttempts = heartbeatAttempts;
}
}
每间隔10s向客户端发送一次心跳
private static final int MAX_HEARTBEAT_ATTEMPTS = 3;
@Scheduled(fixedDelay = 10000)
public void sendHeartBeat() {
CopyOnWriteArraySet<DevMessageHandleController> webSocketSet;
try {
webSocketSet = DevMessageHandleController.getWebSocketSet();
logger.info("连接数量:" + webSocketSet.size());
if(webSocketSet.size() == 0){
return;
}
logger.info("定时发送心跳");
// 遍历所有连接,发送心跳
webSocketSet.forEach(obj -> {
Session session = obj.getSession();
logger.info("sessionId:" + session.getId() +" 心跳ping发送次数:" + obj.getHeartbeatAttempts().get());
if(obj.getHeartbeatAttempts().get() >= MAX_HEARTBEAT_ATTEMPTS) {
try {
session.close();
} catch (IOException e) {
e.printStackTrace();
logger.error("session close error:" + e.getMessage());
}
} else {
obj.getHeartbeatAttempts().incrementAndGet();
if (session.isOpen()) {
session.getAsyncRemote().sendText("ping");
}
}
});
}catch (Exception e){
logger.error("发送心跳 error:" + e.getMessage());
}
}

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)