Usage and code examples of the io.reactivex.Flowable.dematerialize() method

x33g5p2x, reposted on 2022-01-19 in Other

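This article collects code examples for the Java method io.reactivex.Flowable.dematerialize() and shows how Flowable.dematerialize() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as a reference. Details of Flowable.dematerialize() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: dematerialize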

About Flowable.dematerialize

Returns a Flowable that reverses the effect of materialize() by transforming the Notification objects emitted by the source Publisher into the items or notifications they represent.

When the upstream signals a Notification.createOnError(Throwable) or Notification.createOnComplete() item, the returned Flowable cancels the flow and terminates with that type of terminal event:

    // Assumes static imports of Notification.createOnNext and Notification.createOnComplete.
    Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
        .doOnCancel(() -> System.out.println("Cancelled!"))
        .dematerialize()
        .test()
        .assertResult(1);

If the upstream signals onError or onComplete directly, the flow is terminated with the same event.

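    // Assumes a static import of Notification.createOnNext.
    // No terminal notification item is emitted; the upstream's own onComplete ends the flow.
    Flowable.just(createOnNext(1), createOnNext(2))
        .dematerialize()
        .test()
        .assertResult(1, 2);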

If this behavior is not desired, the completion can be suppressed by applying concatWith(Publisher) with a never() source.

Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.

Scheduler: dematerialize does not operate by default on a particular Scheduler.
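
The termination rules described above can be sketched as a small plain-Java model. This is only an illustration of the documented behavior, not RxJava's implementation: the class DematerializeModel, the Signal stand-in for Notification, and the UpstreamEnd enum are all hypothetical names invented for this sketch, and cancellation, backpressure, and threading are left out. NEVER_TERMINATES models an upstream that has been extended with a never-terminating tail, which is the situation the concatWith/never suggestion creates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the termination rules; every name here is hypothetical, not an RxJava type. */
final class DematerializeModel {

    enum Kind { NEXT, COMPLETE, ERROR }

    /** Stand-in for a materialized signal (the role Notification plays in RxJava). */
    static final class Signal {
        final Kind kind;
        final Object payload; // value for NEXT, message for ERROR, null for COMPLETE

        private Signal(Kind kind, Object payload) {
            this.kind = kind;
            this.payload = payload;
        }

        static Signal next(Object value) { return new Signal(Kind.NEXT, value); }
        static Signal complete() { return new Signal(Kind.COMPLETE, null); }
        static Signal error(String message) { return new Signal(Kind.ERROR, message); }
    }

    /** How the modeled upstream ends once it has emitted all of its signal items. */
    enum UpstreamEnd { COMPLETES, NEVER_TERMINATES }

    /** Returns the events a downstream subscriber would observe, in order. */
    static List<String> dematerialize(List<Signal> items, UpstreamEnd end) {
        List<String> events = new ArrayList<>();
        for (Signal s : items) {
            switch (s.kind) {
                case NEXT:
                    events.add("onNext(" + s.payload + ")");
                    break;
                case COMPLETE:
                    // A terminal signal item ends the flow; later items are never seen.
                    events.add("onComplete");
                    return events;
                case ERROR:
                    events.add("onError(" + s.payload + ")");
                    return events;
            }
        }
        // No terminal item: forward the upstream's own completion, if it has one.
        if (end == UpstreamEnd.COMPLETES) {
            events.add("onComplete");
        }
        return events;
    }

    public static void main(String[] args) {
        // Terminal item in the middle: the trailing next(2) is dropped.
        System.out.println(dematerialize(
                Arrays.asList(Signal.next(1), Signal.complete(), Signal.next(2)),
                UpstreamEnd.COMPLETES));
        // Upstream completes directly after its items.
        System.out.println(dematerialize(
                Arrays.asList(Signal.next(1), Signal.next(2)), UpstreamEnd.COMPLETES));
        // Upstream with a never-terminating tail: no completion is observed.
        System.out.println(dematerialize(
                Arrays.asList(Signal.next(1), Signal.next(2)), UpstreamEnd.NEVER_TERMINATES));
    }
}
```

In this model the three calls in main correspond to the three cases in the text: a terminal notification item, a direct upstream completion, and a completion suppressed by a never-terminating tail.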

Code examples

Code example source: ReactiveX/RxJava

    // Fragment: the enclosing call that receives this anonymous Function is not shown in the source.
        @Override
        public Flowable<Object> apply(Flowable<Object> f) throws Exception {
            return f.dematerialize();
        }
    });

Code example source: ReactiveX/RxJava

    @Test
    public void testErrorPassThru() {
        Exception exception = new Exception("test");
        Flowable<Integer> flowable = Flowable.error(exception);
        // The upstream signals onError directly, so it is passed through unchanged.
        Flowable<Integer> dematerialize = flowable.dematerialize();
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        dematerialize.subscribe(subscriber);
        verify(subscriber, times(1)).onError(exception);
        verify(subscriber, times(0)).onComplete();
        verify(subscriber, times(0)).onNext(any(Integer.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testDematerialize2() {
        Throwable exception = new Throwable("test");
        Flowable<Integer> flowable = Flowable.error(exception);
        // materialize() wraps the error in a Notification; dematerialize() unwraps it again.
        Flowable<Integer> dematerialize = flowable.materialize().dematerialize();
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        dematerialize.subscribe(subscriber);
        verify(subscriber, times(1)).onError(exception);
        verify(subscriber, times(0)).onComplete();
        verify(subscriber, times(0)).onNext(any(Integer.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testDematerialize3() {
        Exception exception = new Exception("test");
        Flowable<Integer> flowable = Flowable.error(exception);
        Flowable<Integer> dematerialize = flowable.materialize().dematerialize();
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        dematerialize.subscribe(subscriber);
        verify(subscriber, times(1)).onError(exception);
        verify(subscriber, times(0)).onComplete();
        verify(subscriber, times(0)).onNext(any(Integer.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testDematerialize1() {
        Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
        Flowable<Integer> dematerialize = notifications.dematerialize();
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        dematerialize.subscribe(subscriber);
        // The round trip restores the original items and the completion.
        verify(subscriber, times(1)).onNext(1);
        verify(subscriber, times(1)).onNext(2);
        verify(subscriber, times(1)).onComplete();
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testCompletePassThru() {
        Flowable<Integer> flowable = Flowable.empty();
        // The upstream signals onComplete directly, so it is passed through unchanged.
        Flowable<Integer> dematerialize = flowable.dematerialize();
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>(subscriber);
        dematerialize.subscribe(ts);
        System.out.println(ts.errors());
        verify(subscriber, never()).onError(any(Throwable.class));
        verify(subscriber, times(1)).onComplete();
        verify(subscriber, times(0)).onNext(any(Integer.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void simpleSelector() {
        Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
        // Selector overload: the identity function returns each Notification as-is.
        Flowable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        dematerialize.subscribe(subscriber);
        verify(subscriber, times(1)).onNext(1);
        verify(subscriber, times(1)).onNext(2);
        verify(subscriber, times(1)).onComplete();
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testMaterializeDematerializeChaining() {
        Flowable<Integer> obs = Flowable.just(1);
        Flowable<Integer> chained = obs.materialize()
                .dematerialize(Functions.<Notification<Integer>>identity());
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        chained.subscribe(subscriber);
        verify(subscriber, times(1)).onNext(1);
        verify(subscriber, times(1)).onComplete();
        verify(subscriber, times(0)).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testHonorsContractWhenThrows() {
        Flowable<Integer> source = Flowable.error(new TestException());
        Flowable<Integer> result = source.materialize().dematerialize();
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        result.subscribe(subscriber);
        // Only onError is delivered: no items and no completion.
        verify(subscriber, never()).onNext(any(Integer.class));
        verify(subscriber, never()).onComplete();
        verify(subscriber).onError(any(TestException.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testHonorsContractWhenCompleted() {
        Flowable<Integer> source = Flowable.just(1);
        Flowable<Integer> result = source.materialize().dematerialize();
        Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
        result.subscribe(subscriber);
        verify(subscriber).onNext(1);
        verify(subscriber).onComplete();
        verify(subscriber, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void dispose() {
        TestHelper.checkDisposed(Flowable.just(Notification.createOnComplete()).dematerialize());
    }

Code example source: ReactiveX/RxJava

    @Test
    public void selectorNull() {
        Flowable.just(1, 2)
            .materialize()
            .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
                @Override
                public Notification<Object> apply(Notification<Integer> v) throws Exception {
                    // A null result from the selector fails the flow with a NullPointerException.
                    return null;
                }
            })
            .test()
            .assertFailure(NullPointerException.class);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void selectorCrash() {
        Flowable.just(1, 2)
            .materialize()
            .dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
                @Override
                public Notification<Object> apply(Notification<Integer> v) throws Exception {
                    // An exception thrown by the selector is delivered downstream as onError.
                    throw new TestException();
                }
            })
            .test()
            .assertFailure(TestException.class);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testErrorThrownIssue1685() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            FlowableProcessor<Object> processor = ReplayProcessor.create();
            Flowable.error(new RuntimeException("oops"))
                .materialize()
                .delay(1, TimeUnit.SECONDS)
                .dematerialize(Functions.<Notification<Object>>identity())
                .subscribe(processor);
            // Subscribing without an error handler routes the error to the plugin error handler.
            processor.subscribe();
            processor.materialize().blockingFirst();
            System.out.println("Done");
            TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: davidmoten/rxjava2-jdbc

        private Flowable<Tuple2<T1, T2>> build() {
            return Call.createWithTwoOutParameters(b.connection, b.sql, b.parameterGroups(), b.params, cls1, cls2) //
                    .dematerialize();
        }
    } // closes the enclosing class, which is not shown in the source

Code example source: davidmoten/rxjava2-jdbc

        private Flowable<Tuple3<T1, T2, T3>> build() {
            return Call
                    .createWithThreeOutParameters(b.connection, b.sql, b.parameterGroups(), b.params, cls1, cls2, cls3) //
                    .dematerialize();
        }
    } // closes the enclosing class, which is not shown in the source

Code example source: davidmoten/rxjava2-jdbc

        private Flowable<CallableResultSetN> build() {
            return Call.createWithNResultSets(b.connection, b.sql, b.parameterGroups(), b.params, functions, 0) //
                    .dematerialize();
        }
    } // closes the enclosing class, which is not shown in the source

Code example source: davidmoten/rxjava2-jdbc

        private Flowable<T1> build() {
            return Call.createWithOneOutParameter(b.connection, b.sql, b.parameterGroups(), b.params, cls) //
                    .dematerialize();
        }
    } // closes the enclosing class, which is not shown in the source

Code example source: davidmoten/rxjava2-jdbc

        private Flowable<CallableResultSet2<T1, T2>> build() {
            return Call.createWithTwoResultSets(b.connection, b.sql, b.parameterGroups(), b.params, f1, f2, 0) //
                    .dematerialize();
        }
    } // closes the enclosing class, which is not shown in the source

Code example source: davidmoten/rxjava2-jdbc

    static Flowable<Integer> createWithZeroOutParameters(Single<Connection> connection, String sql,
            Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders) {
        return connection.toFlowable()
                .flatMap(con -> Call.<Integer>createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
                        (stmt, parameters) -> createWithZeroOutParameters(stmt, parameters, parameterPlaceholders)))
                .dematerialize();
    }
