io.reactivex.Flowable.parallel()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.0k)|赞(0)|评价(0)|浏览(382)

本文整理了Java中io.reactivex.Flowable.parallel()方法的一些代码示例,展示了Flowable.parallel()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.parallel()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:parallel

Flowable.parallel介绍

[英]Parallelizes the flow by creating multiple 'rails' (equal to the number of CPUs) and dispatches the upstream items to them in a round-robin fashion.

Note that the rails don't execute in parallel on their own and one needs to apply ParallelFlowable#runOn(Scheduler) to specify the Scheduler where each rail will execute.

To merge the parallel 'rails' back into a single sequence, use ParallelFlowable#sequential().

Backpressure: The operator requires the upstream to honor backpressure and each 'rail' honors backpressure as well. Scheduler: parallel does not operate by default on a particular Scheduler.

History: 2.0.5 - experimental
[中]通过创建多个“rails”(等于CPU的数量)并行化流,并以循环方式将上游项分派给它们。
请注意,rails本身并不并行执行,需要应用ParallelFlowable#runOn(调度程序)来指定每个rail将在何处执行的调度程序。
要将并行“rails”合并回单个序列,请使用ParallelFlowable#sequential()。
背压:操作员要求上游承受背压,每个“轨道”也承受背压。调度程序:默认情况下,并行程序不会在特定调度程序上运行。
历史:2.0.5-实验性

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = TestException.class)
public void toThrows() {
  Flowable.range(1, 5)
  .parallel()
  .to(new Function<ParallelFlowable<Integer>, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(ParallelFlowable<Integer> pf) throws Exception {
      throw new TestException();
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void badParallelismStage() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 10)
  .parallel(2)
  .subscribe(new Subscriber[] { ts });
  ts.assertFailure(IllegalArgumentException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normalDelayError() {
  Flowable.just(1)
  .parallel(1)
  .sequentialDelayError(1)
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void toSortedList() {
  TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
  Flowable.fromArray(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
  .parallel()
  .toSortedList(Functions.naturalComparator())
  .subscribe(ts);
  ts.assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void subscriberCount() {
  ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
  .filter(Functions.alwaysTrue()));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void delayErrorCancelBackpressured() {
  TestSubscriber<Integer> ts = Flowable.range(1, 3)
  .parallel(1)
  .sequentialDelayError(1)
  .test(0);
  ts
  .cancel();
  ts.assertEmpty();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rangeDelayErrorBackpressure2() {
  Flowable.range(1, 3)
  .parallel(1)
  .sequentialDelayError(1)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mapWrongParallelismConditional() {
  TestHelper.checkInvalidParallelSubscribers(
    Flowable.just(1).parallel(1)
    .map(Functions.identity(), ParallelFailureHandling.ERROR)
    .filter(Functions.alwaysTrue())
  );
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Flowable.error(new TestException())
  .parallel(1)
  .runOn(ImmediateThinScheduler.INSTANCE)
  .sequential()
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rangeDelayErrorBackpressure() {
  Flowable.range(1, 3)
  .parallel(1)
  .sequentialDelayError(1)
  .take(2)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Flowable.error(new TestException())
  .parallel()
  .filter(Functions.alwaysTrue())
  .sequential()
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mapNoError() {
  for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
    Flowable.just(1)
    .parallel(1)
    .map(Functions.identity(), e)
    .sequential()
    .test()
    .assertResult(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void emptyConditionalBackpressured() {
  TestSubscriber<Object> ts = new TestSubscriber<Object>(0L);
  Flowable.empty()
  .parallel(1)
  .runOn(ImmediateThinScheduler.INSTANCE)
  .filter(Functions.alwaysTrue())
  .subscribe(new Subscriber[] { ts });
  ts
  .assertResult();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void filterNoError() {
  for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
    Flowable.just(1)
    .parallel(1)
    .filter(Functions.alwaysTrue(), e)
    .sequential()
    .test()
    .assertResult(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void filterFalse() {
  for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
    Flowable.just(1)
    .parallel(1)
    .filter(Functions.alwaysFalse(), e)
    .sequential()
    .test()
    .assertResult();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mapErrorNoError() {
  for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
    Flowable.<Integer>error(new TestException())
    .parallel(1)
    .map(Functions.identity(), e)
    .sequential()
    .test()
    .assertFailure(TestException.class);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorConditionalBackpressured() {
  TestSubscriber<Object> ts = new TestSubscriber<Object>(0L);
  Flowable.error(new TestException())
  .parallel(1)
  .runOn(ImmediateThinScheduler.INSTANCE)
  .filter(Functions.alwaysTrue())
  .subscribe(new Subscriber[] { ts });
  ts
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doOnNextErrorNoError() {
  for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
    Flowable.<Integer>error(new TestException())
    .parallel(1)
    .doOnNext(this, e)
    .sequential()
    .test()
    .assertFailure(TestException.class);
    assertEquals(calls, 0);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mapConditionalNoError() {
  for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
    Flowable.just(1)
    .parallel(1)
    .map(Functions.identity(), e)
    .filter(Functions.alwaysTrue())
    .sequential()
    .test()
    .assertResult(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mapErrorConditionalNoError() {
  for (ParallelFailureHandling e : ParallelFailureHandling.values()) {
    Flowable.<Integer>error(new TestException())
    .parallel(1)
    .map(Functions.identity(), e)
    .filter(Functions.alwaysTrue())
    .sequential()
    .test()
    .assertFailure(TestException.class);
  }
}

相关文章

Flowable类方法