springboot整合websorket推送消息实战
springboot整合websorket消息推送
由于http或https只支持客户端向服务端发送请求,不支持服务端主动推送消息到客户端,所以当客户端需要实时获取服务端的变化时,一般采用了客户端轮询的方式,这种方式不仅消耗性能,还容易导致宕机问题,这时候就需要websorket这种方式
原理
SpringBoot整合WebSocket推送消息的原理主要基于WebSocket协议的双向通信机制。以下是主要的步骤和过程:
建立连接: 当客户端启动应用程序并需要与服务器建立WebSocket连接时,它首先会在HTTP协议的基础上与服务器进行握手。这个过程中,客户端会向服务器传送WebSocket支持的版本号等信息,同时建立起TCP连接。
协议转换: 在这个阶段,协议从HTTP转换为WebSocket。在WebSocket协议中,服务器和客户端之间可以直接进行双向的数据传输,而不需要像HTTP协议那样,每次发送数据都要通过请求和响应的方式。
消息推送: 一旦WebSocket连接建立,服务器就可以随时向客户端发送消息。服务器将消息封装在WebSocket的帧(frame)中,然后通过已建立的TCP连接发送给客户端。客户端在接收到消息后,解析WebSocket帧,取出消息内容并进行相应的处理。
消息接收: 客户端也可以随时向服务器发送消息。客户端将消息封装在WebSocket帧中,然后通过TCP连接发送给服务器。服务器在接收到消息后,解析WebSocket帧,取出消息内容并进行相应的处理,然后通过WebSocket帧将处理结果返回给客户端。
总的来说,SpringBoot整合WebSocket推送消息的原理就是基于WebSocket协议的双向通信机制,通过建立TCP连接,实现服务器与客户端之间的全双工通信,从而使得数据的推送和接收更加高效和灵活。
实战
前端html代码
<!DOCTYPE HTML>
<html>
<head>
<title>My WebSocket</title>
</head>
<body>
Welcome<br/>
<input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button>
<div id="message">
</div>
</body>
<script type="text/javascript">
var websocket = null;
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window){
websocket = new WebSocket("ws://127.0.0.1:8092/websocket/yuanrenjie");
}
else{
alert('Not support websocket')
}
//连接发生错误的回调方法
websocket.onerror = function(){
setMessageInnerHTML("error");
};
//连接成功建立的回调方法
websocket.onopen = function(event){
setMessageInnerHTML("open");
}
//接收到消息的回调方法
websocket.onmessage = function(event){
setMessageInnerHTML(event.data);
}
//连接关闭的回调方法
websocket.onclose = function(){
setMessageInnerHTML("close");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function(){
websocket.close();
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML){
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//关闭连接
function closeWebSocket(){
websocket.close();
}
//发送消息
function send(){
var message = document.getElementById('text').value;
websocket.send(message);
}
</script>
</html>
方案
第一步引入pom
// 一般父级项目都是springboot,这里无需设置版本,如果实在需要可百度自行选取合适的版本
<!-- 引入websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
第二步:websorket配置
单链接版
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") // 接口路径 ws://localhost:8087/webSocket/userId;
public class WebSocketServer {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* 用户ID
*/
private String userId;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
//虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
// 注:底下WebSocket是当前类名
private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
// 用来存在线连接用户信息
private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value="userId")String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
} catch (Exception e) {
}
}
/**
* 链接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
} catch (Exception e) {
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message
* @param
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:"+message);
}
/** 发送错误时的处理
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:"+error.getMessage());
error.printStackTrace();
}
// 此为广播消息
public void sendAllMessage(String message) {
log.info("【websocket消息】广播消息:"+message);
for(WebSocketServer webSocket : webSockets) {
try {
if(webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null&&session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:"+message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息(多人)
public void sendMoreMessage(String[] userIds, String message) {
for(String userId:userIds) {
Session session = sessionPool.get(userId);
if (session != null&&session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:"+message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
多链接版
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") // 接口路径 ws://localhost:8087/webSocket/userId;
public class WebSocketServer {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* 用户ID
*/
private String userId;
// //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
// //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
// // 注:底下WebSocket是当前类名
// private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
// // 用来存在线连接用户信息
// private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketServer>> userwebSocketMap = new ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketServer>>();
private static ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<String, Integer>();
/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value="userId") final String userId) {
this.session = session;
this.userId = userId;
if (!exitUser(userId)) {
initUserInfo(userId);
} else {
CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = getUserSocketSet(userId);
WebSocketServerSet.add(this);
userCountIncrease(userId);
}
System.out.println("有" + userId + "新连接加入!当前在线人数为" + getCurrUserCount(userId));
}
/**
* 链接关闭调用的方法
*/
@OnClose
public void onClose() {
CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = userwebSocketMap.get(userId);
//从set中删除
WebSocketServerSet.remove(this);
//在线数减1
userCountDecrement(userId);
System.out.println("有一连接关闭!当前在线人数为" + getCurrUserCount(userId));
}
/**
* 收到客户端消息后调用的方法
*
* @param message
* @param
*/
@OnMessage
public void onMessage(String message) {
onMessageMethod(userId, message);
}
/** 发送错误时的处理
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:"+error.getMessage());
error.printStackTrace();
}
// 此为广播消息
public void sendAllMessage(String message) {
log.info("【websocket消息】广播消息:"+message);
for(WebSocketServer webSocket : userwebSocketMap.get(userId)) {
try {
if(webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息
public void sendOneMessage(String userId, String message) {
onMessageMethod(userId, message);
}
private void onMessageMethod(String userId, String message) {
CopyOnWriteArraySet<WebSocketServer> webSocketSet = userwebSocketMap.get(userId);
if (webSocketSet!=null){
System.out.println("来自客户端" + userId + "的消息:" + message);
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
continue;
}
}
}
else {
log.error("消息接收人为null"+message);
}
}
// 此为单点消息(多人)
public void sendMoreMessage(String[] userIds, String message) {
for(String userId:userIds) {
Session session = this.session;
if (session != null&&session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:"+message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
*
* @param message
* @throws IOException
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
//this.session.getAsyncRemote().sendText(message);
}
public boolean exitUser(String userId) {
return userwebSocketMap.containsKey(userId);
}
public CopyOnWriteArraySet<WebSocketServer> getUserSocketSet(String userId) {
return userwebSocketMap.get(userId);
}
public void userCountIncrease(String userId) {
if (count.containsKey(userId)) {
count.put(userId, count.get(userId) + 1);
}
}
public void userCountDecrement(String userId) {
if (count.containsKey(userId)) {
count.put(userId, count.get(userId) - 1);
}
}
public void removeUserConunt(String userId) {
count.remove(userId);
}
public Integer getCurrUserCount(String userId) {
return count.get(userId);
}
private void initUserInfo(String userId) {
CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = new CopyOnWriteArraySet<WebSocketServer>();
WebSocketServerSet.add(this);
userwebSocketMap.put(userId, WebSocketServerSet);
count.put(userId, 1);
}
}
测试类
@RestController
@RequestMapping("/test")
public class TestController {
@Resource
private WebSocketServer webSocketServer;
/**
* 查询消息list
* @param
* @return
*/
@GetMapping("getTest")
public void getMessageinfoList(){
// 推送websorket
//创建业务消息信息
JSONObject obj = new JSONObject();
obj.put("cmd", "topic");//业务类型
obj.put("msgId", "1");//消息id
obj.put("msgTxt", "1");//消息内容
//全体发送
webSocketServer.sendAllMessage(obj.toJSONString(0));
//单个用户发送 (userId为用户id)
webSocketServer.sendOneMessage("1", obj.toJSONString(0));
//多个用户发送 (userIds为多个用户id,逗号‘,’分隔)
String a[]={"1","2"};
webSocketServer.sendMoreMessage(a, obj.toJSONString(0));
}
}
调用测试类的接口,即可在network的ws栏的请求看到消息
配置wss
因为网站之前已经配置过Https了就不用配置证书了
修改nginx
server {
listen 443 ssl;
server_name localhost;
ssl_certificate /opt/ssl/test.com.pem;
ssl_certificate_key /opt/ssl/test.com.key;
ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
location / {
root html;
index index.html index.htm;
proxy_set_header Host $host;
proxy_set_header X-Real-Ip $remote_addr;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_pass http://127.0.0.1:90;
}
location /websocket/ {
proxy_pass http://172.20.0.113:9528;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
#由于服务器端源码(建议大家做好大小写匹配)只匹配了"Upgrade"字符串,所以如果这里填"upgrade"服务器端会将这条http请求当成普通的请求,导致websocket握手失败
proxy_set_header Connection "Upgrade";
proxy_set_header Remote_addr $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 600s;
}
}
gateway无需修改
- id: message-sorket
uri: ws://192.168.0.33:9995
predicates:
- Path=/websocket/**
延伸
WebSocket是一种网络通信协议,提供了全双工的通信频道,允许服务器和客户端之间的实时双向通信。这在传统HTTP轮询技术上做出了显著的提升,允许服务器在数据更新时主动推送给客户端,减少了不必要的请求和响应,提高了数据传输的效率和实时性。
WebSocket在实现推送消息的过程中,有一套详细的工作机制。
连接建立: 客户端和服务器通过WebSocket协议进行握手,建立一次性的TCP连接。握手过程中,服务器和客户端可以协商一些参数,例如消息的最大长度、是否支持二进制消息等。一旦连接建立,服务器和客户端就可以通过这个连接进行数据的发送和接收。
消息发送: 在WebSocket中,服务器和客户端都可以随时发送消息。发送消息时,需要将消息内容封装在一个WebSocket帧中,然后通过网络连接发送给对方。对方接收到消息后,解析WebSocket帧,取出消息内容进行处理。
消息接收: 除了发送消息,WebSocket还提供了接收消息的功能。当服务器或客户端接收到一个WebSocket帧时,会解析出其中的消息内容,然后进行处理。
连接关闭: WebSocket连接可以随时关闭。关闭连接后,服务器和客户端就不再通过这个连接进行数据传输。
另外值得注意的是,在使用WebSocket的过程中,有时会遇到网络断开的情况。虽然服务器端仍然会向客户端发送数据,但是客户端无法接收到这些数据。为了解决这个问题,WebSocket提供了一种心跳机制。
WebSocket的心跳机制允许客户端和服务器在连接建立后,每隔一段时间向对方发送一个心跳消息,以检查连接是否仍然有效。如果长时间没有接收到对方的心跳消息,服务器或客户端可以认为连接已经断开,然后采取相应的措施,例如关闭连接、重新连接等。
总结起来,WebSocket协议的推送消息机制主要包括建立连接、发送和接收消息、关闭连接以及心跳机制等环节。这些环节协同工作,使得WebSocket成为实现实时、高效数据传输的重要工具。

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)