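This article collects code examples for the Java method io.reactivex.Observable.concatDelayError() and shows how Observable.concatDelayError() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of Observable.concatDelayError() are as follows:
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: concatDelayError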
Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource one after the other, one at a time, and delays any errors until all inner ObservableSources and the outer ObservableSource have terminated.
Scheduler: concatDelayError does not operate by default on a particular Scheduler.
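The behavior described above can be illustrated without the library. The following is a minimal, library-free sketch and not RxJava code: the `Source` and `Result` classes and the method body are hypothetical stand-ins that only model the ordering guarantee (every source is visited in turn, and an error is reported after all of them have finished). Attaching later errors with `addSuppressed` is an assumption of this sketch, not a statement about how RxJava combines multiple errors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Library-free model of delay-error concatenation; this is not RxJava code. */
final class DelayErrorConcatModel {

    /** Hypothetical stand-in for a source: items to emit, then an optional terminal error. */
    static final class Source {
        final List<Integer> items;
        final RuntimeException error; // null means the source completes normally

        Source(List<Integer> items, RuntimeException error) {
            this.items = items;
            this.error = error;
        }
    }

    /** What a subscriber would observe: the emitted items plus the terminal error, if any. */
    static final class Result {
        final List<Integer> items = new ArrayList<>();
        RuntimeException error; // null means the whole sequence completed normally
    }

    /** Visits the sources one at a time and holds back any error until all are done. */
    static Result concatDelayError(List<Source> sources) {
        Result result = new Result();
        for (Source source : sources) {
            result.items.addAll(source.items);
            if (source.error != null) {
                if (result.error == null) {
                    result.error = source.error; // remember the first error, keep going
                } else {
                    result.error.addSuppressed(source.error); // sketch-only choice
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = concatDelayError(Arrays.asList(
                new Source(Arrays.asList(1), null),
                new Source(Arrays.asList(2), null),
                new Source(Arrays.asList(3), new IllegalStateException("boom")),
                new Source(Arrays.asList(4), null)));
        // All four items are delivered; the error from the third source comes last.
        System.out.println(r.items + " then " + r.error);
    }
}
```

In this model the fourth source still contributes its item even though the third one failed, which is the property that distinguishes delay-error concatenation from plain concatenation.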
Code example source: ReactiveX/RxJava
/**
 * Concatenates the ObservableSource sequence of ObservableSources into a single sequence by subscribing to each inner ObservableSource,
 * one after the other, one at a time and delays any errors till the all inner and the outer ObservableSources terminate.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the ObservableSource sequence of ObservableSources
 * @return the new ObservableSource with the concatenating behavior
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
    return concatDelayError(sources, bufferSize(), true);
}
Code example source: ReactiveX/RxJava
@Test
public void concatObservableDelayError() {
    Observable.concatDelayError(
            Observable.just(Observable.just(1), Observable.just(2),
            Observable.just(3), Observable.just(4)))
    .test()
    .assertResult(1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void concatIterableDelayError() {
    Observable.concatDelayError(
            Arrays.asList(Observable.just(1), Observable.just(2),
            Observable.just(3), Observable.just(4)))
    .test()
    .assertResult(1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void concatIterableDelayErrorWithError() {
    Observable.concatDelayError(
            Arrays.asList(Observable.just(1), Observable.just(2),
            Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
            Observable.just(4)))
    .test()
    .assertFailure(TestException.class, 1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@Test
public void concatObservableDelayErrorTillEnd() {
    Observable.concatDelayError(
            Observable.just(Observable.just(1), Observable.just(2),
            Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
            Observable.just(4)), 2, true)
    .test()
    .assertFailure(TestException.class, 1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@Test
public void concatObservableDelayErrorWithError() {
    Observable.concatDelayError(
            Observable.just(Observable.just(1), Observable.just(2),
            Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
            Observable.just(4)))
    .test()
    .assertFailure(TestException.class, 1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@Test
public void concatObservableDelayErrorBoundary() {
    Observable.concatDelayError(
            Observable.just(Observable.just(1), Observable.just(2),
            Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
            Observable.just(4)), 2, false)
    .test()
    .assertFailure(TestException.class, 1, 2, 3);
}
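The TillEnd and Boundary tests above differ only in the final boolean argument (tillTheEnd in RxJava's three-argument overload): with `true` the item 4 is still delivered before the failure, with `false` the sequence fails right after item 3. The sketch below is a library-free model of that difference, not RxJava code; the `Inner` class and the `run` method are hypothetical names introduced for illustration, and events are recorded as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Library-free model of the tillTheEnd flag; this is not RxJava code. */
final class TillTheEndModel {

    /** Hypothetical stand-in for an inner source: items, then success or failure. */
    static final class Inner {
        final List<Integer> items;
        final boolean fails; // true means the source ends with an error after its items

        Inner(boolean fails, Integer... items) {
            this.fails = fails;
            this.items = Arrays.asList(items);
        }
    }

    /** Records the events a subscriber would see for the given mode. */
    static List<String> run(List<Inner> inners, boolean tillTheEnd) {
        List<String> events = new ArrayList<>();
        boolean pendingError = false;
        for (Inner inner : inners) {
            for (int item : inner.items) {
                events.add("onNext(" + item + ")");
            }
            if (inner.fails) {
                pendingError = true;
                if (!tillTheEnd) {
                    break; // boundary mode: stop once the failing inner source has ended
                }
            }
        }
        events.add(pendingError ? "onError" : "onComplete");
        return events;
    }

    public static void main(String[] args) {
        List<Inner> inners = Arrays.asList(
                new Inner(false, 1), new Inner(false, 2),
                new Inner(true, 3), new Inner(false, 4));
        System.out.println("tillTheEnd=true:  " + run(inners, true));
        System.out.println("tillTheEnd=false: " + run(inners, false));
    }
}
```

The model keeps only the two properties the tests assert: which items arrive, and that the error is always the last event.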
Code example source: ReactiveX/RxJava
/**
 * Concatenates the Iterable sequence of ObservableSources into a single sequence by subscribing to each ObservableSource,
 * one after the other, one at a time and delays any errors till the all inner ObservableSources terminate.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatDelayError.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the Iterable sequence of ObservableSources
 * @return the new ObservableSource with the concatenating behavior
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
    ObjectHelper.requireNonNull(sources, "sources is null");
    return concatDelayError(fromIterable(sources));
}
Code example source: ReactiveX/RxJava
/**
 * Concatenates a variable number of ObservableSource sources and delays errors from any of them
 * till all terminate.
 * <p>
 * <img width="640" height="290" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArray.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param sources the array of sources
 * @param <T> the common base value type
 * @return the new Observable instance
 * @throws NullPointerException if sources is null
 */
@SuppressWarnings({ "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) {
    if (sources.length == 0) {
        return empty();
    } else
    if (sources.length == 1) {
        return (Observable<T>)wrap(sources[0]);
    }
    return concatDelayError(fromArray(sources));
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionDelayErrorIterable() {
    final int[] calls = { 0 };
    Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> s) throws Exception {
            calls[0]++;
            s.onNext(1);
            s.onComplete();
        }
    });
    Observable.concatDelayError(Arrays.asList(source, source)).firstElement()
    .test()
    .assertResult(1);
    assertEquals(1, calls[0]);
}
Code example source: WallaceXiao/StockChart-MPAndroidChart
@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, long time, Observable<T> source, Type type) {
    Observable<CacheResult<T>> cache = loadCache(rxCache, type, key, time, true);
    Observable<CacheResult<T>> remote = loadRemote(rxCache, key, source, false);
    //return remote.switchIfEmpty(cache);
    return Observable
            .concatDelayError(Arrays.asList(remote, cache))
            .take(1);
}
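The strategy above combines concatDelayError with take(1): the remote source is tried first, and because its error is delayed rather than propagated immediately, the cache still gets a chance to supply the single value that take(1) needs. The following is a library-free sketch of that idea, not the project's code and not RxJava code; `firstAvailable` and the loader suppliers are hypothetical names, and each loader is assumed to either return a value, return nothing, or throw.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/** Library-free model of "remote first, cache as fallback"; this is not RxJava code. */
final class FirstAvailableModel {

    /**
     * Tries each loader in order and returns the first value produced.
     * A failing loader does not stop the search; its exception is rethrown
     * only if no loader yields a value, which mirrors delaying the error.
     */
    static <T> Optional<T> firstAvailable(List<Supplier<Optional<T>>> loaders) {
        RuntimeException delayed = null;
        for (Supplier<Optional<T>> loader : loaders) {
            try {
                Optional<T> value = loader.get();
                if (value.isPresent()) {
                    return value; // like take(1): later loaders are never invoked
                }
            } catch (RuntimeException e) {
                if (delayed == null) {
                    delayed = e; // hold the first failure back and keep trying
                }
            }
        }
        if (delayed != null) {
            throw delayed; // nothing produced a value, so the failure surfaces now
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Supplier<Optional<String>> remote = () -> {
            throw new IllegalStateException("network down");
        };
        Supplier<Optional<String>> cache = () -> Optional.of("cached value");
        // The remote failure is held back, so the cached value is returned.
        System.out.println(firstAvailable(Arrays.asList(remote, cache)).get());
    }
}
```

If the remote loader succeeds, the cache loader is never called, which corresponds to take(1) ending the sequence before the second source is subscribed.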
Code example source: fengzhizi715/RxCache
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    Observable<Record<T>> cache = rxCache.<T>load2Observable(key, type);
    Observable<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                @Override
                public Record<T> apply(@NonNull T t) throws Exception {
                    rxCache.save(key, t);
                    return new Record<>(Source.CLOUD, key, t);
                }
            });
    return Observable.concatDelayError(Arrays.asList(cache, remote))
            .filter(new Predicate<Record<T>>() {
                @Override
                public boolean test(@NonNull Record<T> record) throws Exception {
                    return record.getData() != null;
                }
            });
}
Code example source: akarnokd/akarnokd-misc
@Test
public void publisherOfPublisherDelayErrorX2() {
    Observable.concatDelayError(
            Observable.just(
                    Observable.just(1, 2),
                    Observable.error(new Exception("test")),
                    Observable.just(3, 4)))
    .test()
    .assertFailure(Exception.class, 1, 2, 3, 4);
}