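This article collects code examples for the Java method io.reactivex.Flowable.materialize() and shows how it is used in practice. The examples are taken from selected projects found on GitHub, Stack Overflow, and Maven, and are meant as reference material. Details of Flowable.materialize() are as follows: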
Package: io.reactivex
Class name: Flowable
Method name: materialize
Returns a Flowable that turns every emission and notification from the source Publisher into an item: each one is wrapped in a Notification object that records its original type.
Backpressure: The operator honors backpressure from downstream and expects it from the source Publisher. If this expectation is violated, the operator may throw an IllegalStateException.
Scheduler: materialize does not operate by default on a particular Scheduler.
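The description above can be illustrated with a minimal sketch that uses only the JDK. It is not RxJava code: `Signal` and `MaterializeSketch` are hypothetical stand-ins for `Notification` and the operator, and a plain `List` stands in for the source Publisher. Under those assumptions, it shows each item and the terminal event being wrapped as a value that keeps its original type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MaterializeSketch {
    // Hypothetical stand-in for RxJava's Notification; not the library class.
    static final class Signal<T> {
        enum Kind { ON_NEXT, ON_ERROR, ON_COMPLETE }

        final Kind kind;
        final T value;          // set only for ON_NEXT
        final Throwable error;  // set only for ON_ERROR

        private Signal(Kind kind, T value, Throwable error) {
            this.kind = kind;
            this.value = value;
            this.error = error;
        }

        static <T> Signal<T> next(T value) { return new Signal<>(Kind.ON_NEXT, value, null); }
        static <T> Signal<T> error(Throwable e) { return new Signal<>(Kind.ON_ERROR, null, e); }
        static <T> Signal<T> complete() { return new Signal<>(Kind.ON_COMPLETE, null, null); }
    }

    // Wraps every item, then the terminal event, as a Signal.
    // A null 'failure' models a source that completes normally.
    static <T> List<Signal<T>> materialize(List<T> items, Throwable failure) {
        List<Signal<T>> out = new ArrayList<>();
        for (T item : items) {
            out.add(Signal.next(item));
        }
        out.add(failure == null ? Signal.<T>complete() : Signal.<T>error(failure));
        return out;
    }

    public static void main(String[] args) {
        // Two items followed by normal completion yield three signals.
        for (Signal<Integer> s : materialize(Arrays.asList(1, 2), null)) {
            System.out.println(s.kind + (s.value != null ? " " + s.value : ""));
        }
    }
}
```

In this model a failing source produces an ON_ERROR signal as its last element instead of throwing, which mirrors how the tests in this article observe an error as a Notification item.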
Code example source: ReactiveX/RxJava
// Excerpt: the opening of the enclosing call is not included in the source.
    @Override
    public Object apply(Flowable<Object> f) throws Exception {
        return f.materialize();
    }
}, false, null, null, Notification.createOnComplete());
Code example source: ReactiveX/RxJava
// Excerpt: the opening of the enclosing call is not included in the source.
    @Override
    public Flowable<Notification<Object>> apply(Flowable<Object> f) throws Exception {
        return f.materialize();
    }
});
Code example source: ReactiveX/RxJava
@Override
public Iterator<T> iterator() {
    LatestSubscriberIterator<T> lio = new LatestSubscriberIterator<T>();
    // Materialize the source so the iterator also receives terminal events as values.
    Flowable.<T>fromPublisher(source).materialize().subscribe(lio);
    return lio;
}
Code example source: ReactiveX/RxJava
@Test
public void testDematerialize2() {
    Throwable exception = new Throwable("test");
    Flowable<Integer> flowable = Flowable.error(exception);
    // The error should survive the materialize/dematerialize round trip as onError.
    Flowable<Integer> dematerialize = flowable.materialize().dematerialize();
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    dematerialize.subscribe(subscriber);
    verify(subscriber, times(1)).onError(exception);
    verify(subscriber, times(0)).onComplete();
    verify(subscriber, times(0)).onNext(any(Integer.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testDematerialize3() {
    Exception exception = new Exception("test");
    Flowable<Integer> flowable = Flowable.error(exception);
    Flowable<Integer> dematerialize = flowable.materialize().dematerialize();
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    dematerialize.subscribe(subscriber);
    verify(subscriber, times(1)).onError(exception);
    verify(subscriber, times(0)).onComplete();
    verify(subscriber, times(0)).onNext(any(Integer.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testDematerialize1() {
    Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
    Flowable<Integer> dematerialize = notifications.dematerialize();
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    dematerialize.subscribe(subscriber);
    verify(subscriber, times(1)).onNext(1);
    verify(subscriber, times(1)).onNext(2);
    verify(subscriber, times(1)).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void simpleSelector() {
    Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
    // The identity selector passes each Notification through unchanged.
    Flowable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    dematerialize.subscribe(subscriber);
    verify(subscriber, times(1)).onNext(1);
    verify(subscriber, times(1)).onNext(2);
    verify(subscriber, times(1)).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testMaterializeDematerializeChaining() {
    Flowable<Integer> obs = Flowable.just(1);
    Flowable<Integer> chained = obs.materialize()
            .dematerialize(Functions.<Notification<Integer>>identity());
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    chained.subscribe(subscriber);
    verify(subscriber, times(1)).onNext(1);
    verify(subscriber, times(1)).onComplete();
    verify(subscriber, times(0)).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testHonorsContractWhenThrows() {
    Flowable<Integer> source = Flowable.error(new TestException());
    Flowable<Integer> result = source.materialize().dematerialize();
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    result.subscribe(subscriber);
    verify(subscriber, never()).onNext(any(Integer.class));
    verify(subscriber, never()).onComplete();
    verify(subscriber).onError(any(TestException.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testHonorsContractWhenCompleted() {
    Flowable<Integer> source = Flowable.just(1);
    Flowable<Integer> result = source.materialize().dematerialize();
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    result.subscribe(subscriber);
    verify(subscriber).onNext(1);
    verify(subscriber).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
}
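The round-trip tests above rely on dematerialize reversing what materialize did. The following is a minimal JDK-only sketch of that reverse step, not RxJava code: `Note` is a hypothetical stand-in for `Notification<Integer>`, and the returned list of strings stands in for the calls a Subscriber would receive. It assumes that replay stops at the first terminal note.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RoundTripSketch {
    // Hypothetical stand-in for Notification<Integer>; not the library class.
    static final class Note {
        final Integer value;    // non-null for an onNext note
        final Throwable error;  // non-null for an onError note

        Note(Integer value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        boolean isOnNext() { return value != null; }
        boolean isOnError() { return error != null; }
    }

    // Replays notes as plain subscriber events and stops at the first terminal note.
    static List<String> dematerialize(List<Note> notes) {
        List<String> events = new ArrayList<>();
        for (Note n : notes) {
            if (n.isOnNext()) {
                events.add("onNext(" + n.value + ")");
            } else if (n.isOnError()) {
                events.add("onError(" + n.error.getMessage() + ")");
                return events;
            } else {
                events.add("onComplete");
                return events;
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // A value followed by completion, as in the completed-source test.
        System.out.println(dematerialize(Arrays.asList(new Note(1, null), new Note(null, null))));
        // A lone error note, as in the failing-source tests.
        System.out.println(dematerialize(Arrays.asList(new Note(null, new Exception("test")))));
    }
}
```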
Code example source: ReactiveX/RxJava
@Test
public void badRequest() {
    TestHelper.assertBadRequestReported(Flowable.just(1).materialize());
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Flowable.just(1).materialize());
}
Code example source: ReactiveX/RxJava
@Test
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
    final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
    Flowable<Notification<String>> m = Flowable.unsafeCreate(o).materialize();
    // Subscribe twice; each subscription is expected to yield 3 notifications.
    assertEquals(3, m.toList().toFuture().get().size());
    assertEquals(3, m.toList().toFuture().get().size());
}
Code example source: ReactiveX/RxJava
@Test
public void selectorNull() {
    Flowable.just(1, 2)
    .materialize()
    .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
        @Override
        public Notification<Object> apply(Notification<Integer> v) throws Exception {
            // A null Notification from the selector must surface as a NullPointerException.
            return null;
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
}
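Code example source: ReactiveX/RxJava
@Test
public void testBackpressureWithError() {
    // Start with zero demand so nothing is delivered until request() is called.
    TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
    Flowable.<Integer> error(new IllegalArgumentException()).materialize().subscribe(ts);
    ts.assertNoValues();
    ts.request(1);
    // The error arrives as one Notification item; the materialized stream itself completes.
    ts.assertValueCount(1);
    ts.assertComplete();
}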
Code example source: ReactiveX/RxJava
@Test
public void selectorCrash() {
    Flowable.just(1, 2)
    .materialize()
    .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
        @Override
        public Notification<Object> apply(Notification<Integer> v) throws Exception {
            throw new TestException();
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void testBackpressureOnEmptyStream() {
    TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
    Flowable.<Integer> empty().materialize().subscribe(ts);
    ts.assertNoValues();
    ts.request(1);
    // Even an empty source yields one item: the completion Notification.
    ts.assertValueCount(1);
    assertTrue(ts.values().get(0).isOnComplete());
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNotificationArriving() {
    TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
    Flowable.<Integer>empty().materialize()
            .subscribe(ts);
    ts.assertNoValues();
    // Dispose before requesting, so the pending completion Notification is never delivered.
    ts.dispose();
    ts.request(1);
    ts.assertNoValues();
}
Code example source: ReactiveX/RxJava
@Test
public void testBackpressureNoError() {
    TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
    Flowable.just(1, 2, 3).materialize().subscribe(ts);
    ts.assertNoValues();
    ts.request(1);
    ts.assertValueCount(1);
    ts.request(2);
    ts.assertValueCount(3);
    // The fourth request delivers the completion Notification.
    ts.request(1);
    ts.assertValueCount(4);
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void backpressure() {
    TestSubscriber<Notification<Integer>> ts = Flowable.range(1, 5).materialize().test(0);
    ts.assertEmpty();
    // Five requests deliver the five values, but not yet the completion Notification.
    ts.request(5);
    ts.assertValueCount(5)
    .assertNoErrors()
    .assertNotComplete();
    ts.request(1);
    ts.assertValueCount(6)
    .assertNoErrors()
    .assertComplete();
}
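The backpressure tests above show that the terminal notification is itself an item that needs downstream demand. A minimal JDK-only sketch of that accounting follows. It is not RxJava code: `DemandSketch` is a hypothetical model in which strings stand in for Notification objects, and it assumes a source that completes normally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class DemandSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    final List<String> received = new ArrayList<>();
    boolean completed;

    // Queues one entry per item plus one for the materialized completion.
    DemandSketch(int itemCount) {
        for (int i = 1; i <= itemCount; i++) {
            pending.add("OnNext(" + i + ")");
        }
        pending.add("OnComplete");
    }

    // Delivers at most n queued notifications. The modeled stream completes
    // only once the terminal notification has been delivered as an item.
    void request(long n) {
        while (n-- > 0 && !pending.isEmpty()) {
            received.add(pending.poll());
        }
        if (pending.isEmpty()) {
            completed = true;
        }
    }

    public static void main(String[] args) {
        DemandSketch ts = new DemandSketch(5);
        ts.request(5);
        System.out.println(ts.received.size() + " received, completed=" + ts.completed);
        ts.request(1);
        System.out.println(ts.received.size() + " received, completed=" + ts.completed);
    }
}
```

With five items, five requests leave the model not yet completed, and one further request delivers the sixth entry and completes it, which matches the counts asserted in the last test.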