io.reactivex.Flowable.throttleLatest()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(4.3k)|赞(0)|评价(0)|浏览(230)

本文整理了Java中io.reactivex.Flowable.throttleLatest()方法的一些代码示例,展示了Flowable.throttleLatest()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.throttleLatest()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:throttleLatest

Flowable.throttleLatest介绍

[英]Throttles items from the upstream Flowable by first emitting the next item from upstream, then periodically emitting the latest item (if any) when the specified timeout elapses between them.

Unlike the option with #throttleLatest(long,TimeUnit,boolean), the very last item being held back (if any) is not emitted when the upstream completes.

If no items were emitted from the upstream during this timeout phase, the next upstream item is emitted immediately and the timeout window starts from then. Backpressure: This operator does not support backpressure as it uses time to control data flow. If the downstream is not ready to receive items, a io.reactivex.exceptions.MissingBackpressureExceptionwill be signaled. Scheduler: throttleLatest operates by default on the computation Scheduler.

History: 2.1.14 - experimental
[中]通过首先从上游发送下一个项目,然后在指定的超时时间过后定期发送最新的项目(如果有),来限制来自上游的项目的流量。
与带有#throttleTest(long、TimeUnit、boolean)的选项不同,在上游完成时不会发出最后一个被保留的项(如果有)。
如果在此超时阶段没有从上游发出任何项,则会立即发出下一个上游项,超时窗口从此开始。背压:此运算符不支持背压,因为它使用时间来控制数据流。如果下游未准备好接收项目,则发送io。reactivex。例外情况。丢失背压异常将发出信号。调度程序:默认情况下,ThrottleTest在计算调度程序上运行。
历史:2.1.14-实验性

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Object> apply(Flowable<Object> f) throws Exception {
    return f.throttleLatest(1, TimeUnit.MINUTES);
  }
});

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler) {
  return throttleLatest(timeout, unit, scheduler, false);

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit) {
  return throttleLatest(timeout, unit, Schedulers.computation(), false);

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, boolean emitLast) {
  return throttleLatest(timeout, unit, Schedulers.computation(), emitLast);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(
      Flowable.never()
      .throttleLatest(1, TimeUnit.MINUTES)
  );
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void just() {
  Flowable.just(1)
  .throttleLatest(1, TimeUnit.MINUTES)
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void range() {
  Flowable.range(1, 5)
  .throttleLatest(1, TimeUnit.MINUTES)
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rangeEmitLatest() {
  Flowable.range(1, 5)
  .throttleLatest(1, TimeUnit.MINUTES, true)
  .test()
  .assertResult(1, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void missingBackpressureExceptionFirst() throws Exception {
  TestScheduler sch = new TestScheduler();
  Action onCancel = mock(Action.class);
  Flowable.just(1, 2)
  .doOnCancel(onCancel)
  .throttleLatest(1, TimeUnit.MINUTES, sch)
  .test(0)
  .assertFailure(MissingBackpressureException.class);
  verify(onCancel).run();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() throws Exception {
  Action onCancel = mock(Action.class);
  Flowable.range(1, 5)
  .doOnCancel(onCancel)
  .throttleLatest(1, TimeUnit.MINUTES)
  .take(1)
  .test()
  .assertResult(1);
  verify(onCancel).run();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Flowable.error(new TestException())
  .throttleLatest(1, TimeUnit.MINUTES)
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void missingBackpressureExceptionLatest() throws Exception {
  TestScheduler sch = new TestScheduler();
  Action onCancel = mock(Action.class);
  TestSubscriber<Integer> ts = Flowable.just(1, 2)
  .concatWith(Flowable.<Integer>never())
  .doOnCancel(onCancel)
  .throttleLatest(1, TimeUnit.SECONDS, sch, true)
  .test(1);
  sch.advanceTimeBy(1, TimeUnit.SECONDS);
  ts.assertFailure(MissingBackpressureException.class, 1);
  verify(onCancel).run();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void missingBackpressureExceptionLatestComplete() throws Exception {
  TestScheduler sch = new TestScheduler();
  Action onCancel = mock(Action.class);
  PublishProcessor<Integer> pp = PublishProcessor.create();
  TestSubscriber<Integer> ts = pp
  .doOnCancel(onCancel)
  .throttleLatest(1, TimeUnit.SECONDS, sch, true)
  .test(1);
  pp.onNext(1);
  pp.onNext(2);
  ts.assertValuesOnly(1);
  pp.onComplete();
  ts.assertFailure(MissingBackpressureException.class, 1);
  verify(onCancel, never()).run();
}

相关文章

Flowable类方法