io.reactivex.Observable.concatDelayError()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(120)

本文整理了Java中io.reactivex.Observable.concatDelayError()方法的一些代码示例,展示了Observable.concatDelayError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatDelayError()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:concatDelayError

Observable.concatDelayError介绍

[英]Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource, one after the other, one at a time and delays any errors till the all inner and the outer ObservableSources terminate.

Scheduler: concatDelayError does not operate by default on a particular Scheduler.
[中]通过逐个订阅每个内部可观测资源,一次一个地将可观测资源的可观测资源序列连接成单个序列,并延迟任何错误,直到所有内部和外部可观测资源终止。
调度程序:默认情况下,concatDelayError不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

/**
 * Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource,
 * one after the other, one at a time and delays any errors till the all inner and the outer ObservableSources terminate.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the ObservableSource sequence of ObservableSources
 * @return the new ObservableSource with the concatenating behavior
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
  return concatDelayError(sources, bufferSize(), true);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void concatObservableDelayError() {
  Observable.concatDelayError(
      Observable.just(Observable.just(1), Observable.just(2),
      Observable.just(3), Observable.just(4)))
  .test()
  .assertResult(1, 2, 3, 4);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void concatIterableDelayError() {
  Observable.concatDelayError(
      Arrays.asList(Observable.just(1), Observable.just(2),
      Observable.just(3), Observable.just(4)))
  .test()
  .assertResult(1, 2, 3, 4);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void concatIterableDelayErrorWithError() {
  Observable.concatDelayError(
      Arrays.asList(Observable.just(1), Observable.just(2),
      Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
      Observable.just(4)))
  .test()
  .assertFailure(TestException.class, 1, 2, 3, 4);
}

代码示例来源:origin: redisson/redisson

/**
 * Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource,
 * one after the other, one at a time and delays any errors till the all inner and the outer ObservableSources terminate.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the ObservableSource sequence of ObservableSources
 * @return the new ObservableSource with the concatenating behavior
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
  return concatDelayError(sources, bufferSize(), true);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void concatObservableDelayErrorTillEnd() {
  Observable.concatDelayError(
      Observable.just(Observable.just(1), Observable.just(2),
      Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
      Observable.just(4)), 2, true)
  .test()
  .assertFailure(TestException.class, 1, 2, 3, 4);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void concatObservableDelayErrorWithError() {
  Observable.concatDelayError(
      Observable.just(Observable.just(1), Observable.just(2),
      Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
      Observable.just(4)))
  .test()
  .assertFailure(TestException.class, 1, 2, 3, 4);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void concatObservableDelayErrorBoundary() {
  Observable.concatDelayError(
      Observable.just(Observable.just(1), Observable.just(2),
      Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
      Observable.just(4)), 2, false)
  .test()
  .assertFailure(TestException.class, 1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Concatenates the Iterable sequence of ObservableSources into a single sequence by subscribing to each ObservableSource,
 * one after the other, one at a time and delays any errors till the all inner ObservableSources terminate.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the Iterable sequence of ObservableSources
 * @return the new ObservableSource with the concatenating behavior
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
  ObjectHelper.requireNonNull(sources, "sources is null");
  return concatDelayError(fromIterable(sources));
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Concatenates a variable number of ObservableSource sources and delays errors from any of them
 * till all terminate.
 * <p>
 * <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param sources the array of sources
 * @param <T> the common base value type
 * @return the new Observable instance
 * @throws NullPointerException if sources is null
 */
@SuppressWarnings({ "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) {
  if (sources.length == 0) {
    return empty();
  } else
  if (sources.length == 1) {
    return (Observable<T>)wrap(sources[0]);
  }
  return concatDelayError(fromArray(sources));
}

代码示例来源:origin: redisson/redisson

/**
 * Concatenates the Iterable sequence of ObservableSources into a single sequence by subscribing to each ObservableSource,
 * one after the other, one at a time and delays any errors till the all inner ObservableSources terminate.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the Iterable sequence of ObservableSources
 * @return the new ObservableSource with the concatenating behavior
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
  ObjectHelper.requireNonNull(sources, "sources is null");
  return concatDelayError(fromIterable(sources));
}

代码示例来源:origin: redisson/redisson

/**
 * Concatenates a variable number of ObservableSource sources and delays errors from any of them
 * till all terminate.
 * <p>
 * <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param sources the array of sources
 * @param <T> the common base value type
 * @return the new Observable instance
 * @throws NullPointerException if sources is null
 */
@SuppressWarnings({ "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) {
  if (sources.length == 0) {
    return empty();
  } else
  if (sources.length == 1) {
    return (Observable<T>)wrap(sources[0]);
  }
  return concatDelayError(fromArray(sources));
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionDelayErrorIterable() {
  final int[] calls = { 0 };
  Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> s) throws Exception {
      calls[0]++;
      s.onNext(1);
      s.onComplete();
    }
  });
  Observable.concatDelayError(Arrays.asList(source, source)).firstElement()
  .test()
  .assertResult(1);
  assertEquals(1, calls[0]);
}

代码示例来源:origin: WallaceXiao/StockChart-MPAndroidChart

@Override
  public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, long time, Observable<T> source, Type type) {
    Observable<CacheResult<T>> cache = loadCache(rxCache, type, key, time, true);
    Observable<CacheResult<T>> remote = loadRemote(rxCache, key, source, false);
    //return remote.switchIfEmpty(cache);
    return Observable
        .concatDelayError(Arrays.asList(remote, cache))
        .take(1);
  }
}

代码示例来源:origin: fengzhizi715/RxCache

@Override
  public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {

    Observable<Record<T>> cache = rxCache.<T>load2Observable(key, type);

    Observable<Record<T>> remote = source
        .map(new Function<T, Record<T>>() {
          @Override
          public Record<T> apply(@NonNull T t) throws Exception {

            rxCache.save(key, t);

            return new Record<>(Source.CLOUD, key, t);
          }
        });

    return Observable.concatDelayError(Arrays.asList(cache, remote))
        .filter(new Predicate<Record<T>>() {
          @Override
          public boolean test(@NonNull Record<T> record) throws Exception {
            return record.getData() != null;
          }
        });
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
  public void publisherOfPublisherDelayErrorX2() {

    Observable.concatDelayError(
        Observable.just(
            Observable.just(1, 2),
            Observable.error(new Exception("test")),
            Observable.just(3, 4)))
    .test()
    .assertFailure(Exception.class, 1, 2, 3, 4);
  }
}

相关文章

Observable类方法