SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>
let source = null;
// 用时间戳模拟登录用户
const userId = new Date().getTime();
if (!!window.EventSource) {
// 建立连接
source = new EventSource('http://ip:端口/CreateSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad');
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
/**
* 如果发生通信错误(比如连接中断),就会触发error事件
* 或者:
* 另一种写法:source.onerror = function (event) {}
*/
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭");
} else {
console.log(e);
}
}, false);
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
}
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function () {
closeSse();
};
// 关闭Sse连接
function closeSse() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:8080/sse/CloseConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true);
httpRequest.send();
console.log("close");
}
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
</html>
Controller:
/**
* SSE长链接
*/
@RestController
@RequestMapping("/sse")
public class SseEmitterController {
@Autowired
private SseEmitterService sseEmitterService;
/**
* 创建SSE长链接
*
* @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)
* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
**/
@CrossOrigin //如果nginx做了跨域处理,此处可去掉
@GetMapping("/CreateSseConnect")
public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
return sseEmitterService.createSseConnect(clientId);
}
/**
* 关闭SSE连接
*
* @param clientId 客户端ID
**/
@GetMapping("/CloseSseConnect")
public Result closeSseConnect(String clientId) {
sseEmitterService.closeSseConnect(clientId);
return ResultGenerator.genSuccessResult(true);
}
}
ServiceImpl
@Service
public class SseEmitterServiceImpl implements SseEmitterService {
/**
* 容器,保存连接,用于输出返回
*/
private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@Override
public SseEmitter createSseConnect(String clientId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 是否需要给客户端推送ID
if (StringUtils.isBlank(clientId)) {
clientId = IdUtil.simpleUUID();
}
// 注册回调
sseEmitter.onCompletion(completionCallBack(clientId));
sseCache.put(clientId, sseEmitter);
logger.info("创建新的sse连接,当前用户:{}", clientId);
try {
sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));
} catch (IOException e) {
logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
throw new BusinessException("创建连接异常!", e);
}
return sseEmitter;
}
@Override
public void closeSseConnect(String clientId) {
SseEmitter sseEmitter = sseCache.get(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
removeUser(clientId);
}
}
// 根据客户端id获取SseEmitter对象
@Override
public SseEmitter getSseEmitterByClientId(String clientId) {
return sseCache.get(clientId);
}
// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息
@Override
public void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {
if (CollectionUtil.isEmpty(sseCache)) {
return;
}
for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());
}
}
/**
* 推送消息到客户端
* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
*
* @param clientId 客户端ID
* @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可
**/
private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {
if (sseEmitter == null) {
logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",
clientId, sseEmitterResultVOList.toString());
return;
}
SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);
try {
sseEmitter.send(sendData);
} catch (IOException e) {
// 推送消息失败,记录错误日志,进行重推
logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);
boolean isSuccess = true;
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000);
sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
continue;
}
sseEmitter.send(sendData);
} catch (Exception ex) {
logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
continue;
}
logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());
return;
}
}
}
/**
* 长链接完成后回调接口(即关闭连接时调用)
*
* @param clientId 客户端ID
* @return java.lang.Runnable
**/
private Runnable completionCallBack(String clientId) {
return () -> {
logger.info("结束连接:{}", clientId);
removeUser(clientId);
};
}
/**
* 连接超时时调用
*
* @param clientId 客户端ID
* @return java.lang.Runnable
**/
private Runnable timeoutCallBack(String clientId) {
return () -> {
logger.info("连接超时:{}", clientId);
removeUser(clientId);
};
}
/**
* 推送消息异常时,回调方法
*
* @param clientId 客户端ID
* @return java.util.function.Consumer<java.lang.Throwable>
**/
private Consumer<Throwable> errorCallBack(String clientId) {
return throwable -> {
logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000);
SseEmitter sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
continue;
}
sseEmitter.send("失败后重新推送");
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
/**
* 移除用户连接
*
* @param clientId 客户端ID
**/
private void removeUser(String clientId) {
sseCache.remove(clientId);
logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
}
}
如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置
proxy_set_header Host $http_host; ##proxy_set_header用来重定义发往后端服务器的请求头
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
proxy_http_version 1.1;
proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s
1、前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;
2、创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
3、推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;
整体业务流程为:客户端创建链接——>服务端保持生成SseEmitter对象,并通过SseEmitter对象实现向客户端主动推送消息——>客户端收到推送消息后,刷新页面(根据推送消息,请求相关业务接口)
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://lebron.blog.csdn.net/article/details/125453989
内容来源于网络,如有侵权,请联系作者删除!