Usage and code examples for the io.reactivex.Flowable.materialize() method

x33g5p2x, reposted on 2022-01-19 under Other

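This article collects code examples for the Java method io.reactivex.Flowable.materialize() and shows how Flowable.materialize() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.materialize() method:
Fully qualified class: io.reactivex.Flowable
Class name: Flowable
Method name: materialize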

About Flowable.materialize

Returns a Flowable that wraps every emission and terminal notification from the source Publisher in a Notification object, so that each emitted item records its original type (onNext, onError, or onComplete).
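
To make the description concrete, here is a dependency-free sketch of the idea. It is not RxJava's implementation: the `Signal` class and the `materialize` helper are hypothetical names invented for this illustration, with `Signal` playing the role of `Notification` and a plain `Iterator` standing in for the source Publisher. A source that would normally deliver values and then end with an error or a completion is turned into a list in which the terminal event is just one more element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MaterializeSketch {

    /** Hypothetical stand-in for Notification: a value, an error, or completion. */
    static final class Signal<T> {
        final T value;          // non-null only for an onNext signal (null values are assumed not to occur)
        final Throwable error;  // non-null only for an onError signal

        private Signal(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Signal<T> next(T value) { return new Signal<>(value, null); }
        static <T> Signal<T> error(Throwable e) { return new Signal<>(null, e); }
        static <T> Signal<T> complete() { return new Signal<>(null, null); }

        boolean isOnNext() { return value != null; }
        boolean isOnError() { return error != null; }
        boolean isOnComplete() { return value == null && error == null; }

        @Override
        public String toString() {
            if (isOnNext()) return "OnNext(" + value + ")";
            if (isOnError()) return "OnError(" + error.getMessage() + ")";
            return "OnComplete";
        }
    }

    /**
     * Drains the iterator and records every event as a Signal.
     * A RuntimeException thrown by the source becomes an OnError signal;
     * normal exhaustion becomes an OnComplete signal.
     */
    static <T> List<Signal<T>> materialize(Iterator<T> source) {
        List<Signal<T>> out = new ArrayList<>();
        try {
            while (source.hasNext()) {
                out.add(Signal.next(source.next()));
            }
            out.add(Signal.<T>complete());
        } catch (RuntimeException e) {
            out.add(Signal.<T>error(e));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [OnNext(1), OnNext(2), OnComplete]
        System.out.println(materialize(Arrays.asList(1, 2).iterator()));
    }
}
```

In this model a failing source never throws to the caller; the failure shows up as the last element of the list, which mirrors how a materialized stream reports an error as a regular item.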

Backpressure: The operator honors backpressure from downstream and expects it from the source Publisher. If this expectation is violated, the operator may throw an IllegalStateException.
Scheduler: materialize does not operate by default on a particular Scheduler.
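
One consequence of honoring downstream backpressure, visible in the backpressure tests among the examples in this article, is that the terminal notification counts as an item: a source with three values needs a total demand of four before the materialized stream completes. The following is a simplified pull-based model of that accounting, not RxJava's implementation; `MaterializedStream` and its members are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class MaterializeDemandSketch {

    /** Hypothetical model of a materialized stream driven by downstream demand. */
    static final class MaterializedStream {
        private final Deque<String> pending = new ArrayDeque<>();
        final List<String> delivered = new ArrayList<>();
        boolean completed;

        MaterializedStream(List<Integer> values) {
            for (Integer v : values) {
                pending.add("OnNext(" + v + ")");
            }
            // The terminal event is queued like any other item.
            pending.add("OnComplete");
        }

        /** Delivers at most n pending signals, mirroring Subscription.request(n). */
        void request(long n) {
            while (n > 0 && !pending.isEmpty()) {
                delivered.add(pending.poll());
                n--;
            }
            // The outer stream completes only once the terminal signal has been delivered.
            if (pending.isEmpty()) {
                completed = true;
            }
        }
    }

    public static void main(String[] args) {
        MaterializedStream s = new MaterializedStream(Arrays.asList(1, 2, 3));
        s.request(3);
        // prints 3 delivered, completed=false
        System.out.println(s.delivered.size() + " delivered, completed=" + s.completed);
        s.request(1);
        // prints 4 delivered, completed=true
        System.out.println(s.delivered.size() + " delivered, completed=" + s.completed);
    }
}
```

The same model applied to an empty source holds back the single OnComplete signal until one item is requested.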

Code examples

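Code example source: ReactiveX/RxJava

// Excerpt: the apply method of an anonymous Function passed to a test helper;
// the enclosing call is cut off in the source.
  @Override
  public Object apply(Flowable<Object> f) throws Exception {
    return f.materialize();
  }
}, false, null, null, Notification.createOnComplete());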

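Code example source: ReactiveX/RxJava

// Excerpt: the apply method of an anonymous Function passed to a test helper;
// the enclosing call is cut off in the source.
  @Override
  public Flowable<Notification<Object>> apply(Flowable<Object> f) throws Exception {
    return f.materialize();
  }
});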

Code example source: ReactiveX/RxJava

@Override
public Iterator<T> iterator() {
  LatestSubscriberIterator<T> lio = new LatestSubscriberIterator<T>();
  Flowable.<T>fromPublisher(source).materialize().subscribe(lio);
  return lio;
}

Code example source: ReactiveX/RxJava

@Test
public void testDematerialize2() {
  Throwable exception = new Throwable("test");
  Flowable<Integer> flowable = Flowable.error(exception);
  Flowable<Integer> dematerialize = flowable.materialize().dematerialize();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  dematerialize.subscribe(subscriber);
  verify(subscriber, times(1)).onError(exception);
  verify(subscriber, times(0)).onComplete();
  verify(subscriber, times(0)).onNext(any(Integer.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testDematerialize3() {
  Exception exception = new Exception("test");
  Flowable<Integer> flowable = Flowable.error(exception);
  Flowable<Integer> dematerialize = flowable.materialize().dematerialize();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  dematerialize.subscribe(subscriber);
  verify(subscriber, times(1)).onError(exception);
  verify(subscriber, times(0)).onComplete();
  verify(subscriber, times(0)).onNext(any(Integer.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testDematerialize1() {
  Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
  Flowable<Integer> dematerialize = notifications.dematerialize();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  dematerialize.subscribe(subscriber);
  verify(subscriber, times(1)).onNext(1);
  verify(subscriber, times(1)).onNext(2);
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void simpleSelector() {
  Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
  Flowable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  dematerialize.subscribe(subscriber);
  verify(subscriber, times(1)).onNext(1);
  verify(subscriber, times(1)).onNext(2);
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testMaterializeDematerializeChaining() {
  Flowable<Integer> obs = Flowable.just(1);
  Flowable<Integer> chained = obs.materialize()
      .dematerialize(Functions.<Notification<Integer>>identity());
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  chained.subscribe(subscriber);
  verify(subscriber, times(1)).onNext(1);
  verify(subscriber, times(1)).onComplete();
  verify(subscriber, times(0)).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testHonorsContractWhenThrows() {
  Flowable<Integer> source = Flowable.error(new TestException());
  Flowable<Integer> result = source.materialize().dematerialize();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  result.subscribe(subscriber);
  verify(subscriber, never()).onNext(any(Integer.class));
  verify(subscriber, never()).onComplete();
  verify(subscriber).onError(any(TestException.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testHonorsContractWhenCompleted() {
  Flowable<Integer> source = Flowable.just(1);
  Flowable<Integer> result = source.materialize().dematerialize();
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  result.subscribe(subscriber);
  verify(subscriber).onNext(1);
  verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(Flowable.just(1).materialize());
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.just(1).materialize());
}

Code example source: ReactiveX/RxJava

@Test
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
  final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
  Flowable<Notification<String>> m = Flowable.unsafeCreate(o).materialize();
  assertEquals(3, m.toList().toFuture().get().size());
  assertEquals(3, m.toList().toFuture().get().size());
}

Code example source: ReactiveX/RxJava

@Test
public void selectorNull() {
  Flowable.just(1, 2)
  .materialize()
  .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
    @Override
    public Notification<Object> apply(Notification<Integer> v) throws Exception {
      return null;
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void testBackpressureWithError() {
  TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
  Flowable.<Integer> error(new IllegalArgumentException()).materialize().subscribe(ts);
  ts.assertNoValues();
  ts.request(1);
  ts.assertValueCount(1);
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void selectorCrash() {
  Flowable.just(1, 2)
  .materialize()
  .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
    @Override
    public Notification<Object> apply(Notification<Integer> v) throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void testBackpressureOnEmptyStream() {
  TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
  Flowable.<Integer> empty().materialize().subscribe(ts);
  ts.assertNoValues();
  ts.request(1);
  ts.assertValueCount(1);
  assertTrue(ts.values().get(0).isOnComplete());
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNotificationArriving() {
  TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
  Flowable.<Integer>empty().materialize()
      .subscribe(ts);
  ts.assertNoValues();
  ts.dispose();
  ts.request(1);
  ts.assertNoValues();
}

Code example source: ReactiveX/RxJava

@Test
public void testBackpressureNoError() {
  TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>(0L);
  Flowable.just(1, 2, 3).materialize().subscribe(ts);
  ts.assertNoValues();
  ts.request(1);
  ts.assertValueCount(1);
  ts.request(2);
  ts.assertValueCount(3);
  ts.request(1);
  ts.assertValueCount(4);
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void backpressure() {
  TestSubscriber<Notification<Integer>> ts = Flowable.range(1, 5).materialize().test(0);
  ts.assertEmpty();
  ts.request(5);
  ts.assertValueCount(5)
  .assertNoErrors()
  .assertNotComplete();
  ts.request(1);
  ts.assertValueCount(6)
  .assertNoErrors()
  .assertComplete();
}
