Springboot之整合SSE实现消息推送

x33g5p2x  于2022-06-27 转载在 Spring  
字(7.5k)|赞(0)|评价(0)|浏览(1314)

Springboot之整合SSE实现消息推送

前言

项目中涉及到部分请求,后端处理时间较长,使用常规Http请求,页面等待时间太长,对用户不友好,故考虑使用长链接进行消息推送,可选方案有WebSocket、SSE,WebSocket可实现双工通信,SSE仅支持服务端向客户端推送消息,根据实际使用场景,SSE即可满足,故选用SSE。

一、SSE是什么?

SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。

  • 注意:因为EventSource对象是SSE的客户端,可能会有浏览器对其不支持,但谷歌、火狐、360是可以的,IE不可以。
  • 优点:SSE和WebSocket相比,最大的优势是便利,服务端不需要其他的类库,开发难度较低,SSE和轮询相比它不用处理很多请求,不用每次建立新连接,延迟较低。
  • 缺点:如果客户端有很多,那就要保持很多长连接,这会占用服务器大量内存和连接数
  • sse 规范:在 html5 的定义中,服务端 sse,一般需要遵循以下要求:
    Content-Type: text/event-stream;
    charset=UTF-8Cache-Control: no-cache
    Connection: keep-alive

二、使用步骤

1.客户端代码示例

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>
    let source = null;

    // 用时间戳模拟登录用户
    const userId = new Date().getTime();

    if (!!window.EventSource) {

        // 建立连接
        source = new EventSource('http://ip:端口/CreateSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad');

        /**
         * 连接一旦建立,就会触发open事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });

        /**
         * 如果发生通信错误(比如连接中断),就会触发error事件
         * 或者:
         * 另一种写法:source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            if (e.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("连接关闭");
            } else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }

    // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
    window.onbeforeunload = function () {
        closeSse();
    };

    // 关闭Sse连接
    function closeSse() {
        source.close();
        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', 'http://localhost:8080/sse/CloseConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true);
        httpRequest.send();
        console.log("close");
    }

    // 将消息显示在网页上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
</html>

2.服务端整合

Controller:

/**
 * SSE长链接
 */
@RestController
@RequestMapping("/sse")
public class SseEmitterController {

    @Autowired
    private SseEmitterService sseEmitterService;

    /**
     * 创建SSE长链接
     *
     * @param clientId   客户端唯一ID(如果为空,则由后端生成并返回给前端)
     * @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
     **/
    @CrossOrigin //如果nginx做了跨域处理,此处可去掉
    @GetMapping("/CreateSseConnect")
    public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
        return sseEmitterService.createSseConnect(clientId);
    }

    /**
     * 关闭SSE连接
     *
     * @param clientId 客户端ID
     **/
    @GetMapping("/CloseSseConnect")
    public Result closeSseConnect(String clientId) {
        sseEmitterService.closeSseConnect(clientId);
        return ResultGenerator.genSuccessResult(true);
    }

}

ServiceImpl

@Service
public class SseEmitterServiceImpl implements SseEmitterService {
	
    /**
     * 容器,保存连接,用于输出返回
     */
    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
    
    @Override
    public SseEmitter createSseConnect(String clientId) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 是否需要给客户端推送ID
        if (StringUtils.isBlank(clientId)) {
            clientId = IdUtil.simpleUUID();
        }
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(clientId));
        sseCache.put(clientId, sseEmitter);
        logger.info("创建新的sse连接,当前用户:{}", clientId);

        try {
            sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));
        } catch (IOException e) {
            logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
            throw new BusinessException("创建连接异常!", e);
        }
        return sseEmitter;
    }

    @Override
    public void closeSseConnect(String clientId) {
        SseEmitter sseEmitter = sseCache.get(clientId);
        if (sseEmitter != null) {
            sseEmitter.complete();
            removeUser(clientId);
        }
    }

	// 根据客户端id获取SseEmitter对象
    @Override
    public SseEmitter getSseEmitterByClientId(String clientId) {
        return sseCache.get(clientId);
    }

	// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息
    @Override
    public void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {
        if (CollectionUtil.isEmpty(sseCache)) {
            return;
        }
        for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
            sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());
        }
    }

    /**
     * 推送消息到客户端
     * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
     *
     * @param clientId               客户端ID
     * @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可
     **/
    private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {
        if (sseEmitter == null) {
            logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",
                    clientId, sseEmitterResultVOList.toString());
            return;
        }

        SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);
        try {
            sseEmitter.send(sendData);
        } catch (IOException e) {
            // 推送消息失败,记录错误日志,进行重推
            logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);
            boolean isSuccess = true;
            // 推送消息失败后,每隔10s推送一次,推送5次
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(10000);
                    sseEmitter = sseCache.get(clientId);
                    if (sseEmitter == null) {
                        logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
                        continue;
                    }
                    sseEmitter.send(sendData);
                } catch (Exception ex) {
                    logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
                    continue;
                }
                logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());
                return;
            }
        }
    }

    /**
     * 长链接完成后回调接口(即关闭连接时调用)
     *
     * @param clientId 客户端ID
     * @return java.lang.Runnable
     **/
    private Runnable completionCallBack(String clientId) {
        return () -> {
            logger.info("结束连接:{}", clientId);
            removeUser(clientId);
        };
    }

    /**
     * 连接超时时调用
     *
     * @param clientId 客户端ID
     * @return java.lang.Runnable
     **/
    private Runnable timeoutCallBack(String clientId) {
        return () -> {
            logger.info("连接超时:{}", clientId);
            removeUser(clientId);
        };
    }

    /**
     * 推送消息异常时,回调方法
     *
     * @param clientId 客户端ID
     * @return java.util.function.Consumer<java.lang.Throwable>
     **/
    private Consumer<Throwable> errorCallBack(String clientId) {
        return throwable -> {
            logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);

            // 推送消息失败后,每隔10s推送一次,推送5次
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(10000);
                    SseEmitter sseEmitter = sseCache.get(clientId);
                    if (sseEmitter == null) {
                        logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
                        continue;
                    }
                    sseEmitter.send("失败后重新推送");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
    }

    /**
     * 移除用户连接
     *
     * @param clientId 客户端ID
     **/
    private void removeUser(String clientId) {
        sseCache.remove(clientId);
        logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
    }
}

3. Nginx配置

如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置

proxy_set_header Host $http_host;  ##proxy_set_header用来重定义发往后端服务器的请求头
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
proxy_http_version  1.1;
proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s

4. 请求示例

常见问题

1、前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;
2、创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
3、推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;

总结

整体业务流程为:客户端创建链接——>服务端保持生成SseEmitter对象,并通过SseEmitter对象实现向客户端主动推送消息——>客户端收到推送消息后,刷新页面(根据推送消息,请求相关业务接口)

相关文章