Usage and code examples of the io.reactivex.Flowable.concatMapDelayError() method

x33g5p2x, reposted on 2022-01-26 under Other

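This article collects code examples for the Java method io.reactivex.Flowable.concatMapDelayError() and shows how Flowable.concatMapDelayError() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, so they should serve as a useful reference. The details of Flowable.concatMapDelayError() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: concatMapDelayError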

About Flowable.concatMapDelayError

Maps each item to a Publisher, subscribes to the resulting Publishers one at a time, in order, and emits their values in sequence, while delaying any error from either the source or any of the inner Publishers until all of them terminate.
Backpressure: The operator honors backpressure from downstream. Both the source and the inner Publishers are expected to honor backpressure as well. If the source Publisher violates this rule, the operator signals a MissingBackpressureException. If an inner Publisher does not honor backpressure, an IllegalStateException may be thrown when that Publisher completes.
Scheduler: concatMapDelayError does not operate by default on a particular Scheduler.
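
To make the "delaying any error" part of the description concrete, here is a minimal sketch that runs the same source through concatMap and through concatMapDelayError. It assumes RxJava 2.x on the classpath; the class name DelayErrorVsConcatMap and the helper inner(...) are hypothetical names chosen for illustration only.

```java
import io.reactivex.Flowable;
import java.util.List;

public class DelayErrorVsConcatMap {

    // Hypothetical inner source: the one for item 2 emits a single value and then fails.
    static Flowable<String> inner(int v) {
        if (v == 2) {
            return Flowable.just("2a")
                    .concatWith(Flowable.<String>error(new IllegalStateException("inner 2 failed")));
        }
        return Flowable.just(v + "a", v + "b");
    }

    // concatMap signals the inner error right away, so the inner source for item 3 never runs.
    static List<String> withConcatMap() {
        return Flowable.range(1, 3)
                .concatMap(v -> inner(v))
                .test()
                .values();
    }

    // concatMapDelayError holds the error back until the remaining inner sources have finished.
    static List<String> withConcatMapDelayError() {
        return Flowable.range(1, 3)
                .concatMapDelayError(v -> inner(v))
                .test()
                .values();
    }

    public static void main(String[] args) {
        System.out.println(withConcatMap());           // [1a, 1b, 2a]
        System.out.println(withConcatMapDelayError()); // [1a, 1b, 2a, 3a, 3b]
    }
}
```

In both cases the subscriber still ends with the IllegalStateException; the difference is only how many values arrive before it.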

Code examples

Code example source: ReactiveX/RxJava

    // Excerpt: the enclosing call that receives this anonymous Function is not shown in the source.
        @Override
        public Publisher<Integer> apply(Flowable<Object> f) throws Exception {
            return f.concatMapDelayError(Functions.justFunction(Flowable.just(2)));
        }
    });

Code example source: ReactiveX/RxJava

    @Test
    public void concatMapDelayErrorEmptySource() {
        // An empty source comes back as the very same Flowable.empty() instance.
        assertSame(Flowable.empty(), Flowable.<Object>empty()
                .concatMapDelayError(new Function<Object, Flowable<Integer>>() {
                    @Override
                    public Flowable<Integer> apply(Object v) throws Exception {
                        return Flowable.just(1);
                    }
                }, 16, true));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void concatMapDelayErrorJustSource() {
        Flowable.just(0)
                .concatMapDelayError(new Function<Object, Flowable<Integer>>() {
                    @Override
                    public Flowable<Integer> apply(Object v) throws Exception {
                        return Flowable.just(1);
                    }
                }, 16, true)
                .test()
                .assertResult(1);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void concatMapJustSourceDelayError() {
        Flowable.just(0).hide()
                .concatMapDelayError(new Function<Object, Flowable<Integer>>() {
                    @Override
                    public Flowable<Integer> apply(Object v) throws Exception {
                        return Flowable.just(1);
                    }
                }, 16, false)
                .test()
                .assertResult(1);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void notVeryEnd() {
        // tillTheEnd = false: the test expects a plain TestException, not a CompositeException.
        Flowable.range(1, 2)
                .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), 16, false)
                .test()
                .assertFailure(TestException.class);
    }
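
The third argument in the test above is the tillTheEnd flag. The following sketch contrasts both settings on a source whose first inner Publisher fails. It assumes RxJava 2.x, where the overload takes (mapper, prefetch, tillTheEnd) as in the tests quoted here; the class name TillTheEndSketch and the helper inner(...) are hypothetical.

```java
import io.reactivex.Flowable;
import java.util.List;

public class TillTheEndSketch {

    // Hypothetical inner source: only the one for item 1 fails, after emitting its value.
    static Flowable<Integer> inner(int v) {
        if (v == 1) {
            return Flowable.just(1)
                    .concatWith(Flowable.<Integer>error(new IllegalStateException("inner 1 failed")));
        }
        return Flowable.just(v);
    }

    // Collects the values that reach the subscriber before the error is signalled.
    static List<Integer> values(boolean tillTheEnd) {
        return Flowable.range(1, 3)
                .concatMapDelayError(v -> inner(v), 16, tillTheEnd)
                .test()
                .values();
    }

    public static void main(String[] args) {
        System.out.println(values(false)); // [1]
        System.out.println(values(true));  // [1, 2, 3]
    }
}
```

With tillTheEnd set to false, the error is held only until the failing inner Publisher has terminated, so the later source items are never mapped. With true, every remaining inner Publisher runs before the error is delivered.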

Code example source: ReactiveX/RxJava

    @Test
    public void concatMapScalarBackpressuredDelayError() {
        Flowable.just(1).hide()
                .concatMapDelayError(Functions.justFunction(Flowable.just(2)))
                .test(1L) // request exactly one item up front
                .assertResult(2);
    }
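
The test above subscribes with an initial request of one item. As a rough illustration of how the operator passes downstream demand on to the inner Publishers, the sketch below requests items in steps and records how many values had arrived after each step. It assumes RxJava 2.x; BackpressureSketch and pair(...) are hypothetical names.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

public class BackpressureSketch {

    // Hypothetical inner source: two values per source item.
    static Flowable<Integer> pair(int v) {
        return Flowable.just(v * 10, v * 10 + 1);
    }

    // Returns the number of received values after each request step.
    static List<Integer> countsPerStep() {
        TestSubscriber<Integer> ts = Flowable.range(1, 2)
                .concatMapDelayError(v -> pair(v))
                .test(1L);                  // initial demand: one item

        List<Integer> counts = new ArrayList<>();
        counts.add(ts.valueCount());
        ts.requestMore(2L);                 // two more items
        counts.add(ts.valueCount());
        ts.requestMore(1L);                 // the final item
        counts.add(ts.valueCount());

        ts.assertResult(10, 11, 20, 21);    // all values in order, then completion
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countsPerStep()); // [1, 3, 4]
    }
}
```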

Code example source: ReactiveX/RxJava

    @Test
    public void fusedCrashDelayError() {
        Flowable.range(1, 2)
                .map(new Function<Integer, Object>() {
                    @Override
                    public Object apply(Integer v) throws Exception {
                        throw new TestException();
                    }
                })
                .concatMapDelayError(Functions.justFunction(Flowable.just(1)))
                .test()
                .assertFailure(TestException.class);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void concatMapEmptyDelayError() {
        Flowable.just(1).hide()
                .concatMapDelayError(Functions.justFunction(Flowable.empty()))
                .test()
                .assertResult();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void callableCrashDelayError() {
        Flowable.just(1).hide()
                .concatMapDelayError(Functions.justFunction(Flowable.fromCallable(new Callable<Object>() {
                    @Override
                    public Object call() throws Exception {
                        throw new TestException();
                    }
                })))
                .test()
                .assertFailure(TestException.class);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void error() {
        // The source itself fails; no item is ever mapped.
        Flowable.error(new TestException())
                .concatMapDelayError(Functions.justFunction(Flowable.just(2)), 16, false)
                .test()
                .assertFailure(TestException.class);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void concatMapDelayError() {
        Flowable.just(Flowable.just(1), Flowable.just(2))
                .concatMapDelayError(Functions.<Flowable<Integer>>identity())
                .test()
                .assertResult(1, 2);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void innerWithScalar() {
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.range(1, 3)
                .concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
                    @Override
                    public Flowable<Integer> apply(Integer v) {
                        return v == 2 ? Flowable.just(3) : Flowable.range(1, 2);
                    }
                }).subscribe(ts);
        ts.assertValues(1, 2, 3, 1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void innerWithEmpty() {
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.range(1, 3)
                .concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
                    @Override
                    public Flowable<Integer> apply(Integer v) {
                        return v == 2 ? Flowable.<Integer>empty() : Flowable.range(1, 2);
                    }
                }).subscribe(ts);
        ts.assertValues(1, 2, 1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void concatMapInnerErrorDelayError() {
        Flowable.just(1).hide()
                .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())))
                .test()
                .assertFailure(TestException.class);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void innerThrows() {
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.just(1)
                .hide() // prevent scalar optimization
                .concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
                    @Override
                    public Flowable<Integer> apply(Integer v) {
                        throw new TestException();
                    }
                }).subscribe(ts);
        ts.assertNoValues();
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Test
    public void concatMapDelayErrorJustJust() {
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.just(Flowable.just(1)).concatMapDelayError((Function)Functions.identity()).subscribe(ts);
        ts.assertValue(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Test
    public void concatMapDelayErrorJustRange() {
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.just(Flowable.range(1, 5)).concatMapDelayError((Function)Functions.identity()).subscribe(ts);
        ts.assertValues(1, 2, 3, 4, 5);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void concatMapDelayErrorWithError() {
        // The first inner Publisher fails after emitting 1; the error arrives only after 2 has been emitted.
        Flowable.just(Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException())), Flowable.just(2))
                .concatMapDelayError(Functions.<Flowable<Integer>>identity())
                .test()
                .assertFailure(TestException.class, 1, 2);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void dispose() {
        TestHelper.checkDisposed(Flowable.range(1, 2)
                .concatMap(Functions.justFunction(Flowable.just(1))));
        TestHelper.checkDisposed(Flowable.range(1, 2)
                .concatMapDelayError(Functions.justFunction(Flowable.just(1))));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void innerErrors() {
        // Each inner subscription emits 1, 2 and then fails.
        final Flowable<Integer> inner = Flowable.range(1, 2)
                .concatWith(Flowable.<Integer>error(new TestException()));
        TestSubscriber<Integer> ts = TestSubscriber.create();
        Flowable.range(1, 3).concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) {
                return inner;
            }
        }).subscribe(ts);
        ts.assertValues(1, 2, 1, 2, 1, 2);
        // Several delayed errors are reported together as a CompositeException.
        ts.assertError(CompositeException.class);
        ts.assertNotComplete();
    }
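
When more than one inner Publisher fails, as in the test above, the subscriber receives a CompositeException. The sketch below shows one way to unpack it with getExceptions(). It assumes RxJava 2.x; the class name CompositeErrorSketch and the error messages are made up for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.exceptions.CompositeException;
import java.util.List;

public class CompositeErrorSketch {

    // Hypothetical inner source: fails immediately with a distinct exception per item.
    static Flowable<Integer> failing(int v) {
        return Flowable.<Integer>error(new IllegalStateException("inner " + v));
    }

    // Runs three failing inner sources and returns the individual errors that were collected.
    static List<Throwable> collectedErrors() {
        Throwable error = Flowable.range(1, 3)
                .concatMapDelayError(v -> failing(v))
                .test()
                .errors()
                .get(0);
        // Three separate failures were delayed, so they arrive wrapped in one CompositeException.
        return ((CompositeException) error).getExceptions();
    }

    public static void main(String[] args) {
        for (Throwable t : collectedErrors()) {
            System.out.println(t.getMessage());
        }
    }
}
```

A single delayed failure is delivered as-is rather than wrapped, which is why some of the tests in this article assert TestException directly.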
