Usage and code examples for io.reactivex.Observable.concatDelayError()

x33g5p2x, reposted on 2022-01-25 in Other

This article collects code examples for the Java method io.reactivex.Observable.concatDelayError() and shows how it is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of Observable.concatDelayError():
Package: io.reactivex
Class: Observable
Method: concatDelayError

Introduction to Observable.concatDelayError

Concatenates an ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource one at a time, in order, and delays any errors until all inner and outer ObservableSources have terminated.

Scheduler: concatDelayError does not operate by default on a particular Scheduler.
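The behaviour described above can be illustrated without RxJava at all. The following is a minimal plain-Java sketch of the idea, not RxJava's implementation: the names ConcatDelayErrorSketch, Source, and Result are hypothetical, a source "completes" by returning and "errors" by throwing, and attaching later errors as suppressed exceptions is a modelling choice made here for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of delay-error concatenation; hypothetical names, not RxJava code. */
final class ConcatDelayErrorSketch {

    /** A source pushes items to the consumer, then either returns (completes) or throws (errors). */
    interface Source<T> {
        void subscribe(Consumer<T> onNext) throws Exception;
    }

    /** What the downstream observed: the items in order and the delayed error, if any. */
    static final class Result<T> {
        final List<T> items = new ArrayList<>();
        Exception error; // null means the whole sequence completed normally
    }

    static <T> Result<T> concatDelayError(List<Source<T>> sources) {
        Result<T> result = new Result<>();
        for (Source<T> source : sources) {
            try {
                // Subscribe to one source at a time, in order.
                source.subscribe(result.items::add);
            } catch (Exception e) {
                // Remember the failure and carry on with the next source.
                if (result.error == null) {
                    result.error = e;
                } else {
                    result.error.addSuppressed(e); // assumption: keep later errors as suppressed
                }
            }
        }
        // The error, if any, is only reported after every source has terminated.
        return result;
    }

    public static void main(String[] args) {
        List<Source<Integer>> sources = new ArrayList<>();
        sources.add(onNext -> onNext.accept(1));
        sources.add(onNext -> onNext.accept(2));
        sources.add(onNext -> { onNext.accept(3); throw new IllegalStateException("boom"); });
        sources.add(onNext -> onNext.accept(4));

        Result<Integer> r = concatDelayError(sources);
        // prints: [1, 2, 3, 4] then java.lang.IllegalStateException: boom
        System.out.println(r.items + " then " + r.error);
    }
}
```

The third source fails after emitting 3, yet 4 still arrives before the error is reported, which is the same shape as assertFailure(TestException.class, 1, 2, 3, 4) in the RxJava tests in this article.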

Code examples

Source: ReactiveX/RxJava

    /**
     * Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource,
     * one after the other, one at a time and delays any errors till the all inner and the outer ObservableSources terminate.
     * <p>
     * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <T> the common element base type
     * @param sources the ObservableSource sequence of ObservableSources
     * @return the new ObservableSource with the concatenating behavior
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
        return concatDelayError(sources, bufferSize(), true);
    }

Source: ReactiveX/RxJava

    @Test
    public void concatObservableDelayError() {
        // No source fails, so the result is the plain concatenation.
        Observable.concatDelayError(
                Observable.just(Observable.just(1), Observable.just(2),
                        Observable.just(3), Observable.just(4)))
        .test()
        .assertResult(1, 2, 3, 4);
    }

Source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void concatIterableDelayError() {
        // Same as above, but the sources are supplied as an Iterable.
        Observable.concatDelayError(
                Arrays.asList(Observable.just(1), Observable.just(2),
                        Observable.just(3), Observable.just(4)))
        .test()
        .assertResult(1, 2, 3, 4);
    }

Source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void concatIterableDelayErrorWithError() {
        // The third source fails after emitting 3; the error is delayed, so 4 is still emitted.
        Observable.concatDelayError(
                Arrays.asList(Observable.just(1), Observable.just(2),
                        Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                        Observable.just(4)))
        .test()
        .assertFailure(TestException.class, 1, 2, 3, 4);
    }

Source: ReactiveX/RxJava

    @Test
    public void concatObservableDelayErrorTillEnd() {
        // prefetch = 2, tillTheEnd = true: the error is held back until every source has terminated.
        Observable.concatDelayError(
                Observable.just(Observable.just(1), Observable.just(2),
                        Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                        Observable.just(4)), 2, true)
        .test()
        .assertFailure(TestException.class, 1, 2, 3, 4);
    }

Source: ReactiveX/RxJava

    @Test
    public void concatObservableDelayErrorWithError() {
        // The single-argument overload delays errors until the very end.
        Observable.concatDelayError(
                Observable.just(Observable.just(1), Observable.just(2),
                        Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                        Observable.just(4)))
        .test()
        .assertFailure(TestException.class, 1, 2, 3, 4);
    }

Source: ReactiveX/RxJava

    @Test
    public void concatObservableDelayErrorBoundary() {
        // tillTheEnd = false: the error is emitted once the failing inner source ends, so 4 is never emitted.
        Observable.concatDelayError(
                Observable.just(Observable.just(1), Observable.just(2),
                        Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                        Observable.just(4)), 2, false)
        .test()
        .assertFailure(TestException.class, 1, 2, 3);
    }
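The last three tests differ only in the final boolean argument (tillTheEnd) of the three-argument overload. The toy model below is one way to picture the difference. It is a plain-Java sketch with hypothetical names (TillTheEndSketch, Inner, concat), written only to match the outcomes those tests assert. It does not reproduce RxJava's internals and it ignores the prefetch argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the tillTheEnd flag; hypothetical names, not RxJava's implementation. */
final class TillTheEndSketch {

    /** An inner source: its items plus an optional error signalled after the last item. */
    static final class Inner {
        final List<Integer> items;
        final RuntimeException error; // null means the inner source completes normally

        Inner(RuntimeException error, Integer... items) {
            this.items = Arrays.asList(items);
            this.error = error;
        }
    }

    /** Returns the emitted items as strings, followed by "ERROR" if the sequence failed. */
    static List<String> concat(List<Inner> inners, boolean tillTheEnd) {
        List<String> events = new ArrayList<>();
        boolean failed = false;
        for (Inner inner : inners) {
            for (Integer item : inner.items) {
                events.add(String.valueOf(item));
            }
            if (inner.error != null) {
                failed = true;
                if (!tillTheEnd) {
                    break; // boundary mode: stop once the failing inner source has ended
                }
                // end mode: remember the failure and continue with the remaining sources
            }
        }
        if (failed) {
            events.add("ERROR");
        }
        return events;
    }

    public static void main(String[] args) {
        List<Inner> inners = Arrays.asList(
                new Inner(null, 1), new Inner(null, 2),
                new Inner(new RuntimeException("boom"), 3), new Inner(null, 4));
        System.out.println(concat(inners, true));  // [1, 2, 3, 4, ERROR]
        System.out.println(concat(inners, false)); // [1, 2, 3, ERROR]
    }
}
```

With tillTheEnd set to true the model keeps going after the failure, mirroring assertFailure(TestException.class, 1, 2, 3, 4); with false it stops after the failing source, mirroring assertFailure(TestException.class, 1, 2, 3).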

Source: ReactiveX/RxJava

    /**
     * Concatenates the Iterable sequence of ObservableSources into a single sequence by subscribing to each ObservableSource,
     * one after the other, one at a time and delays any errors till the all inner ObservableSources terminate.
     * <p>
     * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <T> the common element base type
     * @param sources the Iterable sequence of ObservableSources
     * @return the new ObservableSource with the concatenating behavior
     */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
        ObjectHelper.requireNonNull(sources, "sources is null");
        return concatDelayError(fromIterable(sources));
    }

Source: ReactiveX/RxJava

    /**
     * Concatenates a variable number of ObservableSource sources and delays errors from any of them
     * till all terminate.
     * <p>
     * <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param sources the array of sources
     * @param <T> the common base value type
     * @return the new Observable instance
     * @throws NullPointerException if sources is null
     */
    @SuppressWarnings({ "unchecked" })
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) {
        if (sources.length == 0) {
            return empty();
        } else if (sources.length == 1) {
            return (Observable<T>)wrap(sources[0]);
        }
        return concatDelayError(fromArray(sources));
    }

Source: redisson/redisson

    /**
     * Concatenates the Iterable sequence of ObservableSources into a single sequence by subscribing to each ObservableSource,
     * one after the other, one at a time and delays any errors till the all inner ObservableSources terminate.
     * <p>
     * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <T> the common element base type
     * @param sources the Iterable sequence of ObservableSources
     * @return the new ObservableSource with the concatenating behavior
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
        ObjectHelper.requireNonNull(sources, "sources is null");
        return concatDelayError(fromIterable(sources));
    }

Source: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void noSubsequentSubscriptionDelayErrorIterable() {
        final int[] calls = { 0 };
        // Counts how many times the source is subscribed to.
        Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> s) throws Exception {
                calls[0]++;
                s.onNext(1);
                s.onComplete();
            }
        });
        // firstElement() disposes after the first item, so the second source is never subscribed to.
        Observable.concatDelayError(Arrays.asList(source, source)).firstElement()
        .test()
        .assertResult(1);
        assertEquals(1, calls[0]);
    }

Source: WallaceXiao/StockChart-MPAndroidChart

    @Override
    public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, long time, Observable<T> source, Type type) {
        Observable<CacheResult<T>> cache = loadCache(rxCache, type, key, time, true);
        Observable<CacheResult<T>> remote = loadRemote(rxCache, key, source, false);
        //return remote.switchIfEmpty(cache);
        // Try the remote source first, then the cache; take(1) keeps only the first result.
        return Observable
                .concatDelayError(Arrays.asList(remote, cache))
                .take(1);
    }

Source: fengzhizi715/RxCache

    @Override
    public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
        Observable<Record<T>> cache = rxCache.<T>load2Observable(key, type);
        // Save each remote value to the cache and wrap it in a Record.
        Observable<Record<T>> remote = source
                .map(new Function<T, Record<T>>() {
                    @Override
                    public Record<T> apply(@NonNull T t) throws Exception {
                        rxCache.save(key, t);
                        return new Record<>(Source.CLOUD, key, t);
                    }
                });
        // Emit the cached record first, then the remote one; drop records without data.
        return Observable.concatDelayError(Arrays.asList(cache, remote))
                .filter(new Predicate<Record<T>>() {
                    @Override
                    public boolean test(@NonNull Record<T> record) throws Exception {
                        return record.getData() != null;
                    }
                });
    }

Source: akarnokd/akarnokd-misc

    @Test
    public void publisherOfPublisherDelayErrorX2() {
        // The failing middle source does not stop the sequence; 3 and 4 arrive before the error.
        Observable.concatDelayError(
                Observable.just(
                        Observable.just(1, 2),
                        Observable.error(new Exception("test")),
                        Observable.just(3, 4)))
        .test()
        .assertFailure(Exception.class, 1, 2, 3, 4);
    }
