在应用程序中,我对外部http端点使用长轮询。我用Spring的React式来做这个 WebClient
. 为了在应用程序停止时完全关闭(并避免难看的网络堆栈跟踪),我使用 takeUntil()
以一个 EmitterProcessor
我称之为 onNext()
当Spring停止我的豆子(我) SmartLifecycle
).
整个过程是这样的:
@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
private boolean running = true;
private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();
private final BackendMessageReceiver backendMessageReceiver;
public void waitForMessages() {
Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
.repeat()
.takeUntilOther(shutdown)
.subscribe(event -> {
// do something when the http endpoint answers
});
}
@Override
public int getPhase() {
// We need to cancel the subscriptions before Reactor/Netty shuts down.
// Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
return 0;
}
@Override
public void start() {
// Not needed
}
@Override
public void stop() {
log.info("Stopping message subscriptions");
shutdown.onNext(true);
shutdown.onComplete();
running = false;
}
@Override
public boolean isRunning() {
return running;
}
}
整个机制目前运行良好。然而, EmitterProcessor
标记为 @Deprecated
javadoc说使用 Sink
相反。 Sink
不实现 Publisher
接口,因此无法传递到 takeUntilOther()
.
我该怎么做才能解决这个问题而不被永远困在<3.5的React堆项目上?
1条答案
按热度按时间slhcrj9b1#
Sinks
旨在作为面向开发人员的api,以编程方式触发React性事件。如果没有一种方法将这些作为一个典型的Flux
或者Mono
应用程序的其他部分。Sinks.Many
有一个asFlux()
从这个意义上说。同样地,Sinks.One
以及Sinks.Empty
有一个asMono()
查看。这就是你可以用来传递给
takeUntilOther
.