如何向connectableflowable发送取消信号?

kcugc4gi  于 2021-07-03  发布在  Java
关注(0)|答案(1)|浏览(408)

我使用一次性flowable来发布和订阅项目。但当我尝试使用connectableflowable时,我无法向发射器发送取消信号。我如何理解flowable是在flowable.create方法中处理的?
您可以通过注解和取消注解“publish().autoconnect()”代码来查看场景。

Disposable disposable = Flowable.create(emitter -> {
        AtomicBoolean isRunning = new AtomicBoolean(true);
        AtomicInteger i = new AtomicInteger();
        new Thread(() -> {
            while (isRunning.get()) {
                i.getAndIncrement();
                System.out.println("Emitting:" + i.get());
                emitter.onNext(i.get());
                try {
                    Thread.sleep(1_000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        emitter.setCancellable(() -> {
            System.out.println("Cancelled");
            isRunning.set(false);
        });
    }, BackpressureStrategy.BUFFER)
            .publish() //comment here
            .autoConnect() //and here
            .subscribe(s -> {
                System.out.println("Subscribed:" + s);
            });

    Thread.sleep(10_000);
    disposable.dispose();
    Thread.sleep(100_000);
vzgqcmou

vzgqcmou1#

有一个超负荷,让您访问 Disposable 要取消连接:

SerialDisposable sd = new SerialDisposable();

source.publish().autoConnect(1, sd::set);

// ...

sd.dispose();

相关问题