io.reactivex.Flowable.materialize()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.9k)|赞(0)|评价(0)|浏览(151)

本文整理了Java中io.reactivex.Flowable.materialize()方法的一些代码示例,展示了Flowable.materialize()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.materialize()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:materialize

Flowable.materialize介绍

[英]Returns a Flowable that represents all of the emissions and notifications from the source Publisher into emissions marked with their original types within Notification objects.

Backpressure: The operator honors backpressure from downstream and expects it from the source Publisher. If this expectation is violated, the operator may throw an IllegalStateException. Scheduler: materialize does not operate by default on a particular Scheduler.
[中]返回一个可流动的值,该值表示源发布服务器发出的所有排放通知,并在通知对象中标记为其原始类型。
背压:操作员接受来自下游的背压,并期望来自源发布者的背压。如果违反此期望,运算符可能抛出一个非法状态异常。调度程序:默认情况下,materialize不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Object apply(Flowable<Object> f) throws Exception {
  3. return f.materialize();
  4. }
  5. }, false, null, null, Notification.createOnComplete());

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<Notification<Object>> apply(Flowable<Object> f) throws Exception {
  3. return f.materialize();
  4. }
  5. });

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Iterator<T> iterator() {
  3. LatestSubscriberIterator<T> lio = new LatestSubscriberIterator<T>();
  4. Flowable.<T>fromPublisher(source).materialize().subscribe(lio);
  5. return lio;
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDematerialize2() {
  3. Throwable exception = new Throwable("test");
  4. Flowable<Integer> flowable = Flowable.error(exception);
  5. Flowable<Integer> dematerialize = flowable.materialize().dematerialize();
  6. Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  7. dematerialize.subscribe(subscriber);
  8. verify(subscriber, times(1)).onError(exception);
  9. verify(subscriber, times(0)).onComplete();
  10. verify(subscriber, times(0)).onNext(any(Integer.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDematerialize3() {
  3. Exception exception = new Exception("test");
  4. Flowable<Integer> flowable = Flowable.error(exception);
  5. Flowable<Integer> dematerialize = flowable.materialize().dematerialize();
  6. Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  7. dematerialize.subscribe(subscriber);
  8. verify(subscriber, times(1)).onError(exception);
  9. verify(subscriber, times(0)).onComplete();
  10. verify(subscriber, times(0)).onNext(any(Integer.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDematerialize1() {
  3. Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
  4. Flowable<Integer> dematerialize = notifications.dematerialize();
  5. Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  6. dematerialize.subscribe(subscriber);
  7. verify(subscriber, times(1)).onNext(1);
  8. verify(subscriber, times(1)).onNext(2);
  9. verify(subscriber, times(1)).onComplete();
  10. verify(subscriber, never()).onError(any(Throwable.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void simpleSelector() {
  3. Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
  4. Flowable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
  5. Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  6. dematerialize.subscribe(subscriber);
  7. verify(subscriber, times(1)).onNext(1);
  8. verify(subscriber, times(1)).onNext(2);
  9. verify(subscriber, times(1)).onComplete();
  10. verify(subscriber, never()).onError(any(Throwable.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMaterializeDematerializeChaining() {
  3. Flowable<Integer> obs = Flowable.just(1);
  4. Flowable<Integer> chained = obs.materialize()
  5. .dematerialize(Functions.<Notification<Integer>>identity());
  6. Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  7. chained.subscribe(subscriber);
  8. verify(subscriber, times(1)).onNext(1);
  9. verify(subscriber, times(1)).onComplete();
  10. verify(subscriber, times(0)).onError(any(Throwable.class));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testHonorsContractWhenThrows() {
  3. Flowable<Integer> source = Flowable.error(new TestException());
  4. Flowable<Integer> result = source.materialize().dematerialize();
  5. Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  6. result.subscribe(subscriber);
  7. verify(subscriber, never()).onNext(any(Integer.class));
  8. verify(subscriber, never()).onComplete();
  9. verify(subscriber).onError(any(TestException.class));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testHonorsContractWhenCompleted() {
  3. Flowable<Integer> source = Flowable.just(1);
  4. Flowable<Integer> result = source.materialize().dematerialize();
  5. Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  6. result.subscribe(subscriber);
  7. verify(subscriber).onNext(1);
  8. verify(subscriber).onComplete();
  9. verify(subscriber, never()).onError(any(Throwable.class));
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void badRequest() {
  3. TestHelper.assertBadRequestReported(Flowable.just(1).materialize());
  4. }
  5. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void dispose() {
  3. TestHelper.checkDisposed(Flowable.just(1).materialize());
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
  3. final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
  4. Flowable<Notification<String>> m = Flowable.unsafeCreate(o).materialize();
  5. assertEquals(3, m.toList().toFuture().get().size());
  6. assertEquals(3, m.toList().toFuture().get().size());
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void selectorNull() {
  3. Flowable.just(1, 2)
  4. .materialize()
  5. .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
  6. @Override
  7. public Notification<Object> apply(Notification<Integer> v) throws Exception {
  8. return null;
  9. }
  10. })
  11. .test()
  12. .assertFailure(NullPointerException.class);
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressureWithError() {
  3. TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
  4. Flowable.<Integer> error(new IllegalArgumentException()).materialize().subscribe(ts);
  5. ts.assertNoValues();
  6. ts.request(1);
  7. ts.assertValueCount(1);
  8. ts.assertComplete();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void selectorCrash() {
  3. Flowable.just(1, 2)
  4. .materialize()
  5. .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
  6. @Override
  7. public Notification<Object> apply(Notification<Integer> v) throws Exception {
  8. throw new TestException();
  9. }
  10. })
  11. .test()
  12. .assertFailure(TestException.class);
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressureOnEmptyStream() {
  3. TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
  4. Flowable.<Integer> empty().materialize().subscribe(ts);
  5. ts.assertNoValues();
  6. ts.request(1);
  7. ts.assertValueCount(1);
  8. assertTrue(ts.values().get(0).isOnComplete());
  9. ts.assertComplete();
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNotificationArriving() {
  3. TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
  4. Flowable.<Integer>empty().materialize()
  5. .subscribe(ts);
  6. ts.assertNoValues();
  7. ts.dispose();
  8. ts.request(1);
  9. ts.assertNoValues();
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressureNoError() {
  3. TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
  4. Flowable.just(1, 2, 3).materialize().subscribe(ts);
  5. ts.assertNoValues();
  6. ts.request(1);
  7. ts.assertValueCount(1);
  8. ts.request(2);
  9. ts.assertValueCount(3);
  10. ts.request(1);
  11. ts.assertValueCount(4);
  12. ts.assertComplete();
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void backpressure() {
  3. TestSubscriber<Notification<Integer>> ts = Flowable.range(1, 5).materialize().test(0);
  4. ts.assertEmpty();
  5. ts.request(5);
  6. ts.assertValueCount(5)
  7. .assertNoErrors()
  8. .assertNotComplete();
  9. ts.request(1);
  10. ts.assertValueCount(6)
  11. .assertNoErrors()
  12. .assertComplete();
  13. }

相关文章

Flowable类方法