Usage and code examples of the io.reactivex.Flowable.concatMapDelayError() method

x33g5p2x, reposted on 2022-01-19 under Other

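This article collects code examples for the Java method io.reactivex.Flowable.concatMapDelayError() and shows how Flowable.concatMapDelayError() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should serve as useful references. The details of Flowable.concatMapDelayError() are as follows:
Fully qualified class name: io.reactivex.Flowable
Class name: Flowable
Method name: concatMapDelayError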

Introduction to Flowable.concatMapDelayError

Maps each item into a Publisher, subscribes to these Publishers one at a time, in order, and emits their values in that order, while delaying any error from either this source or any of the inner Publishers until all of them terminate.
Backpressure: The operator honors backpressure from downstream. Both this source and the inner Publishers are expected to honor backpressure as well. If the source Publisher violates this rule, the operator signals a MissingBackpressureException. If any of the inner Publishers does not honor backpressure, an IllegalStateException may be thrown when that Publisher completes.
Scheduler: concatMapDelayError does not operate by default on a particular Scheduler.
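The delay-error behaviour described above can be modelled without the library. The following is a minimal, library-free sketch in plain Java, not RxJava's implementation: `DelayErrorModel`, `Inner`, `Result` and the static `concatMapDelayError` helper are hypothetical names introduced for illustration, each inner Publisher is reduced to a list of values plus an optional error, and backpressure and asynchrony are ignored. The `tillTheEnd` flag mirrors the boolean argument of the three-argument overload used in the examples below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Library-free model of the delay-error semantics; an illustration, not RxJava code. */
public class DelayErrorModel {

    /** Hypothetical stand-in for an inner Publisher: some values, then an optional error. */
    public static final class Inner {
        public final List<Integer> values;
        public final RuntimeException error; // null means the inner source completes normally

        public Inner(List<Integer> values, RuntimeException error) {
            this.values = values;
            this.error = error;
        }
    }

    /** What a subscriber would observe: the emitted values and the collected errors. */
    public static final class Result {
        public final List<Integer> values = new ArrayList<>();
        public final List<Throwable> errors = new ArrayList<>();
    }

    public static Result concatMapDelayError(
            List<Integer> source, Function<Integer, Inner> mapper, boolean tillTheEnd) {
        Result result = new Result();
        for (Integer item : source) {
            Inner inner = mapper.apply(item);   // map one item at a time, in order
            result.values.addAll(inner.values); // values are never held back
            if (inner.error != null) {
                result.errors.add(inner.error); // the error is recorded, not thrown
                if (!tillTheEnd) {
                    break;                      // surface it once this inner source has ended
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Inner failing = new Inner(Arrays.asList(1, 2), new RuntimeException("boom"));

        Result delayed = concatMapDelayError(Arrays.asList(1, 2, 3), v -> failing, true);
        System.out.println(delayed.values + " errors=" + delayed.errors.size());
        // prints "[1, 2, 1, 2, 1, 2] errors=3"

        Result boundary = concatMapDelayError(Arrays.asList(1, 2, 3), v -> failing, false);
        System.out.println(boundary.values + " errors=" + boundary.errors.size());
        // prints "[1, 2] errors=1"
    }
}
```

With `tillTheEnd` set to true the model keeps mapping after a failure and reports every collected error at the end, which corresponds to the CompositeException asserted in the innerErrors example. With false it stops after the first failing inner source.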

Code examples

Code example source: ReactiveX/RxJava

// Excerpt: the enclosing call that receives this anonymous Function is not shown in the source.
@Override
public Publisher<Integer> apply(Flowable<Object> f) throws Exception {
  return f.concatMapDelayError(Functions.justFunction(Flowable.just(2)));
}
});

Code example source: ReactiveX/RxJava

// An empty source short-circuits: the operator returns the shared Flowable.empty() instance.
@Test
public void concatMapDelayErrorEmptySource() {
  assertSame(Flowable.empty(), Flowable.<Object>empty()
      .concatMapDelayError(new Function<Object, Flowable<Integer>>() {
        @Override
        public Flowable<Integer> apply(Object v) throws Exception {
          return Flowable.just(1);
        }
      }, 16, true));
}

Code example source: ReactiveX/RxJava

@Test
public void concatMapDelayErrorJustSource() {
  Flowable.just(0)
  .concatMapDelayError(new Function<Object, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(Object v) throws Exception {
      return Flowable.just(1);
    }
  }, 16, true)
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void concatMapJustSourceDelayError() {
  Flowable.just(0).hide()
  .concatMapDelayError(new Function<Object, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(Object v) throws Exception {
      return Flowable.just(1);
    }
  }, 16, false)
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

// tillTheEnd = false: an inner error is signalled as soon as that inner Publisher terminates.
@Test
public void notVeryEnd() {
  Flowable.range(1, 2)
  .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), 16, false)
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void concatMapScalarBackpressuredDelayError() {
  Flowable.just(1).hide()
  .concatMapDelayError(Functions.justFunction(Flowable.just(2)))
  .test(1L)
  .assertResult(2);
}

Code example source: ReactiveX/RxJava

@Test
public void fusedCrashDelayError() {
  Flowable.range(1, 2)
  .map(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) throws Exception { throw new TestException(); }
  })
  .concatMapDelayError(Functions.justFunction(Flowable.just(1)))
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void concatMapEmptyDelayError() {
  Flowable.just(1).hide()
  .concatMapDelayError(Functions.justFunction(Flowable.empty()))
  .test()
  .assertResult();
}

Code example source: ReactiveX/RxJava

@Test
public void callableCrashDelayError() {
  Flowable.just(1).hide()
  .concatMapDelayError(Functions.justFunction(Flowable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  })))
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Flowable.error(new TestException())
  .concatMapDelayError(Functions.justFunction(Flowable.just(2)), 16, false)
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void concatMapDelayError() {
  Flowable.just(Flowable.just(1), Flowable.just(2))
  .concatMapDelayError(Functions.<Flowable<Integer>>identity())
  .test()
  .assertResult(1, 2);
}

Code example source: ReactiveX/RxJava

@Test
public void innerWithScalar() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.range(1, 3)
  .concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(Integer v) {
      return v == 2 ? Flowable.just(3) : Flowable.range(1, 2);
    }
  }).subscribe(ts);
  ts.assertValues(1, 2, 3, 1, 2);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void innerWithEmpty() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.range(1, 3)
  .concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(Integer v) {
      return v == 2 ? Flowable.<Integer>empty() : Flowable.range(1, 2);
    }
  }).subscribe(ts);
  ts.assertValues(1, 2, 1, 2);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void concatMapInnerErrorDelayError() {
  Flowable.just(1).hide()
  .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())))
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void innerThrows() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.just(1)
  .hide() // prevent scalar optimization
  .concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(Integer v) {
      throw new TestException();
    }
  }).subscribe(ts);
  ts.assertNoValues();
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void concatMapDelayErrorJustJust() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.just(Flowable.just(1)).concatMapDelayError((Function)Functions.identity()).subscribe(ts);
  ts.assertValue(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void concatMapDelayErrorJustRange() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.just(Flowable.range(1, 5)).concatMapDelayError((Function)Functions.identity()).subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

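// The first inner source fails after emitting 1, yet 2 from the second inner source is still delivered before the error.
@Test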
public void concatMapDelayErrorWithError() {
  Flowable.just(Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException())), Flowable.just(2))
  .concatMapDelayError(Functions.<Flowable<Integer>>identity())
  .test()
  .assertFailure(TestException.class, 1, 2);
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.range(1, 2)
  .concatMap(Functions.justFunction(Flowable.just(1))));
  TestHelper.checkDisposed(Flowable.range(1, 2)
  .concatMapDelayError(Functions.justFunction(Flowable.just(1))));
}

Code example source: ReactiveX/RxJava

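// Each of the three inner sources fails after emitting 1, 2; the delayed errors arrive together as a CompositeException.
@Test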
public void innerErrors() {
  final Flowable<Integer> inner = Flowable.range(1, 2)
      .concatWith(Flowable.<Integer>error(new TestException()));
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.range(1, 3).concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(Integer v) {
      return inner;
    }
  }).subscribe(ts);
  ts.assertValues(1, 2, 1, 2, 1, 2);
  ts.assertError(CompositeException.class);
  ts.assertNotComplete();
}
