io.reactivex.Flowable.switchOnNextDelayError()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(4.2k)|赞(0)|评价(0)|浏览(131)

本文整理了Java中io.reactivex.Flowable.switchOnNextDelayError()方法的一些代码示例,展示了Flowable.switchOnNextDelayError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.switchOnNextDelayError()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:switchOnNextDelayError

Flowable.switchOnNextDelayError介绍

[英]Converts a Publisher that emits Publishers into a Publisher that emits the items emitted by the most recently emitted of those Publishers and delays any exception until all Publishers terminate.

switchOnNext subscribes to a Publisher that emits Publishers. Each time it observes one of these emitted Publishers, the Publisher returned by switchOnNext begins emitting the items emitted by that Publisher. When a new Publisher is emitted, switchOnNext stops emitting items from the earlier-emitted Publisher and begins emitting items from the new one.

The resulting Publisher completes if both the main Publisher and the last inner Publisher, if any, complete. If the main Publisher signals an onError, the termination of the last inner Publisher will emit that error as is or wrapped into a CompositeException along with the other possible errors the former inner Publishers signaled. Backpressure: The operator honors backpressure from downstream. The outer Publisher is consumed in an unbounded manner (i.e., without backpressure) and the inner Publishers are expected to honor backpressure but it is not enforced; the operator won't signal a MissingBackpressureExceptionbut the violation may lead to OutOfMemoryError due to internal buffer bloat. Scheduler: switchOnNextDelayError does not operate by default on a particular Scheduler.
[中]将发出发布服务器的发布服务器转换为发出最近发出的发布服务器发出的项目的发布服务器,并延迟任何异常,直到所有发布服务器终止。
switchOnNext订阅发出发布服务器的发布服务器。每次它观察其中一个发出的发布服务器时,switchOnNext返回的发布服务器开始发出该发布服务器发出的项目。当发出新发布服务器时,switchOnNext停止从先前发出的发布服务器发出项目,并开始从新发布服务器发出项目。
如果主发布服务器和最后一个内部发布服务器(如果有)都已完成,则生成的发布服务器将完成。如果主发布服务器发出onError信号,则最后一个内部发布服务器的终止将按原样发出该错误,或将该错误包装为CompositeException,以及前内部发布服务器发出的其他可能错误。背压:操作员接受来自下游的背压。外部发布服务器以无限制的方式使用(即,没有背压),内部发布服务器应遵守背压,但不强制执行;操作员不会发出MissingBackPressureException的信号,但由于内部缓冲区膨胀,违规可能导致OutOfMemoryError。调度程序:switchOnNextDelayError默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> switchOnNextDelayError(Publisher<? extends Publisher<? extends T>> sources) {
  return switchOnNextDelayError(sources, bufferSize());

代码示例来源:origin: redisson/redisson

@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> switchOnNextDelayError(Publisher<? extends Publisher<? extends T>> sources) {
  return switchOnNextDelayError(sources, bufferSize());

代码示例来源:origin: ReactiveX/RxJava

@Test
public void switchOnNextDelayError() {
  final List<Integer> list = new ArrayList<Integer>();
  Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      list.add(v);
    }
  });
  Flowable.switchOnNextDelayError(Flowable.just(source).hide())
  .test(1);
  assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), list);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void switchOnNextDelayErrorPrefetch() {
  final List<Integer> list = new ArrayList<Integer>();
  Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      list.add(v);
    }
  });
  Flowable.switchOnNextDelayError(Flowable.just(source).hide(), 2)
  .test(1);
  assertEquals(Arrays.asList(1, 2, 3), list);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void scalarMapDelayError() {
  Flowable.switchOnNextDelayError(Flowable.just(Flowable.just(1)))
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void switchOnNextDelayErrorBufferSize() {
  PublishProcessor<Flowable<Integer>> pp = PublishProcessor.create();
  TestSubscriber<Integer> ts = Flowable.switchOnNextDelayError(pp, 2).test();
  pp.onNext(Flowable.just(1));
  pp.onNext(Flowable.range(2, 4));
  pp.onComplete();
  ts.assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void switchOnNextDelayErrorWithError() {
  PublishProcessor<Flowable<Integer>> pp = PublishProcessor.create();
  TestSubscriber<Integer> ts = Flowable.switchOnNextDelayError(pp).test();
  pp.onNext(Flowable.just(1));
  pp.onNext(Flowable.<Integer>error(new TestException()));
  pp.onNext(Flowable.range(2, 4));
  pp.onComplete();
  ts.assertFailure(TestException.class, 1, 2, 3, 4, 5);
}

相关文章

Flowable类方法