java Should SpringSseEmitter.complete()trigger an EventSource reconnect - how to close connection server-side

5gfr0r5j  于 2023-03-21  发布在  Java
关注(0)|答案(3)|浏览(316)

我正在尝试设置Spring SseEmitter来发送一系列正在运行的作业的状态更新。它似乎可以工作,但是:
每当我在Java服务器代码中调用emitter.complete()时,javascript EventSource客户端都会调用已注册的onerror函数,然后再次使用新连接调用我的Java端点。这在Firefox和Chrome中都会发生。
我可能可以从Java发送一个显式的“数据结束”消息,然后检测到这一点并在客户机上调用eventSource.close(),但是有更好的方法吗?
emitter.complete()的作用是什么?
此外,如果我总是必须在客户端终止连接,那么我猜服务器端的每个连接都会因超时或写入错误而终止,在这种情况下,我可能需要每隔几秒手动发送一次某种类型的心跳?
如果我不得不做这一切,我觉得我错过了什么。

wsxa1bj1

wsxa1bj11#

我在Sping Boot 应用程序中添加了以下代码来触发SSE连接close()

服务器端:

1.创建一个返回SseEmitter的简单控制器。
1.将后端逻辑 Package 在单线程执行器服务中。
1.将事件发送到SseEmitter。
1.完成后,通过SseEmitter发送类型为complete的事件。

@RestController
public class SearchController {

@Autowired
private SearchDelegate searchDelegate;

@GetMapping(value = "/{customerId}/search")
@ResponseStatus(HttpStatus.OK)
@ApiOperation(value = "Search Sources", notes = "Search Sources")
@ApiResponses(value = {
        @ApiResponse(code = 201, message = "OK"),
        @ApiResponse(code = 401, message = "Unauthorized")
})
@ResponseBody
public SseEmitter search(@ApiParam(name = "searchCriteria", value = "searchCriteria", required = true) @ModelAttribute @Valid final SearchCriteriaDto searchCriteriaDto) throws Exception {
    return searchDelegate.route(searchCriteriaDto);
  }
}


@Service
public class SearchDelegate {
public static final String SEARCH_EVENT_NAME = "SEARCH";
public static final String COMPLETE_EVENT_NAME = "COMPLETE";
public static final String COMPLETE_EVENT_DATA = "{\"name\": \"COMPLETED_STREAM\"}";

@Autowired
private SearchService searchService;

private ExecutorService executor = Executors.newCachedThreadPool();

public SseEmitter route(SearchCriteriaDto searchCriteriaDto) throws Exception {
    SseEmitter emitter = new SseEmitter();
    executor.execute(() -> {
        try {
            if(!searchCriteriaDto.getCustomerSources().isEmpty()) {
                searchCriteriaDto.getCustomerSources().forEach(customerSource -> {
                    try {
                        SearchResponse searchResponse = searchService.search(searchCriteriaDto);
                        emitter.send(SseEmitter.event()
                                .id(customerSource.getSourceId())
                                .name(SEARCH_EVENT_NAME)
                                .data(searchResponse));
                    } catch (Exception e) {
                        log.error("Error while executing query for customer {} with source {}, Caused by {}",
                                customerId, source.getType(), e.getMessage());
                    }
                });
            }else {
                log.debug("No available customerSources for the specified customer");
            }
            emitter.send(SseEmitter.event().
                    id(String.valueOf(System.currentTimeMillis()))
                    .name(COMPLETE_EVENT_NAME)
                    .data(COMPLETE_EVENT_DATA));
            emitter.complete();
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    return emitter;
   }
}

客户端:

1.由于我们在SseEmitter上指定了事件的name,因此事件将在浏览器上被分派到指定事件名称的侦听器;网站源代码应该使用addEventListener()来监听命名事件。(* 注意:如果没有为消息指定事件名称,则调用onmessage处理程序 *)
1.在COMPLETE事件上调用EventSource以释放客户端连接。
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events

var sse = new EventSource('http://localhost:8080/federation/api/customers/5d96348feb061d13f46aa6ce/search?nativeQuery=true&queryString=*&size=10&customerSources=1,2,3&start=0');

sse.addEventListener("SEARCH", function(evt) {
   var data = JSON.parse(evt.data);
   console.log(data);
});

sse.addEventListener("COMPLETE", function(evt) {
   console.log(evt);
   sse.close();
});
lstz6jyr

lstz6jyr2#

根据服务器发送事件的HTML标准
如果连接关闭,客户端将重新连接;可以使用HTTP 204 NoContent响应代码来告知客户端停止重新连接。
所以Spring的SseEmitter的行为符合预期,complete()的目的是确保所有事件都被发送,然后关闭连接。
您需要实现服务器端逻辑,该逻辑将在后续请求中返回204 http代码(例如,通过检查会话id),或者发送一个特殊事件并在收到suggested by Ashraf Sarhan后关闭客户端的连接

lp0sw83n

lp0sw83n3#

我将继续sergei-kozelko答案。
你得回去

return new ResponseEntity(emitter, HttpStatus.NO_CONTENT);

方法签名应为:

public ResponseEntity<SseEmitter> route(SearchCriteriaDto searchCriteriaDto)

相关问题