终止对外部事件的订阅

xzv2uavs  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(395)

在应用程序中,我对外部http端点使用长轮询。我用Spring的React式来做这个 WebClient . 为了在应用程序停止时完全关闭(并避免难看的网络堆栈跟踪),我使用 takeUntil() 以一个 EmitterProcessor 我称之为 onNext() 当Spring停止我的豆子(我) SmartLifecycle ).
整个过程是这样的:

  1. @Component
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class LongPollingMessageReceiver implements SmartLifecycle {
  5. private boolean running = true;
  6. private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();
  7. private final BackendMessageReceiver backendMessageReceiver;
  8. public void waitForMessages() {
  9. Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
  10. .repeat()
  11. .takeUntilOther(shutdown)
  12. .subscribe(event -> {
  13. // do something when the http endpoint answers
  14. });
  15. }
  16. @Override
  17. public int getPhase() {
  18. // We need to cancel the subscriptions before Reactor/Netty shuts down.
  19. // Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
  20. return 0;
  21. }
  22. @Override
  23. public void start() {
  24. // Not needed
  25. }
  26. @Override
  27. public void stop() {
  28. log.info("Stopping message subscriptions");
  29. shutdown.onNext(true);
  30. shutdown.onComplete();
  31. running = false;
  32. }
  33. @Override
  34. public boolean isRunning() {
  35. return running;
  36. }
  37. }

整个机制目前运行良好。然而, EmitterProcessor 标记为 @Deprecated javadoc说使用 Sink 相反。 Sink 不实现 Publisher 接口,因此无法传递到 takeUntilOther() .
我该怎么做才能解决这个问题而不被永远困在<3.5的React堆项目上?

slhcrj9b

slhcrj9b1#

Sinks 旨在作为面向开发人员的api,以编程方式触发React性事件。如果没有一种方法将这些作为一个典型的 Flux 或者 Mono 应用程序的其他部分。 Sinks.Many 有一个 asFlux() 从这个意义上说。同样地, Sinks.One 以及 Sinks.Empty 有一个 asMono() 查看。
这就是你可以用来传递给 takeUntilOther .

相关问题