本文整理了Java中io.reactivex.Flowable.observeOn()
方法的一些代码示例,展示了Flowable.observeOn()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.observeOn()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:observeOn
[英]Modifies a Publisher to perform its emissions and notifications on a specified Scheduler, asynchronously with a bounded buffer of #bufferSize() slots.
Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous. If strict event ordering is required, consider using the #observeOn(Scheduler,boolean) overload.
Backpressure: This operator honors backpressure from downstream and expects it from the source Publisher. Violating this expectation will lead to MissingBackpressureException. This is the most common operator where the exception pops up; look for sources up the chain that don't support backpressure, such as interval, timer, {code PublishSubject} or BehaviorSubject and apply any of the onBackpressureXXX operators before applying observeOn itself. Scheduler: You specify which Scheduler this operator will use.
[中]修改发布服务器以在指定的计划程序上执行其发射和通知,并使用#bufferSize()插槽的有界缓冲区异步执行。
请注意,如果调度程序是真正异步的,onError通知将在发出线程上的onNext通知之前切断。如果需要严格的事件排序,可以考虑使用超额观察(调度器,布尔)过载。
背压:该操作符接受来自下游的背压,并期望来自源发布者的背压。违反此预期将导致缺少BackPressureException。这是弹出异常的最常见运算符;查找链上不支持反压力的源,例如interval、timer、{code PublishSubject}或BehaviorSubject,并在应用observeOn自身之前应用任何onBackpressureXXX操作符。计划程序:指定此操作员将使用的计划程序。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
return f.observeOn(new TestScheduler());
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test
public void doubleObserveOn() {
Flowable.just(1).hide()
.observeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void doubleObserveOnError() {
Flowable.error(new TestException())
.observeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void doubleObserveOnErrorConditional() {
Flowable.error(new TestException())
.observeOn(Schedulers.computation())
.distinct()
.observeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = IllegalArgumentException.class)
public void testMapWithIssue417() {
Flowable.just(1).observeOn(Schedulers.computation())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer arg0) {
throw new IllegalArgumentException("any error");
}
}).blockingSingle();
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 500)
public void testWithObserveOn() throws InterruptedException {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(0, Flowable.bufferSize() * 10).onBackpressureDrop().observeOn(Schedulers.io()).subscribe(ts);
ts.awaitTerminalEvent();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void workerNotDisposedPrematurelyNormalInAsyncOut() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
TestSubscriber<Integer> ts = new TestSubscriberFusedCanceling();
Flowable.just(1).hide().observeOn(s).subscribe(ts);
assertEquals(1, s.disposedCount.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void workerNotDisposedPrematurelySyncInNormalOut() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
Flowable.concat(
Flowable.just(1).observeOn(s),
Flowable.just(2)
)
.test()
.assertResult(1, 2);
assertEquals(1, s.disposedCount.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void inputSyncFused() {
Flowable.range(1, 5)
.observeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void workerNotDisposedPrematurelySyncInNormalOutConditional() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
Flowable.concat(
Flowable.just(1).observeOn(s).filter(Functions.alwaysTrue()),
Flowable.just(2)
)
.test()
.assertResult(1, 2);
assertEquals(1, s.disposedCount.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void request1Conditional() {
Flowable.range(1, 10).hide()
.observeOn(ImmediateThinScheduler.INSTANCE)
.filter(Functions.alwaysTrue())
.test(1L)
.assertValue(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void workerNotDisposedPrematurelyNormalInNormalOutConditional() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
Flowable.concat(
Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()),
Flowable.just(2)
)
.test()
.assertResult(1, 2);
assertEquals(1, s.disposedCount.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void bufferSizesWork() {
for (int i = 1; i <= 1024; i = i * 2) {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.range(1, 1000 * 1000).observeOn(Schedulers.computation(), false, i)
.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertValueCount(1000 * 1000);
ts.assertComplete();
ts.assertNoErrors();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void outputFusedReject() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);
Flowable.range(1, 5).hide()
.observeOn(Schedulers.single())
.subscribe(ts);
SubscriberFusion.assertFusion(ts, QueueFuseable.NONE)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void backFusedErrorConditional() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.<Integer>error(new TestException())
.observeOn(ImmediateThinScheduler.INSTANCE)
.filter(Functions.alwaysTrue())
.subscribe(ts);
SubscriberFusion.assertFusion(ts, QueueFuseable.ASYNC)
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void backFusedConditional() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.range(1, 100).hide()
.observeOn(ImmediateThinScheduler.INSTANCE)
.filter(Functions.alwaysTrue())
.subscribe(ts);
SubscriberFusion.assertFusion(ts, QueueFuseable.ASYNC)
.assertValueCount(100)
.assertComplete()
.assertNoErrors();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testErrorDelayedAsync() {
Flowable<Integer> source = Flowable.just(1, 2, 3)
.concatWith(Flowable.<Integer>error(new TestException()));
TestSubscriber<Integer> ts = TestSubscriber.create();
source.observeOn(Schedulers.computation(), true).subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
ts.assertValues(1, 2, 3);
ts.assertError(TestException.class);
ts.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testSkipLastWithBackpressure() {
Flowable<Integer> f = Flowable.range(0, Flowable.bufferSize() * 2).skipLast(Flowable.bufferSize() + 10);
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
f.observeOn(Schedulers.computation()).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals((Flowable.bufferSize()) - 10, ts.valueCount());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressure2() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
.observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testInnerBackpressureWithAlignedBoundaries() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(0, Flowable.bufferSize() * 2)
.concatWith(Flowable.range(0, Flowable.bufferSize() * 2))
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}
内容来源于网络,如有侵权,请联系作者删除!