io.reactivex.Observable.materialize()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(131)

本文整理了Java中io.reactivex.Observable.materialize()方法的一些代码示例,展示了Observable.materialize()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Observable.materialize()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:materialize

Observable.materialize介绍

[英]Returns an Observable that represents all of the emissions and notifications from the source ObservableSource into emissions marked with their original types within Notification objects.

Scheduler: materialize does not operate by default on a particular Scheduler.
[中]返回一个Observable,它将源ObservableSource的所有发射项和通知都转换为发射项,并在Notification对象中标明它们原本的类型。
调度程序:默认情况下,materialize不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

// Maps the incoming Observable to its materialized form, i.e. a stream of Notification objects.
@Override
  public ObservableSource<Notification<Object>> apply(Observable<Object> o) throws Exception {
    ObservableSource<Notification<Object>> materialized = o.materialize();
    return materialized;
  }
});

代码示例来源:origin: ReactiveX/RxJava

/**
 * Creates a new {@code BlockingObservableLatestIterator}, subscribes it to the
 * materialized form of {@code source}, and returns it.
 *
 * @return the iterator that was subscribed to the source's notifications
 */
@Override
public Iterator<T> iterator() {
  BlockingObservableLatestIterator<T> latestIterator = new BlockingObservableLatestIterator<T>();
  // Materialize first so the iterator receives Notification objects rather than raw signals.
  Observable.wrap(source).materialize().subscribe(latestIterator);
  return latestIterator;
}

代码示例来源:origin: redisson/redisson

/**
 * Builds the iterator over the source: the source is wrapped as an Observable,
 * materialized into Notification objects, and the new iterator is subscribed to it.
 *
 * @return the subscribed {@code BlockingObservableLatestIterator}
 */
@Override
public Iterator<T> iterator() {
  BlockingObservableLatestIterator<T> result = new BlockingObservableLatestIterator<T>();
  Observable<T> wrapped = Observable.wrap(source);
  Observable<Notification<T>> notifications = wrapped.materialize();
  notifications.subscribe(result);
  return result;
}

代码示例来源:origin: ReactiveX/RxJava

/** Runs the shared disposal check against a materialized single-item Observable. */
@Test
public void dispose() {
  Observable<?> materialized = Observable.just(1).materialize();
  TestHelper.checkDisposed(materialized);
}

代码示例来源:origin: ReactiveX/RxJava

/** Materializes two values, dematerializes them again, and verifies the original signals arrive. */
@Test
public void testDematerialize1() {
  Observable<Notification<Integer>> materialized = Observable.just(1, 2).materialize();
  Observable<Integer> restored = materialized.dematerialize();

  Observer<Integer> mock = TestHelper.mockObserver();
  restored.subscribe(mock);

  // verify(mock) without a mode checks for exactly one invocation.
  verify(mock).onNext(1);
  verify(mock).onNext(2);
  verify(mock).onComplete();
  verify(mock, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

/** Verifies that materialize followed by dematerialize (identity selector) yields the original item and completion. */
@Test
public void testMaterializeDematerializeChaining() {
  Observable<Integer> single = Observable.just(1);
  Observable<Notification<Integer>> materialized = single.materialize();
  Observable<Integer> roundTrip =
      materialized.dematerialize(Functions.<Notification<Integer>>identity());

  Observer<Integer> mock = TestHelper.mockObserver();
  roundTrip.subscribe(mock);

  // verify(mock) without a mode checks for exactly one invocation.
  verify(mock).onNext(1);
  verify(mock).onComplete();
  verify(mock, times(0)).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

/** Verifies that a completed source round-tripped through materialize/dematerialize emits its item, completes, and never errors. */
@Test
public void testHonorsContractWhenCompleted() {
  Observable<Integer> single = Observable.just(1);
  Observable<Notification<Integer>> materialized = single.materialize();
  Observable<Integer> roundTrip = materialized.dematerialize();

  Observer<Integer> mock = TestHelper.mockObserver();
  roundTrip.subscribe(mock);

  verify(mock).onNext(1);
  verify(mock).onComplete();
  verify(mock, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

/** Verifies that an {@code Exception} survives a materialize/dematerialize round trip as the only signal. */
@Test
public void testDematerialize3() {
  Exception failure = new Exception("test");
  Observable<Integer> failing = Observable.error(failure);
  Observable<Notification<Integer>> materialized = failing.materialize();
  Observable<Integer> restored = materialized.dematerialize();

  Observer<Integer> mock = TestHelper.mockObserver();
  restored.subscribe(mock);

  // verify(mock) without a mode checks for exactly one invocation.
  verify(mock).onError(failure);
  verify(mock, times(0)).onComplete();
  verify(mock, times(0)).onNext(any(Integer.class));
}

代码示例来源:origin: ReactiveX/RxJava

/** Verifies dematerialize with an identity selector replays both items and the completion. */
@Test
public void simpleSelector() {
  Observable<Notification<Integer>> materialized = Observable.just(1, 2).materialize();
  Observable<Integer> restored =
      materialized.dematerialize(Functions.<Notification<Integer>>identity());

  Observer<Integer> mock = TestHelper.mockObserver();
  restored.subscribe(mock);

  // verify(mock) without a mode checks for exactly one invocation.
  verify(mock).onNext(1);
  verify(mock).onNext(2);
  verify(mock).onComplete();
  verify(mock, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

/** Verifies that a plain {@code Throwable} survives a materialize/dematerialize round trip as the only signal. */
@Test
public void testDematerialize2() {
  Throwable failure = new Throwable("test");
  Observable<Integer> failing = Observable.error(failure);
  Observable<Notification<Integer>> materialized = failing.materialize();
  Observable<Integer> restored = materialized.dematerialize();

  Observer<Integer> mock = TestHelper.mockObserver();
  restored.subscribe(mock);

  // verify(mock) without a mode checks for exactly one invocation.
  verify(mock).onError(failure);
  verify(mock, times(0)).onComplete();
  verify(mock, times(0)).onNext(any(Integer.class));
}

代码示例来源:origin: ReactiveX/RxJava

/** Verifies that a failing source round-tripped through materialize/dematerialize only signals the error. */
@Test
public void testHonorsContractWhenThrows() {
  Observable<Integer> failing = Observable.error(new TestException());
  Observable<Notification<Integer>> materialized = failing.materialize();
  Observable<Integer> roundTrip = materialized.dematerialize();

  Observer<Integer> mock = TestHelper.mockObserver();
  roundTrip.subscribe(mock);

  verify(mock, never()).onNext(any(Integer.class));
  verify(mock, never()).onComplete();
  verify(mock).onError(any(TestException.class));
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Zips two materialized sequences and checks that each pair of notifications,
 * including the terminal completion pair, is rendered as expected.
 */
@Test
public void testEmitMaterializedNotifications() {
  Observable<Notification<Integer>> numbers = Observable.just(1, 2, 3).materialize();
  Observable<Notification<String>> letters = Observable.just("a", "b", "c").materialize();

  // Renders a pair of notifications as "<kind>_<value>-<kind>_<value>".
  BiFunction<Notification<Integer>, Notification<String>, String> describePair =
      new BiFunction<Notification<Integer>, Notification<String>, String>() {
        @Override
        public String apply(Notification<Integer> left, Notification<String> right) {
          String leftPart = kind(left) + "_" + value(left);
          String rightPart = kind(right) + "_" + value(right);
          return leftPart + "-" + rightPart;
        }
      };
  Observable<String> zipped = Observable.zip(numbers, letters, describePair);

  final ArrayList<String> received = new ArrayList<String>();
  zipped.subscribe(new Consumer<String>() {
    @Override
    public void accept(String line) {
      System.out.println(line);
      received.add(line);
    }
  });

  assertEquals(4, received.size());
  assertEquals("OnNext_1-OnNext_a", received.get(0));
  assertEquals("OnNext_2-OnNext_b", received.get(1));
  assertEquals("OnNext_3-OnNext_c", received.get(2));
  assertEquals("OnComplete_null-OnComplete_null", received.get(3));
}

代码示例来源:origin: ReactiveX/RxJava

/** Verifies that a dematerialize selector returning {@code null} fails the sequence with a NullPointerException. */
@Test
public void selectorNull() {
  Function<Notification<Integer>, Notification<Object>> nullSelector =
      new Function<Notification<Integer>, Notification<Object>>() {
        @Override
        public Notification<Object> apply(Notification<Integer> v) throws Exception {
          return null;
        }
      };

  Observable.just(1, 2)
      .materialize()
      .dematerialize(nullSelector)
      .test()
      .assertFailure(NullPointerException.class);
}

代码示例来源:origin: ReactiveX/RxJava

/** Subscribes twice to the same materialized source and expects three notifications each time. */
@Test
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
  final TestAsyncErrorObservable source = new TestAsyncErrorObservable("one", "two", null, "three");
  Observable<Notification<String>> materialized = Observable.unsafeCreate(source).materialize();

  int firstCount = materialized.toList().toFuture().get().size();
  assertEquals(3, firstCount);

  int secondCount = materialized.toList().toFuture().get().size();
  assertEquals(3, secondCount);
}

代码示例来源:origin: ReactiveX/RxJava

/** Verifies that an exception thrown by the dematerialize selector is delivered as the sequence failure. */
@Test
public void selectorCrash() {
  Function<Notification<Integer>, Notification<Object>> crashingSelector =
      new Function<Notification<Integer>, Notification<Object>>() {
        @Override
        public Notification<Object> apply(Notification<Integer> v) throws Exception {
          throw new TestException();
        }
      };

  Observable.just(1, 2)
      .materialize()
      .dematerialize(crashingSelector)
      .test()
      .assertFailure(TestException.class);
}

代码示例来源:origin: TeamNewPipe/NewPipe

return local.materialize();
  }).materialize();
})
.subscribeOn(Schedulers.io())

代码示例来源:origin: ReactiveX/RxJava

/**
 * Feeds an error through materialize, a one-second delay, and dematerialize into a
 * replay subject, then blocks until the subject's first materialized notification arrives.
 */
@Test
public void testErrorThrownIssue1685() {
  Subject<Object> replay = ReplaySubject.create();

  Observable<Notification<Object>> delayedSignals =
      Observable.<Object>error(new RuntimeException("oops"))
          .materialize()
          .delay(1, TimeUnit.SECONDS);
  delayedSignals
      .dematerialize(Functions.<Notification<Object>>identity())
      .subscribe(replay);

  replay.subscribe();
  replay.materialize().blockingFirst();
  System.out.println("Done");
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Materializes an empty source and throws from doOnNext, then checks that the
 * observer ends with that exact error and no values.
 */
@Test
public void testWithCompletionCausingError() {
  TestObserver<Notification<Integer>> observer = new TestObserver<Notification<Integer>>();
  final RuntimeException failure = new RuntimeException("boo");

  // Throws for whatever item reaches doOnNext.
  Consumer<Object> alwaysThrow = new Consumer<Object>() {
    @Override
    public void accept(Object t) {
      throw failure;
    }
  };

  Observable.<Integer>empty()
      .materialize()
      .doOnNext(alwaysThrow)
      .subscribe(observer);

  observer.assertError(failure);
  observer.assertNoValues();
  observer.assertTerminated();
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Materializes a source whose third element is {@code null} and checks that the observer
 * sees two onNext notifications followed by an onError notification carrying a
 * NullPointerException, while the materialized stream itself completes normally.
 */
@Test
public void testMaterialize1() {
  // null will cause onError to be triggered before "three" can be
  // returned
  final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null,
      "three");
  TestLocalObserver observer = new TestLocalObserver();
  Observable<Notification<String>> m = Observable.unsafeCreate(o1).materialize();
  m.subscribe(observer);
  try {
    // Wait for o1.t (presumably the source's emitting thread) to finish before asserting.
    o1.t.join();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
  // The source's error is delivered as a notification, not as an onError of the materialized stream.
  assertFalse(observer.onError);
  assertTrue(observer.onComplete);
  assertEquals(3, observer.notifications.size());
  assertTrue(observer.notifications.get(0).isOnNext());
  assertEquals("one", observer.notifications.get(0).getValue());
  assertTrue(observer.notifications.get(1).isOnNext());
  assertEquals("two", observer.notifications.get(1).getValue());
  assertTrue(observer.notifications.get(2).isOnError());
  assertEquals(NullPointerException.class, observer.notifications.get(2).getError().getClass());
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Materializes a three-element source and checks that the observer sees three onNext
 * notifications followed by an onComplete notification.
 */
@Test
public void testMaterialize2() {
  final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", "three");
  TestLocalObserver observer = new TestLocalObserver();
  Observable<Notification<String>> m = Observable.unsafeCreate(o1).materialize();
  m.subscribe(observer);
  try {
    // Wait for o1.t (presumably the source's emitting thread) to finish before asserting.
    o1.t.join();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
  assertFalse(observer.onError);
  assertTrue(observer.onComplete);
  // Three values plus the terminal completion notification.
  assertEquals(4, observer.notifications.size());
  assertTrue(observer.notifications.get(0).isOnNext());
  assertEquals("one", observer.notifications.get(0).getValue());
  assertTrue(observer.notifications.get(1).isOnNext());
  assertEquals("two", observer.notifications.get(1).getValue());
  assertTrue(observer.notifications.get(2).isOnNext());
  assertEquals("three", observer.notifications.get(2).getValue());
  assertTrue(observer.notifications.get(3).isOnComplete());
}

相关文章

Observable类方法