我有以下代码:
private static void log(Object msg) {
System.out.println(
Thread.currentThread().getName() +
": " + msg);
}
Observable<Integer> naturalNumbers = Observable.create(emitter -> {
log("Invoked"); // on main thread
Runnable r = () -> {
log("Invoked on another thread");
int i = 0;
while(!emitter.isDisposed()) {
log("Emitting "+ i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received "+i));
这里我们有两个重要的lambda表达式。第一个是我们传递给observable.create的,第二个是我们传递给observable.subscribe()的回调。在第一个lambda中,我们创建一个新线程,然后在该线程上发出值。在第二个lambda中,我们有代码来接收第一个lambda代码中发出的那些值。我注意到这两个代码都是在同一个线程上执行的。
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2
为什么会这样?rxjava默认情况下是否在同一线程上运行代码发射值(observable)和代码接收值(observable)?
1条答案
按热度按时间yyyllmsg1#
让我们看看,如果你使用
Thread
要执行runnable:试验
输出
看来,主要的切入点是从
main
线程和新创建的Thread
被称为Thread-0
.为什么会这样?rxjava默认情况下是否在同一线程上运行代码发射值(observable)和代码接收值(observable)?
默认情况下
RxJava
是单线程的。因此,如果不是由observeOn
,subscribeOn
或不同的线程布局,将在consumer
(subscriber)-线程。这是因为RxJava
默认情况下,运行订阅堆栈上的所有内容。例2
输出2
在您的示例中,很明显,main方法是从主线程调用的。此外
subscribeActual
调用也在调用线程上运行(main
). 但是Observable#create
lambda呼叫onNext
从新创建的线程Thread-0
. 值从调用线程推送到订阅者。在这种情况下,调用线程是Thread-0
,因为它调用onNext
在下游用户上。如何区分生产者和消费者?
使用
observeOn
/subscribeOn
运算符来处理RxJava
.我应该使用低级线程构造吗ẁ是rxjava吗?
不,你不应该用
new Thread
为了把生产者和消费者分开。很容易违约,因为onNext
不能同时调用(交错调用)从而破坏契约。这就是为什么RxJava
提供一个名为Scheduler
与Worker
是为了减轻这种错误。注意:我认为这篇文章描述得很好:http://introtorx.com/content/v1.0.10621.0/15_schedulingandthreading.html . 请注意,这是rx.net,但原理完全相同。如果您想了解
RxJava
你也可以看看大卫的博客(https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html)或者读这本书(rxjavaReact式编程)https://www.oreilly.com/library/view/reactive-programming-with/9781491931646/)