springboot整合sse

  • 阿里云国际版折扣https://www.yundadi.com

  • 阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

    链接: SpringBoot 实现SSE 服务器发送事件
    链接: SpringBoot 实现SSE 服务器发送事件
    链接: Springboot之整合SSE实现消息推送
    链接: springboot SseEmitter 消息推送
    链接: 在springboot中使用Sse(Server-sent Events)Web实时通信技术-服务器发送事件SseEmitter
    链接: Web 实时消息推送详解

    什么是sse

    链接: 著作权归所有原文链接
    服务器发送事件(Server-Sent Events)简称 SSE。这是一种服务器端到客户端(浏览器)的单向消息推送。SSE 基于 HTTP 协议的我们知道一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的但 SSE 是个例外它变换了一种思路。

    SSE 在服务器和客户端之间打开一个单向通道服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息在有数据变更时从服务器流式传输到客户端。整体的实现思路有点类似于在线视频播放视频流会连续不断的推送到浏览器你也可以理解成客户端在完成一次用时很长网络不畅的下载。

    SSE 与 WebSocket 作用相似都可以建立服务端与浏览器之间的通信实现服务端向客户端推送消息但还是有些许不同

    • SSE 是基于 HTTP 协议的它们不需要特殊的协议或服务器实现即可工作WebSocket 需单独服务器来处理协议。
    • SSE 单向通信只能由服务端向客户端单向通信
      WebSocket 全双工通信即通信的双方可以同时发送和接受信息。
    • SSE 实现简单开发成本低无需引入其他组件
      WebSocket 传输数据需做二次解析开发门槛高一些。
    • SSE 默认支持断线重连WebSocket 则需要自己实现。
    • SSE 只能传送文本消息二进制数据需要经过编码后传送WebSocket 默认支持传送二进制数据。

    注意 SSE 不支持 IE 浏览器对其他主流浏览器兼容性做的还不错。

    SSE 与 WebSocket 该如何选择

    链接: 著作权归所有原文链接
    SSE 好像一直不被大家所熟知一部分原因是出现了 WebSocket这个提供了更丰富的协议来执行双向、全双工通信。对于游戏、即时通信以及需要双向近乎实时更新的场景拥有双向通道更具吸引力。但是在某些情况下不需要从客户端发送数据。而你只需要一些服务器操作的更新。比如站内信、未读消息数、状态更新、股票行情、监控数量等场景SEE 不管是从实现的难易和成本上都更加有优势。此外SSE 具有 WebSocket 在设计上缺乏的多种功能例如自动重新连接、事件 ID 和发送任意事件的能力。

    sse 规范

    在 html5 的定义中服务端 sse一般需要遵循以下要求

    • 请求头
    开启长连接 + 流方式传递
    Content-Type: text/event-stream;charset=UTF-8
    Cache-Control: no-cache
    Connection: keep-alive
    
    • 数据格式

    服务端发送的消息由 message 组成其格式如下:

    field:value\n\n
    

    其中 field 有五种可能

    • 空: 即以:开头表示注释可以理解为服务端向客户端发送的心跳确保连接不中断
    • data数据
    • event: 事件默认值
    • id: 数据标识符 id 字段表示相当于每一条数据的编号
    • retry: 重连时间

    后端

    import org.springframework.http.MediaType;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * sse服务
     * 客户端关闭时只能等待超时 服务端断开连接
     */
    @Controller
    @RequestMapping(path = "sse")
    public class SseRest {
    
        private final static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
    
        /**
         * 连接sse服务
         * @param id
         * @return
         * @throws IOException
         */
        @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
        public SseEmitter push(String id) throws IOException {
            // 超时时间设置为5分钟用于演示客户端自动重连
            SseEmitter sseEmitter = new SseEmitter(5_60_000L);
            // 设置前端的重试时间为1s
            // send(): 发送数据如果传入的是一个非SseEventBuilder对象那么传递参数会被封装到 data 中
            sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
            sseCache.put(id, sseEmitter);
            System.out.println("add " + id);
            sseEmitter.send("你好", MediaType.APPLICATION_JSON);
            SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈");
            sseEmitter.send(data);
            // onTimeout(): 超时回调触发
            sseEmitter.onTimeout(() -> {
                System.out.println(id + "超时");
                sseCache.remove(id);
            });
            // onCompletion(): 结束之后的回调触发
            sseEmitter.onCompletion(() -> System.out.println("完成"));
            return sseEmitter;
        }
    
        /**
         * http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa
         * @param id
         * @param content
         * @return
         * @throws IOException
         */
        @ResponseBody
        @GetMapping(path = "push")
        public String push(String id, String content) throws IOException {
            SseEmitter sseEmitter = sseCache.get(id);
            if (sseEmitter != null) {
                sseEmitter.send(content);
            }
            return "over";
        }
    
        @ResponseBody
        @GetMapping(path = "over")
        public String over(String id) {
            SseEmitter sseEmitter = sseCache.get(id);
            if (sseEmitter != null) {
                // complete(): 表示执行完毕会断开连接
                sseEmitter.complete();
                sseCache.remove(id);
            }
            return "over";
        }
    }
    

    前端

    <!doctype html>
    <html lang="en">
    <head>
        <title>Sse测试文档</title>
    </head>
    <body>
    <div>sse测试</div>
    <div id="result"></div>
    </body>
    </html>
    <script>
        let source = null;
        let userId = 7777
        if (window.EventSource) {
            // 建立连接
            source = new EventSource('http://localhost:8080/sse/subscribe?id='+userId);
            console.log("连接用户=" + userId);
            /**
             * 连接一旦建立就会触发open事件
             * 另一种写法source.onopen = function (event) {}
             */
            source.addEventListener('open', function (e) {
                console.log("建立连接。。。");
            }, false);
            /**
             * 客户端收到服务器发来的数据
             * 另一种写法source.onmessage = function (event) {}
             */
            source.addEventListener('message', function (e) {
                console.log(e.data);
            });
    
            source.addEventListener('finish', function (e) {
                console.log(e.id);
                console.log(e.data);
            });
    
        } else {
            console.log("你的浏览器不支持SSE");
        }
    </script>
    

    上面的实现用到了 SseEmitter 的几个方法解释如下
    send(): 发送数据如果传入的是一个非SseEventBuilder对象那么传递参数会被封装到 data 中
    complete(): 表示执行完毕会断开连接
    onTimeout(): 超时回调触发
    onCompletion(): 结束之后的回调触发

    在实际的业务开发中推荐使用SseEmitterSseEmitter已经帮我们把这些封装好了

  • 阿里云国际版折扣https://www.yundadi.com

  • 阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
    标签: Spring