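This article collects code examples for the Java method io.reactivex.Observable.materialize() and shows how Observable.materialize() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they can serve as a practical reference. Details of Observable.materialize() are as follows: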
Package: io.reactivex
Class name: Observable
Method name: materialize
Returns an Observable that converts every emission and notification from the source ObservableSource into an emission wrapped in a Notification object that records its original type (onNext, onError, or onComplete). The terminal event of the source becomes the last Notification, after which the returned Observable itself completes normally.
Scheduler: materialize does not operate by default on a particular Scheduler.
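The description above can be illustrated without RxJava on the classpath. The following is a minimal sketch, not RxJava's implementation: `MaterializeSketch`, `Signal`, and the list-based `materialize` helper are hypothetical names introduced only for illustration, with `Signal` standing in for `io.reactivex.Notification`. It assumes the source can be modelled as a list of items plus an optional failure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for materialize(); NOT RxJava's implementation.
class MaterializeSketch {

    // Hypothetical, minimal counterpart of io.reactivex.Notification.
    static final class Signal<T> {
        final String kind;      // "OnNext", "OnError" or "OnComplete"
        final T value;          // set only for OnNext
        final Throwable error;  // set only for OnError

        Signal(String kind, T value, Throwable error) {
            this.kind = kind;
            this.value = value;
            this.error = error;
        }

        @Override
        public String toString() {
            if (kind.equals("OnNext")) return "OnNext(" + value + ")";
            if (kind.equals("OnError")) return "OnError(" + error.getMessage() + ")";
            return "OnComplete";
        }
    }

    // Wraps each item in an OnNext signal, then appends exactly one terminal
    // signal: OnError if the source failed (failure != null), else OnComplete.
    static <T> List<Signal<T>> materialize(List<T> items, Throwable failure) {
        List<Signal<T>> out = new ArrayList<Signal<T>>();
        for (T item : items) {
            out.add(new Signal<T>("OnNext", item, null));
        }
        out.add(failure != null
                ? new Signal<T>("OnError", null, failure)
                : new Signal<T>("OnComplete", null, null));
        return out;
    }

    public static void main(String[] args) {
        // A source that emits 1, 2 and completes normally.
        System.out.println(materialize(Arrays.asList(1, 2), null));
        // A source that emits "one", "two" and then fails.
        System.out.println(materialize(Arrays.asList("one", "two"),
                new NullPointerException("boom")));
    }
}
```

In this model a failing source still yields an ordinary list whose last element is the error signal, which mirrors how a materialized Observable reports the source's error as a value rather than through onError.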
Code example origin: ReactiveX/RxJava
    @Override
    public ObservableSource<Notification<Object>> apply(Observable<Object> o) throws Exception {
        return o.materialize();
    }
});
Code example origin: ReactiveX/RxJava
@Override
public Iterator<T> iterator() {
    BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();
    Observable<Notification<T>> materialized = Observable.wrap(source).materialize();
    materialized.subscribe(lio);
    return lio;
}
Code example origin: redisson/redisson
@Override
public Iterator<T> iterator() {
    BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();
    Observable<Notification<T>> materialized = Observable.wrap(source).materialize();
    materialized.subscribe(lio);
    return lio;
}
Code example origin: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Observable.just(1).materialize());
}
Code example origin: ReactiveX/RxJava
@Test
public void testDematerialize1() {
    Observable<Notification<Integer>> notifications = Observable.just(1, 2).materialize();
    Observable<Integer> dematerialize = notifications.dematerialize();
    Observer<Integer> observer = TestHelper.mockObserver();
    dematerialize.subscribe(observer);
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(2);
    verify(observer, times(1)).onComplete();
    verify(observer, never()).onError(any(Throwable.class));
}
Code example origin: ReactiveX/RxJava
@Test
public void testMaterializeDematerializeChaining() {
    Observable<Integer> obs = Observable.just(1);
    Observable<Integer> chained = obs.materialize()
            .dematerialize(Functions.<Notification<Integer>>identity());
    Observer<Integer> observer = TestHelper.mockObserver();
    chained.subscribe(observer);
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onComplete();
    verify(observer, times(0)).onError(any(Throwable.class));
}
Code example origin: ReactiveX/RxJava
@Test
public void testHonorsContractWhenCompleted() {
    Observable<Integer> source = Observable.just(1);
    Observable<Integer> result = source.materialize().dematerialize();
    Observer<Integer> o = TestHelper.mockObserver();
    result.subscribe(o);
    verify(o).onNext(1);
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example origin: ReactiveX/RxJava
@Test
public void testDematerialize3() {
    Exception exception = new Exception("test");
    Observable<Integer> o = Observable.error(exception);
    Observable<Integer> dematerialize = o.materialize().dematerialize();
    Observer<Integer> observer = TestHelper.mockObserver();
    dematerialize.subscribe(observer);
    verify(observer, times(1)).onError(exception);
    verify(observer, times(0)).onComplete();
    verify(observer, times(0)).onNext(any(Integer.class));
}
Code example origin: ReactiveX/RxJava
@Test
public void simpleSelector() {
    Observable<Notification<Integer>> notifications = Observable.just(1, 2).materialize();
    Observable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
    Observer<Integer> observer = TestHelper.mockObserver();
    dematerialize.subscribe(observer);
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(2);
    verify(observer, times(1)).onComplete();
    verify(observer, never()).onError(any(Throwable.class));
}
Code example origin: ReactiveX/RxJava
@Test
public void testDematerialize2() {
    Throwable exception = new Throwable("test");
    Observable<Integer> o = Observable.error(exception);
    Observable<Integer> dematerialize = o.materialize().dematerialize();
    Observer<Integer> observer = TestHelper.mockObserver();
    dematerialize.subscribe(observer);
    verify(observer, times(1)).onError(exception);
    verify(observer, times(0)).onComplete();
    verify(observer, times(0)).onNext(any(Integer.class));
}
Code example origin: ReactiveX/RxJava
@Test
public void testHonorsContractWhenThrows() {
    Observable<Integer> source = Observable.error(new TestException());
    Observable<Integer> result = source.materialize().dematerialize();
    Observer<Integer> o = TestHelper.mockObserver();
    result.subscribe(o);
    verify(o, never()).onNext(any(Integer.class));
    verify(o, never()).onComplete();
    verify(o).onError(any(TestException.class));
}
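The round-trip tests above all rely on dematerialize undoing materialize: each Notification is turned back into the callback it describes. Below is a minimal sketch of that idea in plain Java, not RxJava's implementation. `RoundTripSketch`, `Sink`, `Signal`, `dematerialize`, and `replay` are hypothetical names introduced only for illustration, and the signals are assumed to arrive as a list.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of replaying materialized signals; NOT RxJava code.
class RoundTripSketch {

    // Hypothetical stand-in for io.reactivex.Observer (without onSubscribe).
    interface Sink<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    // Hypothetical stand-in for io.reactivex.Notification:
    // value != null means OnNext, error != null means OnError, both null means OnComplete.
    static final class Signal<T> {
        final T value;
        final Throwable error;

        Signal(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }
    }

    // Replays reified signals as real callbacks and stops at the first terminal signal.
    static <T> void dematerialize(List<Signal<T>> signals, Sink<T> sink) {
        for (Signal<T> s : signals) {
            if (s.value != null) {
                sink.onNext(s.value);
            } else if (s.error != null) {
                sink.onError(s.error);
                return;
            } else {
                sink.onComplete();
                return;
            }
        }
    }

    // Records which callbacks fire, in order, for a list of signals.
    static List<String> replay(List<Signal<Integer>> signals) {
        final List<String> log = new ArrayList<String>();
        dematerialize(signals, new Sink<Integer>() {
            @Override public void onNext(Integer value) { log.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable error) { log.add("onError(" + error.getMessage() + ")"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // Signals as materialize would produce them for a source emitting 1, 2.
        List<Signal<Integer>> completed = new ArrayList<Signal<Integer>>();
        completed.add(new Signal<Integer>(1, null));
        completed.add(new Signal<Integer>(2, null));
        completed.add(new Signal<Integer>(null, null));
        System.out.println(replay(completed));

        // Signals for a source that fails immediately.
        List<Signal<Integer>> failed = new ArrayList<Signal<Integer>>();
        failed.add(new Signal<Integer>(null, new Exception("test")));
        System.out.println(replay(failed));
    }
}
```

In this model the completed case triggers onNext twice and onComplete once, and the failed case triggers only onError, which matches what the tests above verify on their mock observers.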
Code example origin: ReactiveX/RxJava
@Test
public void testEmitMaterializedNotifications() {
    Observable<Notification<Integer>> oi = Observable.just(1, 2, 3).materialize();
    Observable<Notification<String>> os = Observable.just("a", "b", "c").materialize();
    Observable<String> o = Observable.zip(oi, os, new BiFunction<Notification<Integer>, Notification<String>, String>() {
        @Override
        public String apply(Notification<Integer> t1, Notification<String> t2) {
            return kind(t1) + "_" + value(t1) + "-" + kind(t2) + "_" + value(t2);
        }
    });
    final ArrayList<String> list = new ArrayList<String>();
    o.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println(s);
            list.add(s);
        }
    });
    assertEquals(4, list.size());
    assertEquals("OnNext_1-OnNext_a", list.get(0));
    assertEquals("OnNext_2-OnNext_b", list.get(1));
    assertEquals("OnNext_3-OnNext_c", list.get(2));
    assertEquals("OnComplete_null-OnComplete_null", list.get(3));
}
Code example origin: ReactiveX/RxJava
@Test
public void selectorNull() {
    Observable.just(1, 2)
    .materialize()
    .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
        @Override
        public Notification<Object> apply(Notification<Integer> v) throws Exception {
            return null;
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
}
Code example origin: ReactiveX/RxJava
@Test
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
    final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
    Observable<Notification<String>> m = Observable.unsafeCreate(o).materialize();
    assertEquals(3, m.toList().toFuture().get().size());
    assertEquals(3, m.toList().toFuture().get().size());
}
Code example origin: ReactiveX/RxJava
@Test
public void selectorCrash() {
    Observable.just(1, 2)
    .materialize()
    .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
        @Override
        public Notification<Object> apply(Notification<Integer> v) throws Exception {
            throw new TestException();
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example origin: TeamNewPipe/NewPipe
return local.materialize();
}).materialize();
})
.subscribeOn(Schedulers.io())
Code example origin: ReactiveX/RxJava
@Test
public void testErrorThrownIssue1685() {
    Subject<Object> subject = ReplaySubject.create();
    Observable.error(new RuntimeException("oops"))
        .materialize()
        .delay(1, TimeUnit.SECONDS)
        .dematerialize(Functions.<Notification<Object>>identity())
        .subscribe(subject);
    subject.subscribe();
    subject.materialize().blockingFirst();
    System.out.println("Done");
}
Code example origin: ReactiveX/RxJava
@Test
public void testWithCompletionCausingError() {
    TestObserver<Notification<Integer>> to = new TestObserver<Notification<Integer>>();
    final RuntimeException ex = new RuntimeException("boo");
    Observable.<Integer>empty().materialize().doOnNext(new Consumer<Object>() {
        @Override
        public void accept(Object t) {
            throw ex;
        }
    }).subscribe(to);
    to.assertError(ex);
    to.assertNoValues();
    to.assertTerminated();
}
Code example origin: ReactiveX/RxJava
@Test
public void testMaterialize1() {
    // null will cause onError to be triggered before "three" can be
    // returned
    final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null,
            "three");
    TestLocalObserver observer = new TestLocalObserver();
    Observable<Notification<String>> m = Observable.unsafeCreate(o1).materialize();
    m.subscribe(observer);
    try {
        o1.t.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    assertFalse(observer.onError);
    assertTrue(observer.onComplete);
    assertEquals(3, observer.notifications.size());
    assertTrue(observer.notifications.get(0).isOnNext());
    assertEquals("one", observer.notifications.get(0).getValue());
    assertTrue(observer.notifications.get(1).isOnNext());
    assertEquals("two", observer.notifications.get(1).getValue());
    assertTrue(observer.notifications.get(2).isOnError());
    assertEquals(NullPointerException.class, observer.notifications.get(2).getError().getClass());
}
Code example origin: ReactiveX/RxJava
@Test
public void testMaterialize2() {
    final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", "three");
    TestLocalObserver observer = new TestLocalObserver();
    Observable<Notification<String>> m = Observable.unsafeCreate(o1).materialize();
    m.subscribe(observer);
    try {
        o1.t.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    assertFalse(observer.onError);
    assertTrue(observer.onComplete);
    assertEquals(4, observer.notifications.size());
    assertTrue(observer.notifications.get(0).isOnNext());
    assertEquals("one", observer.notifications.get(0).getValue());
    assertTrue(observer.notifications.get(1).isOnNext());
    assertEquals("two", observer.notifications.get(1).getValue());
    assertTrue(observer.notifications.get(2).isOnNext());
    assertEquals("three", observer.notifications.get(2).getValue());
    assertTrue(observer.notifications.get(3).isOnComplete());
}