io.reactivex.Observable.materialize()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(155)

本文整理了Java中io.reactivex.Observable.materialize()方法的一些代码示例,展示了Observable.materialize()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Observable.materialize()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:materialize

Observable.materialize介绍

[英]Returns an Observable that represents all of the emissions and notifications from the source ObservableSource into emissions marked with their original types within Notification objects.

Scheduler: materialize does not operate by default on a particular Scheduler.
[中]返回一个Observable,它将源ObservableSource的所有数据项和通知都转换为数据项发射出去,并封装在Notification对象中以标明其原始类型。
调度程序:默认情况下,materialize不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<Notification<Object>> apply(Observable<Object> o) throws Exception {
  3. return o.materialize();
  4. }
  5. });

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Iterator<T> iterator() {
  3. BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();
  4. Observable<Notification<T>> materialized = Observable.wrap(source).materialize();
  5. materialized.subscribe(lio);
  6. return lio;
  7. }

代码示例来源:origin: redisson/redisson

  1. @Override
  2. public Iterator<T> iterator() {
  3. BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();
  4. Observable<Notification<T>> materialized = Observable.wrap(source).materialize();
  5. materialized.subscribe(lio);
  6. return lio;
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void dispose() {
  3. TestHelper.checkDisposed(Observable.just(1).materialize());
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDematerialize1() {
  3. Observable<Notification<Integer>> notifications = Observable.just(1, 2).materialize();
  4. Observable<Integer> dematerialize = notifications.dematerialize();
  5. Observer<Integer> observer = TestHelper.mockObserver();
  6. dematerialize.subscribe(observer);
  7. verify(observer, times(1)).onNext(1);
  8. verify(observer, times(1)).onNext(2);
  9. verify(observer, times(1)).onComplete();
  10. verify(observer, never()).onError(any(Throwable.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMaterializeDematerializeChaining() {
  3. Observable<Integer> obs = Observable.just(1);
  4. Observable<Integer> chained = obs.materialize()
  5. .dematerialize(Functions.<Notification<Integer>>identity());
  6. Observer<Integer> observer = TestHelper.mockObserver();
  7. chained.subscribe(observer);
  8. verify(observer, times(1)).onNext(1);
  9. verify(observer, times(1)).onComplete();
  10. verify(observer, times(0)).onError(any(Throwable.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testHonorsContractWhenCompleted() {
  3. Observable<Integer> source = Observable.just(1);
  4. Observable<Integer> result = source.materialize().dematerialize();
  5. Observer<Integer> o = TestHelper.mockObserver();
  6. result.subscribe(o);
  7. verify(o).onNext(1);
  8. verify(o).onComplete();
  9. verify(o, never()).onError(any(Throwable.class));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDematerialize3() {
  3. Exception exception = new Exception("test");
  4. Observable<Integer> o = Observable.error(exception);
  5. Observable<Integer> dematerialize = o.materialize().dematerialize();
  6. Observer<Integer> observer = TestHelper.mockObserver();
  7. dematerialize.subscribe(observer);
  8. verify(observer, times(1)).onError(exception);
  9. verify(observer, times(0)).onComplete();
  10. verify(observer, times(0)).onNext(any(Integer.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void simpleSelector() {
  3. Observable<Notification<Integer>> notifications = Observable.just(1, 2).materialize();
  4. Observable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
  5. Observer<Integer> observer = TestHelper.mockObserver();
  6. dematerialize.subscribe(observer);
  7. verify(observer, times(1)).onNext(1);
  8. verify(observer, times(1)).onNext(2);
  9. verify(observer, times(1)).onComplete();
  10. verify(observer, never()).onError(any(Throwable.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDematerialize2() {
  3. Throwable exception = new Throwable("test");
  4. Observable<Integer> o = Observable.error(exception);
  5. Observable<Integer> dematerialize = o.materialize().dematerialize();
  6. Observer<Integer> observer = TestHelper.mockObserver();
  7. dematerialize.subscribe(observer);
  8. verify(observer, times(1)).onError(exception);
  9. verify(observer, times(0)).onComplete();
  10. verify(observer, times(0)).onNext(any(Integer.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testHonorsContractWhenThrows() {
  3. Observable<Integer> source = Observable.error(new TestException());
  4. Observable<Integer> result = source.materialize().dematerialize();
  5. Observer<Integer> o = TestHelper.mockObserver();
  6. result.subscribe(o);
  7. verify(o, never()).onNext(any(Integer.class));
  8. verify(o, never()).onComplete();
  9. verify(o).onError(any(TestException.class));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testEmitMaterializedNotifications() {
  3. Observable<Notification<Integer>> oi = Observable.just(1, 2, 3).materialize();
  4. Observable<Notification<String>> os = Observable.just("a", "b", "c").materialize();
  5. Observable<String> o = Observable.zip(oi, os, new BiFunction<Notification<Integer>, Notification<String>, String>() {
  6. @Override
  7. public String apply(Notification<Integer> t1, Notification<String> t2) {
  8. return kind(t1) + "_" + value(t1) + "-" + kind(t2) + "_" + value(t2);
  9. }
  10. });
  11. final ArrayList<String> list = new ArrayList<String>();
  12. o.subscribe(new Consumer<String>() {
  13. @Override
  14. public void accept(String s) {
  15. System.out.println(s);
  16. list.add(s);
  17. }
  18. });
  19. assertEquals(4, list.size());
  20. assertEquals("OnNext_1-OnNext_a", list.get(0));
  21. assertEquals("OnNext_2-OnNext_b", list.get(1));
  22. assertEquals("OnNext_3-OnNext_c", list.get(2));
  23. assertEquals("OnComplete_null-OnComplete_null", list.get(3));
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void selectorNull() {
  3. Observable.just(1, 2)
  4. .materialize()
  5. .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
  6. @Override
  7. public Notification<Object> apply(Notification<Integer> v) throws Exception {
  8. return null;
  9. }
  10. })
  11. .test()
  12. .assertFailure(NullPointerException.class);
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
  3. final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
  4. Observable<Notification<String>> m = Observable.unsafeCreate(o).materialize();
  5. assertEquals(3, m.toList().toFuture().get().size());
  6. assertEquals(3, m.toList().toFuture().get().size());
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void selectorCrash() {
  3. Observable.just(1, 2)
  4. .materialize()
  5. .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
  6. @Override
  7. public Notification<Object> apply(Notification<Integer> v) throws Exception {
  8. throw new TestException();
  9. }
  10. })
  11. .test()
  12. .assertFailure(TestException.class);
  13. }

代码示例来源:origin: TeamNewPipe/NewPipe

  1. return local.materialize();
  2. }).materialize();
  3. })
  4. .subscribeOn(Schedulers.io())

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testErrorThrownIssue1685() {
  3. Subject<Object> subject = ReplaySubject.create();
  4. Observable.error(new RuntimeException("oops"))
  5. .materialize()
  6. .delay(1, TimeUnit.SECONDS)
  7. .dematerialize(Functions.<Notification<Object>>identity())
  8. .subscribe(subject);
  9. subject.subscribe();
  10. subject.materialize().blockingFirst();
  11. System.out.println("Done");
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithCompletionCausingError() {
  3. TestObserver<Notification<Integer>> to = new TestObserver<Notification<Integer>>();
  4. final RuntimeException ex = new RuntimeException("boo");
  5. Observable.<Integer>empty().materialize().doOnNext(new Consumer<Object>() {
  6. @Override
  7. public void accept(Object t) {
  8. throw ex;
  9. }
  10. }).subscribe(to);
  11. to.assertError(ex);
  12. to.assertNoValues();
  13. to.assertTerminated();
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMaterialize1() {
  3. // null will cause onError to be triggered before "three" can be
  4. // returned
  5. final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null,
  6. "three");
  7. TestLocalObserver observer = new TestLocalObserver();
  8. Observable<Notification<String>> m = Observable.unsafeCreate(o1).materialize();
  9. m.subscribe(observer);
  10. try {
  11. o1.t.join();
  12. } catch (InterruptedException e) {
  13. throw new RuntimeException(e);
  14. }
  15. assertFalse(observer.onError);
  16. assertTrue(observer.onComplete);
  17. assertEquals(3, observer.notifications.size());
  18. assertTrue(observer.notifications.get(0).isOnNext());
  19. assertEquals("one", observer.notifications.get(0).getValue());
  20. assertTrue(observer.notifications.get(1).isOnNext());
  21. assertEquals("two", observer.notifications.get(1).getValue());
  22. assertTrue(observer.notifications.get(2).isOnError());
  23. assertEquals(NullPointerException.class, observer.notifications.get(2).getError().getClass());
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMaterialize2() {
  3. final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", "three");
  4. TestLocalObserver observer = new TestLocalObserver();
  5. Observable<Notification<String>> m = Observable.unsafeCreate(o1).materialize();
  6. m.subscribe(observer);
  7. try {
  8. o1.t.join();
  9. } catch (InterruptedException e) {
  10. throw new RuntimeException(e);
  11. }
  12. assertFalse(observer.onError);
  13. assertTrue(observer.onComplete);
  14. assertEquals(4, observer.notifications.size());
  15. assertTrue(observer.notifications.get(0).isOnNext());
  16. assertEquals("one", observer.notifications.get(0).getValue());
  17. assertTrue(observer.notifications.get(1).isOnNext());
  18. assertEquals("two", observer.notifications.get(1).getValue());
  19. assertTrue(observer.notifications.get(2).isOnNext());
  20. assertEquals("three", observer.notifications.get(2).getValue());
  21. assertTrue(observer.notifications.get(3).isOnComplete());
  22. }

相关文章

Observable类方法