springboot实现webSocket服务端和客户端demo

秒速五厘米 2024-03-08 09:05 256阅读 0赞

1:pom导入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. <version>2.2.7.RELEASE</version>
  5. </dependency>

2:myWebSocketClient自定义webSocket客户端

  1. package com.example.poi.utils;
  2. import org.springframework.stereotype.Component;
  3. import javax.websocket.*;
  4. import java.io.IOException;
  5. /**
  6. * @Author xu
  7. * @create 2023/9/11 18
  8. */
  9. @ClientEndpoint
  10. @Component
  11. public class MyWebSocketClient {
  12. public Session session;
  13. @OnOpen
  14. public void onOpen(Session session) {
  15. this.session = session;
  16. System.out.println("WebSocket2连接已打开");
  17. }
  18. @OnMessage
  19. public void onMessage(String message) {
  20. System.out.println("收到消息2:" + message);
  21. }
  22. @OnClose
  23. public void onClose() {
  24. System.out.println("客户端关闭2");
  25. }
  26. @OnError
  27. public void onError(Throwable throwable) {
  28. System.err.println("发生错误2:" + throwable.getMessage());
  29. }
  30. public void sendMessage(String message) throws IOException {
  31. session.getBasicRemote().sendText(message);
  32. }
  33. }

3:WebSocketServer自定义webSocket服务端

  1. package com.example.poi.utils;
  2. import cn.hutool.json.JSONObject;
  3. import cn.hutool.log.Log;
  4. import cn.hutool.log.LogFactory;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.commons.lang.ObjectUtils;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.stereotype.Component;
  9. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  10. import javax.websocket.*;
  11. import javax.websocket.server.PathParam;
  12. import javax.websocket.server.ServerEndpoint;
  13. import java.io.IOException;
  14. import java.util.concurrent.CopyOnWriteArraySet;
  15. /**
  16. * @Author xu
  17. * @create 2023/7/21 19
  18. */
  19. @ServerEndpoint("/websocket/{sid}")
  20. @Component
  21. @Slf4j
  22. public class WebSocketServer {
  23. static Log log = LogFactory.get(WebSocketServer.class);
  24. //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  25. private static int onlineCount = 0;
  26. //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
  27. private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
  28. //与某个客户端的连接会话,需要通过它来给客户端发送数据
  29. public Session session;
  30. //接收sid
  31. private String sid = "";
  32. /**
  33. * 连接建立成功调用的方法
  34. *
  35. * @param session
  36. * @param sid
  37. */
  38. @OnOpen
  39. public void onOpen(Session session, @PathParam("sid") String sid) {
  40. this.session = session;
  41. webSocketSet.add(this); //加入set中
  42. addOnlineCount(); //在线数加1
  43. log.info("有新窗口开始监听:" + sid + ",当前在线人数为" + getOnlineCount());
  44. this.sid = sid;
  45. /*try {
  46. sendMessage("连接成功");
  47. } catch (IOException e) {
  48. log.error("websocket IO异常");
  49. }*/
  50. }
  51. /**
  52. * 连接关闭调用的方法
  53. */
  54. @OnClose
  55. public void onClose() {
  56. webSocketSet.remove(this); //从set中删除
  57. subOnlineCount(); //在线数减1
  58. log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
  59. }
  60. /**
  61. * 收到客户端消息后调用的方法
  62. * 客户端发送过来的消息
  63. *
  64. * @param message
  65. * @param session
  66. */
  67. @OnMessage
  68. public void onMessage(String message, Session session) {
  69. log.info("收到来自窗口" + sid + "的信息:" + message);
  70. //群发消息
  71. for (WebSocketServer item : webSocketSet) {
  72. if (ObjectUtils.equals(item.sid, sid)) {
  73. try {
  74. JSONObject jsonObject = new JSONObject();
  75. jsonObject.put("name", sid);
  76. item.sendMessage(jsonObject.toString());
  77. } catch (IOException e) {
  78. e.printStackTrace();
  79. }
  80. }
  81. }
  82. }
  83. /**
  84. * @param session
  85. * @param error
  86. */
  87. @OnError
  88. public void onError(Session session, Throwable error) {
  89. log.error("发生错误");
  90. error.printStackTrace();
  91. }
  92. /**
  93. * 实现服务器主动推送
  94. *
  95. * @param message
  96. * @throws IOException
  97. */
  98. public void sendMessage(String message) throws IOException {
  99. this.session.getBasicRemote().sendText(message);
  100. }
  101. /**
  102. * 获取存在的webSocket
  103. */
  104. public CopyOnWriteArraySet<WebSocketServer> getWebSocketServer() {
  105. return webSocketSet;
  106. }
  107. public String getSid(){
  108. return sid;
  109. }
  110. public void close2(String ss){
  111. for (WebSocketServer webSocketServer : webSocketSet) {
  112. if (webSocketServer.sid.equals(ss)) {
  113. webSocketSet.remove(webSocketServer);
  114. log.info("删除了:"+ss);
  115. }
  116. }
  117. subOnlineCount(); //在线数减1
  118. log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
  119. }
  120. /**
  121. * 发送消息
  122. *
  123. * @param message
  124. * @param sid
  125. * @throws IOException
  126. */
  127. public static void sendInfo(String message, @PathParam("sid") String sid) throws IOException {
  128. log.info("推送消息到窗口" + sid + ",推送内容:" + message);
  129. for (WebSocketServer item : webSocketSet) {
  130. try {
  131. //这里可以设定只推送给这个sid的,为null则全部推送
  132. if (sid == null) {
  133. item.sendMessage(message);
  134. } else if (item.sid.equals(sid)) {
  135. item.sendMessage(message);
  136. }
  137. } catch (IOException e) {
  138. continue;
  139. }
  140. }
  141. }
  142. public static synchronized int getOnlineCount() {
  143. return onlineCount;
  144. }
  145. public static synchronized void addOnlineCount() {
  146. WebSocketServer.onlineCount++;
  147. }
  148. public static synchronized void subOnlineCount() {
  149. WebSocketServer.onlineCount--;
  150. }
  151. /**
  152. * 必须要有这个bean才能生效使用webSocketServer
  153. */
  154. @Bean
  155. public ServerEndpointExporter serverEndpointExporter() {
  156. return new ServerEndpointExporter();
  157. }
  158. }

4:controller控制层

  1. @SneakyThrows
  2. @GetMapping("/testSql")
  3. public List<EntityDemo> testSql(String id) {
  4. /** WebSocket服务器的地址*/
  5. try {
  6. Random random = new Random();
  7. Integer i = random.nextInt(5) + 1;
  8. CopyOnWriteArraySet<WebSocketServer> webSocketServerSet = webSocketServer.getWebSocketServer();
  9. for (WebSocketServer socketServer : webSocketServerSet) {
  10. if (socketServer.getSid().equals(i.toString())) {
  11. webSocketServer.close2(i.toString());
  12. return null;
  13. }
  14. }
  15. URI uri = new URI("ws://127.0.0.1:9088/test/websocket/" + i);
  16. WebSocketContainer container = ContainerProvider.getWebSocketContainer();
  17. container.connectToServer(myWebSocketClient, uri);
  18. myWebSocketClient.sendMessage("你好" + i);
  19. } catch (Exception e) {
  20. throw new RuntimeException(e);
  21. }
  22. log.info("++++++");
  23. return null;
  24. }

5:注意事项:连接自动断开

  • webSocket连接之后,发现一个问题:就是每隔一段时间如果不传送数据的话,与前端的连接就会自动断开。采用心跳消息的方式,就可以解决这个问题。比如客服端每隔30秒自动发送ping消息给服务端,服务端返回pong

5-1:注意事项:对象无法自动注入

  • 使用了@ServerEndpoint注解的类中使用@Resource或@Autowired注入对象都会失败,并且报空指针异常,解决方法:如下
  • @ServerEndpoint注解的类,使用静态对象,并且对外暴露set方法,这样在对象初始化的时候,将其注入到WebSocketServer

    @ServerEndpoint(“/websocket/{sid}”)
    @Component
    @Slf4j
    public class WebSocketServer {

  1. private static EntityDemoServiceImpl entityDemoService;
  2. public static void setEntityDemoService(EntityDemoServiceImpl entityDemoService) {
  3. WebSocketServer.entityDemoService = entityDemoService;
  4. }
  • 在要注入到@ServerEndpoint注解的类中,使用@PostConstruct后置注入

    @Service
    public class EntityDemoServiceImpl extends ServiceImpl implements IEntityDemoService {

    1. @PostConstruct
    2. public void init() {
    3. WebSocketServer.setEntityDemoService(this);
    4. }

    }

5-2:注意事项:Session无法被序列化

  • 分布式场景会存在这样一个问题,当一次请求负载到第一台服务器时,session在第一台服务器线程上,第二次请求,负载到第二台服务器上,此时通过userId查找当前用户的session时,是查找不到的。
    本来想着把session存入到redis中,就可以从redis获取用户的session,希望用这种方式来解决分布式场景下消息发送的问题。这种场景可以通过发送消息中间件的方式解决。具体这样解决:每次连接时,都将userId和对应的session存入到本机,发送消息时,直接发送给MQ-Broker,然后每台应用负载都去消费这个消息,拿到消息之后,判断在本机能根据userId是否能找到session,找到则推送到客户端

发表评论

表情:
评论列表 (有 0 条评论,256人围观)

还没有评论,来说两句吧...

相关阅读