io.reactivex.Observable.test()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(5.7k)|赞(0)|评价(0)|浏览(141)

本文整理了Java中io.reactivex.Observable.test()方法的一些代码示例,展示了Observable.test()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Observable.test()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:test

Observable.test介绍

[英]Creates a TestObserver and subscribes it to this Observable. Scheduler: test does not operate by default on a particular Scheduler.
[中]创建一个TestObserver并将其订阅到此Observable。调度器(Scheduler):test()方法默认不在任何特定的Scheduler上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void concat3() {
  3. Observable.concat(Observable.just(1), Observable.just(2), Observable.just(3))
  4. .test()
  5. .assertResult(1, 2, 3);
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void concatObservableDelayErrorTillEnd() {
  3. Observable.concatDelayError(
  4. Observable.just(Observable.just(1), Observable.just(2),
  5. Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
  6. Observable.just(4)), 2, true)
  7. .test()
  8. .assertFailure(TestException.class, 1, 2, 3, 4);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void ambIterableOrder() {
  4. Observable<Integer> error = Observable.error(new RuntimeException());
  5. Observable.amb(Arrays.asList(Observable.just(1), error)).test().assertValue(1).assertComplete();
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mapperThrowsDelayError() {
  3. Observable.just(1).hide()
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. throw new TestException();
  8. }
  9. })
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void bufferTimeSkipDefault() {
  4. Observable.range(1, 5).buffer(1, 1, TimeUnit.MINUTES)
  5. .test()
  6. .assertResult(Arrays.asList(1, 2, 3, 4, 5));
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void restartTimer() {
  4. Observable.range(1, 5)
  5. .buffer(1, TimeUnit.DAYS, Schedulers.single(), 2, Functions.<Integer>createArrayList(16), true)
  6. .test()
  7. .assertResult(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void bufferTimedExactEmpty() {
  4. Observable.empty()
  5. .buffer(1, TimeUnit.DAYS)
  6. .test()
  7. .assertResult(Collections.emptyList());
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void take() {
  3. Observable<Integer> cache = Observable.range(1, 5).cache();
  4. cache.take(2).test().assertResult(1, 2);
  5. cache.take(3).test().assertResult(1, 2, 3);
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void firstOrErrorMultipleElementsObservable() {
  3. Observable.just(1, 2, 3)
  4. .firstOrError()
  5. .toObservable()
  6. .test()
  7. .assertNoErrors()
  8. .assertValue(1);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void bufferBoundaryHint() {
  4. Observable.range(1, 5).buffer(Observable.timer(1, TimeUnit.MINUTES), 2)
  5. .test()
  6. .assertResult(Arrays.asList(1, 2, 3, 4, 5));
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void normalDelayBoundary() {
  3. Observable.range(1, 5)
  4. .concatMapEagerDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer t) {
  7. return Observable.range(t, 2);
  8. }
  9. }, false)
  10. .test()
  11. .assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void nextCrash() {
  3. Maybe.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
  4. @Override
  5. public Iterable<Integer> apply(Integer v) throws Exception {
  6. return new CrashingIterable(100, 100, 1);
  7. }
  8. })
  9. .test()
  10. .assertFailureAndMessage(TestException.class, "next()");
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void innerError() {
  3. Observable.just(1)
  4. .switchMapMaybe(Functions.justFunction(Maybe.error(new TestException())))
  5. .test()
  6. .assertFailure(TestException.class);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void elementAtIndex1WithDefaultOnEmptySourceObservable() {
  3. Observable.empty()
  4. .elementAt(1, 10)
  5. .toObservable()
  6. .test()
  7. .assertResult(10);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mainErrorDelayed() {
  3. Observable.<Integer>error(new TestException())
  4. .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
  5. @Override
  6. public ObservableSource<Integer> apply(Integer v) throws Exception {
  7. return Observable.range(v, 2);
  8. }
  9. })
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void timedCancelledUpfront() {
  3. TestScheduler sch = new TestScheduler();
  4. TestObserver<List<Object>> to = Observable.never()
  5. .buffer(1, TimeUnit.MILLISECONDS, sch)
  6. .test(true);
  7. sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
  8. to.assertEmpty();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void mainError() {
  3. Observable.error(new TestException())
  4. .concatMapSingle(Functions.justFunction(Single.just(1)))
  5. .test()
  6. .assertFailure(TestException.class);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @SuppressWarnings("unchecked")
  3. public void openClosemainError() {
  4. Observable.error(new TestException())
  5. .buffer(Observable.never(), Functions.justFunction(Observable.never()))
  6. .test()
  7. .assertFailure(TestException.class);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void cancelMain() {
  3. SingleSubject<Integer> ss = SingleSubject.create();
  4. PublishSubject<Integer> ps = PublishSubject.create();
  5. TestObserver<Integer> to = ss.flatMapObservable(Functions.justFunction(ps))
  6. .test();
  7. assertTrue(ss.hasObservers());
  8. assertFalse(ps.hasObservers());
  9. to.cancel();
  10. assertFalse(ss.hasObservers());
  11. assertFalse(ps.hasObservers());
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void cancelMain() {
  3. MaybeSubject<Integer> ms = MaybeSubject.create();
  4. PublishSubject<Integer> ps = PublishSubject.create();
  5. TestObserver<Integer> to = ms.flatMapObservable(Functions.justFunction(ps))
  6. .test();
  7. assertTrue(ms.hasObservers());
  8. assertFalse(ps.hasObservers());
  9. to.cancel();
  10. assertFalse(ms.hasObservers());
  11. assertFalse(ps.hasObservers());
  12. }

相关文章

Observable类方法