java—当底层数据源有新值时理解rxjava observable

jtjikinw  于 2021-06-26  发布在  Java
关注(0)|答案(1)|浏览(276)

我正在尝试rxjava可观察和观察者代码。我的目标是检查当底层源接收到新的数据值时事情是如何工作的。我的代码是:

List<Integer> numbers = new ArrayList<>();
Runnable r = new Runnable() {
            @Override
            public void run() {
                int i = 100;
                while(i < 110) {
                    numbers.add(i);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        };
        numbers.add(0);
        numbers.add(1);
        numbers.add(2);
        Observable.fromIterable(numbers)
                .observeOn(Schedulers.io())
                .subscribe(i -> System.out.println("Received "+i+ " on "+ Thread.currentThread().getName()),
                        e -> e.printStackTrace());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Thread t = new Thread(r);
        t.start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

所以我有一个数字列表。然后我有一个runnable,它将新的数字添加到这个列表中,添加之间有一个时间间隔。我还没开始写呢。我将0,1,2添加到列表中,然后用它创建一个observate,将observate调度到池中的线程上,最后订阅这个observate。当订阅发生时,observate发出值0、1、2并调用observate(执行传递给subscribe的lambda)。然后在主线程上引入1秒的延迟,然后使用前面创建的runnable生成一个新线程,还添加了最后一个延迟,这样应用程序就不会立即退出。
我所期望的是,当新的数字被添加到列表中时,必须调用observer,从而打印消息。但那不会发生。我的理解肯定错了。我是否也需要在调度器上放置observable?

tsm1rwdh

tsm1rwdh1#

这个 Observable.fromIterable() 方法是每次生成订阅时可观察的值的“一次性”加载。构建订阅之后发生的事情不再有影响。当你使用 subscribe(onNext, onError, onComplete) 方法与 onComplete 参数您将看到订阅已完全使用,并且已打印出三个初始值。
你可以使用 Subject (有点像 PublishSubject )在哪里使用 onNext() 方法添加“新值”,而先前生成的订阅仍处于活动状态(且未完成)。这样,您就可以先构建订阅并继续调用 onNext() 在主题中输入新值,直到完成并调用 onCompleted() .

相关问题