终止对外部事件的订阅

xzv2uavs  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(353)

在应用程序中,我对外部http端点使用长轮询。我用Spring的React式来做这个 WebClient . 为了在应用程序停止时完全关闭(并避免难看的网络堆栈跟踪),我使用 takeUntil() 以一个 EmitterProcessor 我称之为 onNext() 当Spring停止我的豆子(我) SmartLifecycle ).
整个过程是这样的:

@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
    private boolean running = true;

    private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();

    private final BackendMessageReceiver backendMessageReceiver;

    public void waitForMessages() {
        Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
            .repeat()
            .takeUntilOther(shutdown)
            .subscribe(event -> {
                // do something when the http endpoint answers
            });
    }

    @Override
    public int getPhase() {
        // We need to cancel the subscriptions before Reactor/Netty shuts down.
        // Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
        return 0;
    }

    @Override
    public void start() {
        // Not needed
    }

    @Override
    public void stop() {
        log.info("Stopping message subscriptions");
        shutdown.onNext(true);
        shutdown.onComplete();
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

整个机制目前运行良好。然而, EmitterProcessor 标记为 @Deprecated javadoc说使用 Sink 相反。 Sink 不实现 Publisher 接口,因此无法传递到 takeUntilOther() .
我该怎么做才能解决这个问题而不被永远困在<3.5的React堆项目上?

slhcrj9b

slhcrj9b1#

Sinks 旨在作为面向开发人员的api,以编程方式触发React性事件。如果没有一种方法将这些作为一个典型的 Flux 或者 Mono 应用程序的其他部分。 Sinks.Many 有一个 asFlux() 从这个意义上说。同样地, Sinks.One 以及 Sinks.Empty 有一个 asMono() 查看。
这就是你可以用来传递给 takeUntilOther .

相关问题