本文整理了Java中io.reactivex.Flowable.parallel()
方法的一些代码示例,展示了Flowable.parallel()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.parallel()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:parallel
[英]Parallelizes the flow by creating multiple 'rails' (equal to the number of CPUs) and dispatches the upstream items to them in a round-robin fashion.
Note that the rails don't execute in parallel on their own and one needs to apply ParallelFlowable#runOn(Scheduler) to specify the Scheduler where each rail will execute.
To merge the parallel 'rails' back into a single sequence, use ParallelFlowable#sequential().
Backpressure: The operator requires the upstream to honor backpressure and each 'rail' honors backpressure as well. Scheduler: parallel does not operate by default on a particular Scheduler.
History: 2.0.5 - experimental
[中]通过创建多个“rails”(等于CPU的数量)并行化流,并以循环方式将上游项分派给它们。
请注意,rails本身并不并行执行,需要应用ParallelFlowable#runOn(调度程序)来指定每个rail将在何处执行的调度程序。
要将并行“rails”合并回单个序列,请使用ParallelFlowable#sequential()。
背压:操作员要求上游承受背压,每个“轨道”也承受背压。调度程序:默认情况下,并行程序不会在特定调度程序上运行。
历史:2.0.5-实验性
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = TestException.class)
public void toThrows() {
Flowable.range(1, 5)
.parallel()
.to(new Function<ParallelFlowable<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(ParallelFlowable<Integer> pf) throws Exception {
throw new TestException();
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void badParallelismStage() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(1, 10)
.parallel(2)
.subscribe(new Subscriber[] { ts });
ts.assertFailure(IllegalArgumentException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalDelayError() {
Flowable.just(1)
.parallel(1)
.sequentialDelayError(1)
.test()
.assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void toSortedList() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
Flowable.fromArray(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
.parallel()
.toSortedList(Functions.naturalComparator())
.subscribe(ts);
ts.assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.filter(Functions.alwaysTrue()));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void delayErrorCancelBackpressured() {
TestSubscriber<Integer> ts = Flowable.range(1, 3)
.parallel(1)
.sequentialDelayError(1)
.test(0);
ts
.cancel();
ts.assertEmpty();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void rangeDelayErrorBackpressure2() {
Flowable.range(1, 3)
.parallel(1)
.sequentialDelayError(1)
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void mapWrongParallelismConditional() {
TestHelper.checkInvalidParallelSubscribers(
Flowable.just(1).parallel(1)
.map(Functions.identity(), ParallelFailureHandling.ERROR)
.filter(Functions.alwaysTrue())
);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void error() {
Flowable.error(new TestException())
.parallel(1)
.runOn(ImmediateThinScheduler.INSTANCE)
.sequential()
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void rangeDelayErrorBackpressure() {
Flowable.range(1, 3)
.parallel(1)
.sequentialDelayError(1)
.take(2)
.rebatchRequests(1)
.test()
.assertResult(1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void error() {
Flowable.error(new TestException())
.parallel()
.filter(Functions.alwaysTrue())
.sequential()
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void mapNoError() {
for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
Flowable.just(1)
.parallel(1)
.map(Functions.identity(), e)
.sequential()
.test()
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void emptyConditionalBackpressured() {
TestSubscriber<Object> ts = new TestSubscriber<Object>(0L);
Flowable.empty()
.parallel(1)
.runOn(ImmediateThinScheduler.INSTANCE)
.filter(Functions.alwaysTrue())
.subscribe(new Subscriber[] { ts });
ts
.assertResult();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void filterNoError() {
for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
Flowable.just(1)
.parallel(1)
.filter(Functions.alwaysTrue(), e)
.sequential()
.test()
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void filterFalse() {
for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
Flowable.just(1)
.parallel(1)
.filter(Functions.alwaysFalse(), e)
.sequential()
.test()
.assertResult();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void mapErrorNoError() {
for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
Flowable.<Integer>error(new TestException())
.parallel(1)
.map(Functions.identity(), e)
.sequential()
.test()
.assertFailure(TestException.class);
}
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void errorConditionalBackpressured() {
TestSubscriber<Object> ts = new TestSubscriber<Object>(0L);
Flowable.error(new TestException())
.parallel(1)
.runOn(ImmediateThinScheduler.INSTANCE)
.filter(Functions.alwaysTrue())
.subscribe(new Subscriber[] { ts });
ts
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void doOnNextErrorNoError() {
for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
Flowable.<Integer>error(new TestException())
.parallel(1)
.doOnNext(this, e)
.sequential()
.test()
.assertFailure(TestException.class);
assertEquals(calls, 0);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void mapConditionalNoError() {
for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
Flowable.just(1)
.parallel(1)
.map(Functions.identity(), e)
.filter(Functions.alwaysTrue())
.sequential()
.test()
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void mapErrorConditionalNoError() {
for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
Flowable.<Integer>error(new TestException())
.parallel(1)
.map(Functions.identity(), e)
.filter(Functions.alwaysTrue())
.sequential()
.test()
.assertFailure(TestException.class);
}
}
内容来源于网络,如有侵权,请联系作者删除!