io.reactivex.Observable.concatMapDelayError()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.7k)|赞(0)|评价(0)|浏览(190)

本文整理了Java中io.reactivex.Observable.concatMapDelayError()方法的一些代码示例,展示了Observable.concatMapDelayError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatMapDelayError()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:concatMapDelayError

Observable.concatMapDelayError介绍

[英]Maps each of the items into an ObservableSource, subscribes to them one after the other, one at a time and emits their values in order while delaying any error from either this or any of the inner ObservableSources till all of them terminate.

Scheduler: concatMapDelayError does not operate by default on a particular Scheduler.
[中]将每个项目映射到一个ObservableSource中,一次一个地逐个订阅它们,并按顺序发送它们的值,同时延迟来自该或任何内部ObservableSource的任何错误,直到它们全部终止。
调度程序:默认情况下,concatMapDelayError不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Maps each of the items into an ObservableSource, subscribes to them one after the other,
  3. * one at a time and emits their values in order
  4. * while delaying any error from either this or any of the inner ObservableSources
  5. * till all of them terminate.
  6. * <p>
  7. * <img width="640" height="347" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMapDelayError.o.png" alt="">
  8. * <dl>
  9. * <dt><b>Scheduler:</b></dt>
  10. * <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
  11. * </dl>
  12. *
  13. * @param <R> the result value type
  14. * @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources.
  15. * @return the new ObservableSource instance with the concatenation behavior
  16. */
  17. @CheckReturnValue
  18. @SchedulerSupport(SchedulerSupport.NONE)
  19. public final <R> Observable<R> concatMapDelayError(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
  20. return concatMapDelayError(mapper, bufferSize(), true);
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void concatMapDelayError() {
  3. Observable.just(Observable.just(1), Observable.just(2))
  4. .concatMapDelayError(Functions.<Observable<Integer>>identity())
  5. .test()
  6. .assertResult(1, 2);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void concatMapDelayErrorJustSource() {
  3. Observable.just(0)
  4. .concatMapDelayError(new Function<Object, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Object v) throws Exception {
  7. return Observable.just(1);
  8. }
  9. }, 16, true)
  10. .test()
  11. .assertResult(1);
  12. }

代码示例来源:origin: redisson/redisson

  1. /**
  2. * Maps each of the items into an ObservableSource, subscribes to them one after the other,
  3. * one at a time and emits their values in order
  4. * while delaying any error from either this or any of the inner ObservableSources
  5. * till all of them terminate.
  6. * <p>
  7. * <img width="640" height="347" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMapDelayError.o.png" alt="">
  8. * <dl>
  9. * <dt><b>Scheduler:</b></dt>
  10. * <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
  11. * </dl>
  12. *
  13. * @param <R> the result value type
  14. * @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources.
  15. * @return the new ObservableSource instance with the concatenation behavior
  16. */
  17. @CheckReturnValue
  18. @SchedulerSupport(SchedulerSupport.NONE)
  19. public final <R> Observable<R> concatMapDelayError(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
  20. return concatMapDelayError(mapper, bufferSize(), true);
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedPollThrowsDelayError() {
  3. Observable.just(1)
  4. .map(new Function<Integer, Integer>() {
  5. @Override
  6. public Integer apply(Integer v) throws Exception {
  7. throw new TestException();
  8. }
  9. })
  10. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  11. @Override
  12. public ObservableSource<Integer> apply(Integer v) throws Exception {
  13. return Observable.range(v, 2);
  14. }
  15. })
  16. .test()
  17. .assertFailure(TestException.class);
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapperThrowsDelayError() {
  3. Observable.just(1).hide()
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. throw new TestException();
  8. }
  9. })
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void normalDelayErrors() {
  3. Observable.just(1).hide()
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. return Observable.range(v, 2);
  8. }
  9. })
  10. .test()
  11. .assertResult(1, 2);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void normalDelayErrorsTillTheEnd() {
  3. Observable.just(1).hide()
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. return Observable.range(v, 2);
  8. }
  9. }, 16, true)
  10. .test()
  11. .assertResult(1, 2);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void dispose2() {
  3. TestHelper.checkDisposed(Observable.<Integer>just(1).hide()
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. return Observable.error(new TestException());
  8. }
  9. }));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerErrorDelayError() {
  3. Observable.<Integer>just(1).hide()
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. return Observable.error(new TestException());
  8. }
  9. })
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void concatMapDelayErrorWithError() {
  3. Observable.just(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())), Observable.just(2))
  4. .concatMapDelayError(Functions.<Observable<Integer>>identity())
  5. .test()
  6. .assertFailure(TestException.class, 1, 2);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerErrorDelayError2() {
  3. Observable.<Integer>just(1).hide()
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. return Observable.fromCallable(new Callable<Integer>() {
  8. @Override
  9. public Integer call() throws Exception {
  10. throw new TestException();
  11. }
  12. });
  13. }
  14. })
  15. .test()
  16. .assertFailure(TestException.class);
  17. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void concatMapDelayErrorEmptySource() {
  3. assertSame(Observable.empty(), Observable.<Object>empty()
  4. .concatMapDelayError(new Function<Object, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Object v) throws Exception {
  7. return Observable.just(1);
  8. }
  9. }, 16, true));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Concatenates elements of each ObservableSource provided via an Iterable sequence into a single sequence
  3. * of elements without interleaving them.
  4. * <p>
  5. * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
  6. * <dl>
  7. * <dt><b>Scheduler:</b></dt>
  8. * <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
  9. * </dl>
  10. * @param <T> the common value type of the sources
  11. * @param sources the Iterable sequence of ObservableSources
  12. * @return the new Observable instance
  13. */
  14. @SuppressWarnings({ "unchecked", "rawtypes" })
  15. @CheckReturnValue
  16. @NonNull
  17. @SchedulerSupport(SchedulerSupport.NONE)
  18. public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources) {
  19. ObjectHelper.requireNonNull(sources, "sources is null");
  20. return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), bufferSize(), false);
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mainErrorDelayed() {
  3. Observable.<Integer>error(new TestException())
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. return Observable.range(v, 2);
  8. }
  9. })
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: redisson/redisson

  1. /**
  2. * Concatenates elements of each ObservableSource provided via an Iterable sequence into a single sequence
  3. * of elements without interleaving them.
  4. * <p>
  5. * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
  6. * <dl>
  7. * <dt><b>Scheduler:</b></dt>
  8. * <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
  9. * </dl>
  10. * @param <T> the common value type of the sources
  11. * @param sources the Iterable sequence of ObservableSources
  12. * @return the new Observable instance
  13. */
  14. @SuppressWarnings({ "unchecked", "rawtypes" })
  15. @CheckReturnValue
  16. @SchedulerSupport(SchedulerSupport.NONE)
  17. public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources) {
  18. ObjectHelper.requireNonNull(sources, "sources is null");
  19. return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), bufferSize(), false);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void badInnerDelayError() {
  3. @SuppressWarnings("rawtypes")
  4. final Observer[] o = { null };
  5. List<Throwable> errors = TestHelper.trackPluginErrors();
  6. try {
  7. Observable.just(1).hide()
  8. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  9. @Override
  10. public ObservableSource<Integer> apply(Integer v) throws Exception {
  11. return new Observable<Integer>() {
  12. @Override
  13. protected void subscribeActual(Observer<? super Integer> observer) {
  14. o[0] = observer;
  15. observer.onSubscribe(Disposables.empty());
  16. observer.onComplete();
  17. }
  18. };
  19. }
  20. })
  21. .test()
  22. .assertResult();
  23. o[0].onError(new TestException());
  24. TestHelper.assertUndeliverable(errors, 0, TestException.class);
  25. } finally {
  26. RxJavaPlugins.reset();
  27. }
  28. }

相关文章

Observable类方法