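This article collects code examples for the Java method io.reactivex.Observable.concatWith() and shows how it is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of Observable.concatWith() are as follows: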
Package path: io.reactivex.Observable
Class name: Observable
Method name: concatWith
Returns an Observable that emits the items from this Observable and, when it completes normally, subscribes to the other CompletableSource and relays its terminal events.
This description is for the CompletableSource overload; the examples below also use the overloads that accept an ObservableSource, SingleSource, or MaybeSource.
Scheduler: concatWith does not operate by default on a particular Scheduler.
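The description above can be illustrated with a minimal sketch, assuming RxJava 2.2 or later on the classpath and Java 8 lambdas. The class name ConcatWithSketch and the "cleanup" action are hypothetical and exist only for illustration. It records the order in which the items, the Completable's side effect, and the terminal event are observed.

```java
import io.reactivex.Completable;
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

public class ConcatWithSketch {

    // Records, in order, everything the concatenated sequence does.
    static List<String> run() {
        final List<String> log = new ArrayList<String>();

        // Hypothetical follow-up work; it runs only when the Completable is subscribed to.
        Completable cleanup = Completable.fromAction(() -> log.add("cleanup ran"));

        Observable.just(1, 2, 3)
                // cleanup is subscribed to only after just(1, 2, 3) completes normally.
                .concatWith(cleanup)
                .subscribe(
                        v -> log.add("item " + v),
                        e -> log.add("error " + e),
                        () -> log.add("completed"));

        // Everything above runs synchronously on the calling thread,
        // because no Scheduler is involved.
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In this sketch the log ends with "cleanup ran" followed by "completed", which reflects that the returned Observable relays the Completable's terminal event rather than the first source's completion.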
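Code example source: ReactiveX/RxJava
    // Fragment: the enclosing call is cut off in the source.
    @Override
    public ObservableSource<Integer> apply(Observable<Integer> v) throws Exception {
        // Concatenates the upstream with itself.
        return v.concatWith(v);
    }
})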
Code example source: ReactiveX/RxJava
@Test
public void testConcatWith() {
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.just(1).concatWith(Observable.just(2)).subscribe(to);
    to.assertValues(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void concatObservableDelayErrorTillEnd() {
    // tillEnd = true: the error is held back until every inner source has finished.
    Observable.concatDelayError(
            Observable.just(Observable.just(1), Observable.just(2),
                    Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                    Observable.just(4)), 2, true)
    .test()
    .assertFailure(TestException.class, 1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@Test
public void concatObservableDelayErrorWithError() {
    Observable.concatDelayError(
            Observable.just(Observable.just(1), Observable.just(2),
                    Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                    Observable.just(4)))
    .test()
    .assertFailure(TestException.class, 1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@Test
public void concatObservableDelayErrorBoundary() {
    // tillEnd = false: the error is delayed only until the current inner source ends, so 4 is never emitted.
    Observable.concatDelayError(
            Observable.just(Observable.just(1), Observable.just(2),
                    Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                    Observable.just(4)), 2, false)
    .test()
    .assertFailure(TestException.class, 1, 2, 3);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void concatArrayDelayErrorWithError() {
    Observable.concatArrayDelayError(Observable.just(1), Observable.just(2),
            Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
            Observable.just(4))
    .test()
    .assertFailure(TestException.class, 1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void mergeIterableDelayErrorWithError() {
    Observable.mergeDelayError(
            Arrays.asList(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())),
                    Observable.just(2)))
    .test()
    .assertFailure(TestException.class, 1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void skipLastTimedCustomSchedulerDelayError() {
    // 2 arrives 500 ms after 1, inside the final 300 ms window, so only 1 is emitted.
    Observable.just(1).concatWith(Observable.just(2).delay(500, TimeUnit.MILLISECONDS))
    .skipLast(300, TimeUnit.MILLISECONDS, Schedulers.io(), true)
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void concatMapDelayErrorWithError() {
    Observable.just(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())), Observable.just(2))
    .concatMapDelayError(Functions.<Observable<Integer>>identity())
    .test()
    .assertFailure(TestException.class, 1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void skipLastTimedDefaultScheduler() {
    Observable.just(1).concatWith(Observable.just(2).delay(500, TimeUnit.MILLISECONDS))
    .skipLast(300, TimeUnit.MILLISECONDS)
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void takeLastTimeDelayErrorCustomScheduler() {
    Observable.just(1, 2).concatWith(Observable.<Integer>error(new TestException()))
    .takeLast(1, TimeUnit.MINUTES, Schedulers.io(), true)
    .test()
    .assertFailure(TestException.class, 1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void testConcatOuterBackpressure() {
    assertEquals(1,
            (int) Observable.<Integer>empty()
            .concatWith(Observable.just(1))
            .take(1)
            .blockingSingle());
}
Code example source: ReactiveX/RxJava
@Test
public void normal() {
    final TestObserver<Integer> to = new TestObserver<Integer>();
    // The Single's success value is emitted as the last item.
    Observable.range(1, 5)
    .concatWith(Single.just(100))
    .subscribe(to);
    to.assertResult(1, 2, 3, 4, 5, 100);
}
Code example source: ReactiveX/RxJava
@Test
public void otherError() {
    final TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.range(1, 5)
    .concatWith(Completable.error(new TestException()))
    .subscribe(to);
    to.assertFailure(TestException.class, 1, 2, 3, 4, 5);
}
Code example source: ReactiveX/RxJava
@Test
public void otherError() {
    final TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.range(1, 5)
    .concatWith(Single.<Integer>error(new TestException()))
    .subscribe(to);
    to.assertFailure(TestException.class, 1, 2, 3, 4, 5);
}
Code example source: ReactiveX/RxJava
@Test
public void replayIsUnsubscribed() {
    ConnectableObservable<Integer> co = Observable.just(1).concatWith(Observable.<Integer>never())
            .replay();
    if (co instanceof Disposable) {
        assertTrue(((Disposable)co).isDisposed());
        Disposable connection = co.connect();
        assertFalse(((Disposable)co).isDisposed());
        connection.dispose();
        assertTrue(((Disposable)co).isDisposed());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void cancelOther() {
    SingleSubject<Object> other = SingleSubject.create();
    TestObserver<Object> to = Observable.empty()
            .concatWith(other)
            .test();
    // The empty source completes immediately, so other is already subscribed to.
    assertTrue(other.hasObservers());
    to.cancel();
    assertFalse(other.hasObservers());
}
Code example source: ReactiveX/RxJava
@Test
public void cancelOther() {
    CompletableSubject other = CompletableSubject.create();
    TestObserver<Object> to = Observable.empty()
            .concatWith(other)
            .test();
    assertTrue(other.hasObservers());
    to.cancel();
    assertFalse(other.hasObservers());
}
Code example source: ReactiveX/RxJava
@Test
public void cancelOther() {
    MaybeSubject<Object> other = MaybeSubject.create();
    TestObserver<Object> to = Observable.empty()
            .concatWith(other)
            .test();
    assertTrue(other.hasObservers());
    to.cancel();
    assertFalse(other.hasObservers());
}
Code example source: ReactiveX/RxJava
@Test
public void observers() {
    PublishSubject<Integer> ps = PublishSubject.create();
    ObservableCache<Integer> cache = (ObservableCache<Integer>)Observable.range(1, 5).concatWith(ps).cache();
    assertFalse(cache.hasObservers());
    assertEquals(0, cache.cachedEventCount());
    TestObserver<Integer> to = cache.test();
    assertTrue(cache.hasObservers());
    assertEquals(5, cache.cachedEventCount());
    // ps emits no items; completing it completes the cached sequence.
    ps.onComplete();
    to.assertResult(1, 2, 3, 4, 5);
}