io.reactivex.Flowable.parallel()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.0k)|赞(0)|评价(0)|浏览(426)

本文整理了Java中io.reactivex.Flowable.parallel()方法的一些代码示例,展示了Flowable.parallel()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.parallel()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:parallel

Flowable.parallel介绍

[英]Parallelizes the flow by creating multiple 'rails' (equal to the number of CPUs) and dispatches the upstream items to them in a round-robin fashion.

Note that the rails don't execute in parallel on their own and one needs to apply ParallelFlowable#runOn(Scheduler) to specify the Scheduler where each rail will execute.

To merge the parallel 'rails' back into a single sequence, use ParallelFlowable#sequential().

Backpressure: The operator requires the upstream to honor backpressure and each 'rail' honors backpressure as well. Scheduler: parallel does not operate by default on a particular Scheduler.

History: 2.0.5 - experimental
[中]通过创建多个“rails”(等于CPU的数量)并行化流,并以循环方式将上游项分派给它们。
请注意,rails本身并不并行执行,需要应用ParallelFlowable#runOn(调度程序)来指定每个rail将在何处执行的调度程序。
要将并行“rails”合并回单个序列,请使用ParallelFlowable#sequential()。
背压:操作员要求上游承受背压,每个“轨道”也承受背压。调度程序:默认情况下,并行程序不会在特定调度程序上运行。
历史:2.0.5-实验性

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = TestException.class)
  2. public void toThrows() {
  3. Flowable.range(1, 5)
  4. .parallel()
  5. .to(new Function<ParallelFlowable<Integer>, Flowable<Integer>>() {
  6. @Override
  7. public Flowable<Integer> apply(ParallelFlowable<Integer> pf) throws Exception {
  8. throw new TestException();
  9. }
  10. });
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void badParallelismStage() {
  4. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  5. Flowable.range(1, 10)
  6. .parallel(2)
  7. .subscribe(new Subscriber[] { ts });
  8. ts.assertFailure(IllegalArgumentException.class);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void normalDelayError() {
  3. Flowable.just(1)
  4. .parallel(1)
  5. .sequentialDelayError(1)
  6. .test()
  7. .assertResult(1);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void toSortedList() {
  4. TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
  5. Flowable.fromArray(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
  6. .parallel()
  7. .toSortedList(Functions.naturalComparator())
  8. .subscribe(ts);
  9. ts.assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void subscriberCount() {
  3. ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
  4. .filter(Functions.alwaysTrue()));
  5. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void delayErrorCancelBackpressured() {
  3. TestSubscriber<Integer> ts = Flowable.range(1, 3)
  4. .parallel(1)
  5. .sequentialDelayError(1)
  6. .test(0);
  7. ts
  8. .cancel();
  9. ts.assertEmpty();
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void rangeDelayErrorBackpressure2() {
  3. Flowable.range(1, 3)
  4. .parallel(1)
  5. .sequentialDelayError(1)
  6. .rebatchRequests(1)
  7. .test()
  8. .assertResult(1, 2, 3);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapWrongParallelismConditional() {
  3. TestHelper.checkInvalidParallelSubscribers(
  4. Flowable.just(1).parallel(1)
  5. .map(Functions.identity(), ParallelFailureHandling.ERROR)
  6. .filter(Functions.alwaysTrue())
  7. );
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void error() {
  3. Flowable.error(new TestException())
  4. .parallel(1)
  5. .runOn(ImmediateThinScheduler.INSTANCE)
  6. .sequential()
  7. .test()
  8. .assertFailure(TestException.class);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void rangeDelayErrorBackpressure() {
  3. Flowable.range(1, 3)
  4. .parallel(1)
  5. .sequentialDelayError(1)
  6. .take(2)
  7. .rebatchRequests(1)
  8. .test()
  9. .assertResult(1, 2);
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void error() {
  3. Flowable.error(new TestException())
  4. .parallel()
  5. .filter(Functions.alwaysTrue())
  6. .sequential()
  7. .test()
  8. .assertFailure(TestException.class);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapNoError() {
  3. for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
  4. Flowable.just(1)
  5. .parallel(1)
  6. .map(Functions.identity(), e)
  7. .sequential()
  8. .test()
  9. .assertResult(1);
  10. }
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void emptyConditionalBackpressured() {
  4. TestSubscriber<Object> ts = new TestSubscriber<Object>(0L);
  5. Flowable.empty()
  6. .parallel(1)
  7. .runOn(ImmediateThinScheduler.INSTANCE)
  8. .filter(Functions.alwaysTrue())
  9. .subscribe(new Subscriber[] { ts });
  10. ts
  11. .assertResult();
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void filterNoError() {
  3. for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
  4. Flowable.just(1)
  5. .parallel(1)
  6. .filter(Functions.alwaysTrue(), e)
  7. .sequential()
  8. .test()
  9. .assertResult(1);
  10. }
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void filterFalse() {
  3. for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
  4. Flowable.just(1)
  5. .parallel(1)
  6. .filter(Functions.alwaysFalse(), e)
  7. .sequential()
  8. .test()
  9. .assertResult();
  10. }
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapErrorNoError() {
  3. for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
  4. Flowable.<Integer>error(new TestException())
  5. .parallel(1)
  6. .map(Functions.identity(), e)
  7. .sequential()
  8. .test()
  9. .assertFailure(TestException.class);
  10. }
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void errorConditionalBackpressured() {
  4. TestSubscriber<Object> ts = new TestSubscriber<Object>(0L);
  5. Flowable.error(new TestException())
  6. .parallel(1)
  7. .runOn(ImmediateThinScheduler.INSTANCE)
  8. .filter(Functions.alwaysTrue())
  9. .subscribe(new Subscriber[] { ts });
  10. ts
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void doOnNextErrorNoError() {
  3. for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
  4. Flowable.<Integer>error(new TestException())
  5. .parallel(1)
  6. .doOnNext(this, e)
  7. .sequential()
  8. .test()
  9. .assertFailure(TestException.class);
  10. assertEquals(calls, 0);
  11. }
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapConditionalNoError() {
  3. for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
  4. Flowable.just(1)
  5. .parallel(1)
  6. .map(Functions.identity(), e)
  7. .filter(Functions.alwaysTrue())
  8. .sequential()
  9. .test()
  10. .assertResult(1);
  11. }
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapErrorConditionalNoError() {
  3. for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
  4. Flowable.<Integer>error(new TestException())
  5. .parallel(1)
  6. .map(Functions.identity(), e)
  7. .filter(Functions.alwaysTrue())
  8. .sequential()
  9. .test()
  10. .assertFailure(TestException.class);
  11. }
  12. }

相关文章

Flowable类方法