Springboot集成SSE实现消息推送之单工通信

向右看齐 2024-04-01 18:55 191阅读 0赞

前言

通常在一些web项目中,会涉及到想客户端推送消息,常见的有Ajax轮询、webSocket,本篇文章主要使用Springboot集成SSE实现向客户端持续推送信息。

SSE简介

服务发送事件SSE(Sever-Sent Event),就是基于 HTTP 的技术,浏览器向服务器发送一个保持长连接HTTP请求,服务器单向地向客户端以流形式持续传输数据 。这样可以节约网络资源,不需要建立新连接。

优点

服务端不需要其他的类库,开发难度较低。
不用每次建立新连接,延迟较低。
数据通过简单且广泛使用的HTTP协议而不是专有协议进行同步。

对重新建立连接和事件ID功能的内置支持。

对于利用单向通信的应用程序和服务非常有用。

缺点

客户端越多连接越多,会占用服务器大量内存和连接数。

SSE只支持UTF-8编码,不支持二进制数据。

对最大打开连接数的严格限制可能使事情变得困难,每个浏览器都设置了限制。

SSE是单向的。

Springboot集成SSE简约版

客户端发送请求到服务端,服务端以流的形式不断向客户端推送数据示例,增加帅气值。

xml依赖:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>

html代码:

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Springboot集成SSE简约版</title>
  6. <script type="text/javascript">
  7. let source = new EventSource('/get');
  8. source.onmessage = function (event) {
  9. console.info(event.data);
  10. document.getElementById('text').innerText = event.data
  11. };
  12. </script>
  13. </head>
  14. <body>
  15. <div id="text"></div>
  16. </body>
  17. </html>

后端代码:

  1. @RequestMapping(value = "/get", produces = "text/event-stream;charset=UTF-8")
  2. public void push(HttpServletResponse response) {
  3. response.setContentType("text/event-stream");
  4. response.setCharacterEncoding("utf-8");
  5. int i = 0;
  6. while (true) {
  7. try {
  8. Thread.sleep(1000);
  9. PrintWriter pw = response.getWriter();
  10. //注意返回数据必须以data:开头,"\n\n"结尾
  11. pw.write("data:xdm帅气值加" + i + "\n\n");
  12. pw.flush();
  13. //检测异常时断开连接
  14. if (pw.checkError()) {
  15. log.error("客户端断开连接");
  16. return;
  17. }
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. i++;
  22. }
  23. }

效果:
在这里插入图片描述

Springboot集成SSE升级版

演示SSE的连接建立、接收数据和异常情况监听处理。

注:若浏览器不兼容在页面引入evensource.js。

  1. <script src=/eventsource-polyfill.js></script>

客户端代码:

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title> Springboot集成SSE升级版</title>
  6. </head>
  7. <script>
  8. let source = null;
  9. const clientId = new Date().getTime();
  10. if (!!window.EventSource) {
  11. source = new EventSource('/sse/create?clientId=' + clientId);
  12. //建立连接
  13. source.onopen = function (event) {
  14. setMessageInnerHTML("建立连接" + event);
  15. }
  16. //接收数据
  17. source.onmessage = function (event) {
  18. setMessageInnerHTML(event.data);
  19. }
  20. //错误监听
  21. source.onerror = function (event) {
  22. if (event.readyState === EventSource.CLOSED) {
  23. setMessageInnerHTML("连接关闭");
  24. } else {
  25. console.log(event);
  26. }
  27. }
  28. } else {
  29. setMessageInnerHTML("浏览器不支持SSE");
  30. }
  31. window.onbeforeunload = function () {
  32. close();
  33. };
  34. // 关闭
  35. function close() {
  36. source.close();
  37. const httpRequest = new XMLHttpRequest();
  38. httpRequest.open('GET', '/sse/close/?clientId=' + clientId, true);
  39. httpRequest.send();
  40. console.log("close");
  41. }
  42. // 显示消息
  43. function setMessageInnerHTML(innerHTML) {
  44. document.getElementById('text').innerHTML += innerHTML + '<br/>';
  45. }
  46. </script>
  47. <body>
  48. <button onclick="close()">关闭连接</button>
  49. <div id="text"></div>
  50. </body>
  51. </html>

服务端代码:

  1. private static Map<String, SseEmitter> cache = new ConcurrentHashMap<>();
  2. String clientId;
  3. int sseId;
  4. @GetMapping("/create")
  5. public SseEmitter create(@RequestParam(name = "clientId", required = false) String clientId) {
  6. // 设置超时时间,0表示不过期。默认30000毫秒
  7. //可以在客户端一直断网、直接关闭页面但未提醒后端的情况下,服务端在一定时间等待后自动关闭网络连接
  8. SseEmitter sseEmitter = new SseEmitter(0L);
  9. // 是否需要给客户端推送ID
  10. if (Strings.isBlank(clientId)) {
  11. clientId = UUID.randomUUID().toString();
  12. }
  13. this.clientId = clientId;
  14. cache.put(clientId, sseEmitter);
  15. log.info("sse连接,当前客户端:{}", clientId);
  16. return sseEmitter;
  17. }
  18. @Scheduled(cron = "0/3 * * * * ? ")
  19. public void pushMessage() {
  20. try {
  21. sseId++;
  22. SseEmitter sseEmitter = cache.get(clientId);
  23. sseEmitter.send(
  24. SseEmitter
  25. .event()
  26. .data("帅气值暴增" + sseId)
  27. .id("" + sseId)
  28. .reconnectTime(3000)
  29. );
  30. } catch (Exception e) {
  31. log.error(e.getMessage());
  32. sseId--;
  33. }
  34. }
  35. @GetMapping("/close")
  36. public void close(String clientId) {
  37. SseEmitter sseEmitter = cache.get(clientId);
  38. if (sseEmitter != null) {
  39. sseEmitter.complete();
  40. cache.remove(clientId);
  41. }
  42. }

方法参数说明:

  1. SseEmitter
  2. .event()
  3. .data("帅气值暴增" + sseId)
  4. .id("" + sseId)
  5. .reconnectTime(3000)

SseEmitter.event()
用来得到一个记录数据的容器。

.data("帅气值暴增" + sseId)
发送给客户端的数据。

.id("" + sseId)
记录发送数据的标识,服务端可以通过HttpServletRequest的请求头中拿到这个id,判断是否中间有误漏发数据。

.reconnectTime(3000)
定义在网络连接断开后,客户端向后端发起重连的时间间隔(以毫秒为单位)。

效果:
在这里插入图片描述
参考资料:
https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-ann-async-objects

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.html

发表评论

表情:
评论列表 (有 0 条评论,191人围观)

还没有评论,来说两句吧...

相关阅读