spring 向多个客户端广播SseEmitter

sbtkgmzw  于 2023-04-10  发布在  Spring
关注(0)|答案(1)|浏览(497)

我目前正在实现一个通知系统,该系统基于我在阅读有关Spring和SSE的文章时发现的一些示例(以非React式的方式)。
我已经成功地实现了一个解决方案,如果我有一个单一的客户端消费后端发送的事件,它工作得很好。
问题是当我打开几个浏览器并尝试向所有消费者触发事件时:只有订阅SSE广播器端点的最后一个客户端接收该通知。

如何同时向多个客户端触发事件?
如果客户端在同一个网络上,可能只有一个SSE连接是正常的?

下面是我的 * 控制器 *:

  1. @RestController
  2. @RequestMapping("/sse/servlet")
  3. class EventController {
  4. private val executorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
  5. private val emitters: MutableMap<String, SseEmitter> = mutableMapOf()
  6. private val objectMapper: ObjectMapper = ObjectMapper()
  7. private val log = KotlinLogging.logger {}
  8. @GetMapping("/notifications")
  9. fun listenNotifications(@RequestParam eventId: String): SseEmitter? {
  10. if (emitters[eventId] != null) {
  11. log.info("SSE connection already exists")
  12. return emitters[eventId]
  13. }
  14. val emitter = SseEmitter(TimeUnit.MINUTES.toMillis(10))
  15. emitter.onCompletion {
  16. log.info("SSE connection closed")
  17. emitters.remove(eventId)
  18. }
  19. emitter.onTimeout {
  20. log.info("SSE connection timed out")
  21. emitter.complete()
  22. }
  23. emitter.onError { throwable: Throwable? ->
  24. log.error("Listen SSE exception", throwable)
  25. }
  26. emitters[eventId] = emitter
  27. return emitter
  28. }
  29. @PostMapping("/notifications")
  30. @ResponseStatus(ACCEPTED)
  31. fun fireNotification(
  32. @RequestParam eventId: String,
  33. @RequestBody notification: Notification
  34. ) {
  35. val sseEmitter = emitters[eventId]
  36. if (sseEmitter === null) {
  37. log.info("SSE connection does not exist")
  38. return
  39. }
  40. handleEmitter(notification, sseEmitter, eventId)
  41. }
  42. private fun handleEmitter(
  43. notification: Notification,
  44. sseEmitter: SseEmitter,
  45. eventId: String
  46. ) = try {
  47. val data = objectMapper.writeValueAsString(notification)
  48. val sseEventBuilder = event().data(data)
  49. sseEmitter.send(sseEventBuilder)
  50. } catch (ioException: IOException) {
  51. log.error("Send SSE exception", ioException)
  52. emitters.remove(eventId)
  53. }

通知模型

  1. data class Notification(val message: String)

我非常简单的 application.yaml 属性文件

  1. server:
  2. servlet:
  3. context-path: /api

我的 gradle 配置

  1. import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
  2. plugins {
  3. id("org.springframework.boot") version "3.0.4"
  4. id("io.spring.dependency-management") version "1.1.0"
  5. kotlin("jvm") version "1.7.22"
  6. kotlin("plugin.spring") version "1.7.22"
  7. }
  8. group = "com.ggardet"
  9. version = "0.0.1-SNAPSHOT"
  10. java.sourceCompatibility = JavaVersion.VERSION_17
  11. repositories {
  12. mavenCentral()
  13. }
  14. dependencies {
  15. implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
  16. implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
  17. implementation("org.jetbrains.kotlin:kotlin-reflect")
  18. implementation("org.springframework.boot:spring-boot-starter-web")
  19. implementation("org.springframework.boot:spring-boot-starter-security")
  20. }
  21. tasks.withType<KotlinCompile> {
  22. kotlinOptions {
  23. freeCompilerArgs = listOf("-Xjsr305=strict")
  24. jvmTarget = "17"
  25. }
  26. }

最后是用于创建/使用事件的请求(使用httpie)

  1. # to listen for events I open 2 or 3 terminals and launch the following request
  2. http --stream -a user:user -v GET http://localhost:8080/api/sse/servlet/notifications\?eventId\=1
  3. # to fire a new event I open another terminal instance and launch this single request
  4. http -a user:user -v POST http://localhost:8080/api/sse/servlet/notifications\?eventId\=1 message=test

x1c 0d1x感谢任何形式的帮助

  • 注意:如果我删除“emitters”map并在类级别使用单个SseEmitter发送事件,我会遇到同样的问题。
nzkunb0c

nzkunb0c1#

所以我没有意识到客户端不能共享同一个SseEmitter。
我不得不创建一个SseEmitter每个订阅使其工作。
回答问题:

  • 每次新客户端订阅 */notification * 端点时,我都必须存储一个新的SseEmitter,而不是将同一个SeeEmitter共享给所有订阅者
  • 不需要,客户端不需要在不同的网络上即可订阅 */notification * 端点

不确定这是否是最好的方法,但我在这里发布我的解决方案,以防有人误解SseEmitter的工作原理,就像我做的一样:

  1. @RestController
  2. @RequestMapping("/sse/servlet")
  3. class EventController {
  4. private val executorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
  5. private val emitters = ConcurrentHashMap<String, CopyOnWriteArrayList<SseEmitter>>()
  6. private val objectMapper: ObjectMapper = ObjectMapper()
  7. private val log = KotlinLogging.logger {}
  8. @GetMapping("/notifications", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
  9. fun listenNotifications(@RequestParam eventId: String): SseEmitter? {
  10. val emitter = SseEmitter(Duration.ofMinutes(10).toMillis())
  11. emitter.onCompletion {
  12. log.info("SSE connection closed")
  13. emitters.remove(eventId)
  14. }
  15. emitter.onTimeout {
  16. log.info("SSE connection timed out")
  17. emitters.remove(eventId)
  18. emitter.complete()
  19. }
  20. emitter.onError { throwable: Throwable? ->
  21. log.error("Listen SSE exception", throwable)
  22. emitters.remove(eventId)
  23. }
  24. emitters.computeIfAbsent(eventId) { CopyOnWriteArrayList() }.add(emitter)
  25. return emitter
  26. }
  27. @PostMapping("/notifications")
  28. @ResponseStatus(ACCEPTED)
  29. fun fireNotification(
  30. @RequestParam eventId: String,
  31. @RequestBody notification: Notification
  32. ) = emitters[eventId]?.forEach { handleEmitter(notification, it) }
  33. private fun handleEmitter(
  34. notification: Notification,
  35. sseEmitter: SseEmitter,
  36. ) = try {
  37. val data = objectMapper.writeValueAsString(notification)
  38. val sseEventBuilder = event().data(data)
  39. sseEmitter.send(sseEventBuilder)
  40. } catch (ioException: IOException) {
  41. log.error("Send SSE exception", ioException)
  42. }
  43. }
展开查看全部

相关问题