Usage and code examples for the io.reactivex.Observable.concatWith() method

x33g5p2x, reposted on 2022-01-25 under Other

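This article collects code examples for the io.reactivex.Observable.concatWith() method in Java and shows how Observable.concatWith() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.concatWith():
Package: io.reactivex
Class name: Observable
Method name: concatWith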

About Observable.concatWith

Returns an Observable that emits the items from this Observable and, when it completes normally, subscribes to the other CompletableSource and relays that source's terminal event (completion or error).

Scheduler: concatWith does not operate by default on a particular Scheduler.

This description is for the CompletableSource overload; the examples below also pass an Observable, a Single and a Maybe as the other source.
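The ordering rule in the description can be modelled without RxJava. The following is a minimal sketch in plain Java, not RxJava code: `ConcatWithModel`, `Outcome` and this `concatWith` helper are hypothetical names introduced only for illustration. It treats each source as an already finished sequence (its items plus an optional terminal error) and uses a `Supplier` to stand in for "subscribing" to the other source, so that the second source is only touched if the first one completed normally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, dependency-free model of concatWith's ordering rule (not RxJava).
final class ConcatWithModel {

    // A finished sequence: the emitted items plus a terminal error (null = completed normally).
    static final class Outcome<T> {
        final List<T> items;
        final Throwable error;

        Outcome(List<T> items, Throwable error) {
            this.items = items;
            this.error = error;
        }
    }

    // Emits everything from `first`; only if `first` completed normally is `other`
    // "subscribed" (its supplier invoked) and its items and terminal event appended.
    static <T> Outcome<T> concatWith(Outcome<T> first, Supplier<Outcome<T>> other) {
        List<T> emitted = new ArrayList<T>(first.items);
        if (first.error != null) {
            // The first source failed: the error is relayed and `other` is never used.
            return new Outcome<T>(emitted, first.error);
        }
        Outcome<T> second = other.get();
        emitted.addAll(second.items);
        return new Outcome<T>(emitted, second.error);
    }

    public static void main(String[] args) {
        // Five items followed by a Completable-like source (no items) that fails.
        Outcome<Integer> source = new Outcome<Integer>(Arrays.asList(1, 2, 3, 4, 5), null);
        Outcome<Integer> result = concatWith(source,
                () -> new Outcome<Integer>(new ArrayList<Integer>(), new IllegalStateException("boom")));
        System.out.println("items=" + result.items + ", error=" + result.error);
    }
}
```

In this model a Completable-like source is simply an `Outcome` with no items, which is why only its terminal event shows up in the result. Real RxJava sources are asynchronous and disposable, which this sketch deliberately ignores.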

Code examples

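Code example source: origin: ReactiveX/RxJava

    // Fragment of an anonymous Function; the enclosing call is not shown in the source.
    @Override
    public ObservableSource<Integer> apply(Observable<Integer> v) throws Exception {
        // Subscribe to v again once its first run completes normally.
        return v.concatWith(v);
    }
    })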

Code example source: origin: ReactiveX/RxJava

    @Test
    public void testConcatWith() {
        TestObserver<Integer> to = new TestObserver<Integer>();
        // The second Observable's items follow the first one's.
        Observable.just(1).concatWith(Observable.just(2)).subscribe(to);
        to.assertValues(1, 2);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void concatObservableDelayErrorTillEnd() {
        // tillTheEnd = true: the error from the third source is held back until all sources finish.
        Observable.concatDelayError(
                Observable.just(Observable.just(1), Observable.just(2),
                        Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                        Observable.just(4)), 2, true)
                .test()
                .assertFailure(TestException.class, 1, 2, 3, 4);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void concatObservableDelayErrorWithError() {
        // The failing third source still lets the fourth one emit before the error is relayed.
        Observable.concatDelayError(
                Observable.just(Observable.just(1), Observable.just(2),
                        Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                        Observable.just(4)))
                .test()
                .assertFailure(TestException.class, 1, 2, 3, 4);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void concatObservableDelayErrorBoundary() {
        // tillTheEnd = false: the error is relayed when the failing inner source ends, so 4 is never emitted.
        Observable.concatDelayError(
                Observable.just(Observable.just(1), Observable.just(2),
                        Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                        Observable.just(4)), 2, false)
                .test()
                .assertFailure(TestException.class, 1, 2, 3);
    }

Code example source: origin: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void concatArrayDelayErrorWithError() {
        // Varargs variant: the error is delayed until every source has run.
        Observable.concatArrayDelayError(Observable.just(1), Observable.just(2),
                Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                Observable.just(4))
                .test()
                .assertFailure(TestException.class, 1, 2, 3, 4);
    }

Code example source: origin: ReactiveX/RxJava

    @SuppressWarnings("unchecked")
    @Test
    public void mergeIterableDelayErrorWithError() {
        // concatWith builds a source that emits 1 and then fails; mergeDelayError defers that failure.
        Observable.mergeDelayError(
                Arrays.asList(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())),
                        Observable.just(2)))
                .test()
                .assertFailure(TestException.class, 1, 2);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void skipLastTimedCustomSchedulerDelayError() {
        // 2 arrives 500 ms after 1, so it falls inside the final 300 ms window and is skipped.
        Observable.just(1).concatWith(Observable.just(2).delay(500, TimeUnit.MILLISECONDS))
                .skipLast(300, TimeUnit.MILLISECONDS, Schedulers.io(), true)
                .test()
                .awaitDone(5, TimeUnit.SECONDS)
                .assertResult(1);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void concatMapDelayErrorWithError() {
        // The first inner source emits 1 and fails; the error is delayed until the second has emitted 2.
        Observable.just(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())), Observable.just(2))
                .concatMapDelayError(Functions.<Observable<Integer>>identity())
                .test()
                .assertFailure(TestException.class, 1, 2);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void skipLastTimedDefaultScheduler() {
        // Same scenario as above, using the overload without an explicit Scheduler.
        Observable.just(1).concatWith(Observable.just(2).delay(500, TimeUnit.MILLISECONDS))
                .skipLast(300, TimeUnit.MILLISECONDS)
                .test()
                .awaitDone(5, TimeUnit.SECONDS)
                .assertResult(1);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void takeLastTimeDelayErrorCustomScheduler() {
        // delayError = true: the buffered items 1 and 2 are emitted before the error.
        Observable.just(1, 2).concatWith(Observable.<Integer>error(new TestException()))
                .takeLast(1, TimeUnit.MINUTES, Schedulers.io(), true)
                .test()
                .assertFailure(TestException.class, 1, 2);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void testConcatOuterBackpressure() {
        // An empty first source completes immediately, so the single item comes from the second.
        assertEquals(1,
                (int) Observable.<Integer>empty()
                        .concatWith(Observable.just(1))
                        .take(1)
                        .blockingSingle());
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void normal() {
        final TestObserver<Integer> to = new TestObserver<Integer>();
        // Single overload: the Single's success value is appended after the range.
        Observable.range(1, 5)
                .concatWith(Single.just(100))
                .subscribe(to);
        to.assertResult(1, 2, 3, 4, 5, 100);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void otherError() {
        final TestObserver<Integer> to = new TestObserver<Integer>();
        // Completable overload: all five items arrive, then the Completable's error.
        Observable.range(1, 5)
                .concatWith(Completable.error(new TestException()))
                .subscribe(to);
        to.assertFailure(TestException.class, 1, 2, 3, 4, 5);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void otherError() {
        final TestObserver<Integer> to = new TestObserver<Integer>();
        // Single overload: all five items arrive, then the Single's error.
        Observable.range(1, 5)
                .concatWith(Single.<Integer>error(new TestException()))
                .subscribe(to);
        to.assertFailure(TestException.class, 1, 2, 3, 4, 5);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void replayIsUnsubscribed() {
        // concatWith(never()) keeps the source from completing after emitting 1.
        ConnectableObservable<Integer> co = Observable.just(1).concatWith(Observable.<Integer>never())
                .replay();
        // The checks only apply if the returned ConnectableObservable also implements Disposable.
        if (co instanceof Disposable) {
            assertTrue(((Disposable)co).isDisposed());
            Disposable connection = co.connect();
            assertFalse(((Disposable)co).isDisposed());
            connection.dispose();
            assertTrue(((Disposable)co).isDisposed());
        }
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void cancelOther() {
        SingleSubject<Object> other = SingleSubject.create();
        // The empty source completes at once, so concatWith subscribes to the SingleSubject.
        TestObserver<Object> to = Observable.empty()
                .concatWith(other)
                .test();
        assertTrue(other.hasObservers());
        // Cancelling downstream must dispose the subscription to the other source.
        to.cancel();
        assertFalse(other.hasObservers());
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void cancelOther() {
        CompletableSubject other = CompletableSubject.create();
        // The empty source completes at once, so concatWith subscribes to the CompletableSubject.
        TestObserver<Object> to = Observable.empty()
                .concatWith(other)
                .test();
        assertTrue(other.hasObservers());
        // Cancelling downstream must dispose the subscription to the other source.
        to.cancel();
        assertFalse(other.hasObservers());
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void cancelOther() {
        MaybeSubject<Object> other = MaybeSubject.create();
        // The empty source completes at once, so concatWith subscribes to the MaybeSubject.
        TestObserver<Object> to = Observable.empty()
                .concatWith(other)
                .test();
        assertTrue(other.hasObservers());
        // Cancelling downstream must dispose the subscription to the other source.
        to.cancel();
        assertFalse(other.hasObservers());
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void observers() {
        PublishSubject<Integer> ps = PublishSubject.create();
        // The trailing PublishSubject keeps the cached sequence open after the five range items.
        ObservableCache<Integer> cache = (ObservableCache<Integer>)Observable.range(1, 5).concatWith(ps).cache();
        assertFalse(cache.hasObservers());
        assertEquals(0, cache.cachedEventCount());
        TestObserver<Integer> to = cache.test();
        assertTrue(cache.hasObservers());
        assertEquals(5, cache.cachedEventCount());
        // Completing the subject completes the concatenated sequence.
        ps.onComplete();
        to.assertResult(1, 2, 3, 4, 5);
    }
