Usage and code examples for io.reactivex.Flowable.onBackpressureBuffer()

Reposted by x33g5p2x on 2022-01-19, category: Other

This article collects code examples for the Java method io.reactivex.Flowable.onBackpressureBuffer() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Flowable.onBackpressureBuffer() are as follows:
Package: io.reactivex
Class name: Flowable
Method name: onBackpressureBuffer

Introduction to Flowable.onBackpressureBuffer

Instructs a Publisher that is emitting items faster than its Subscriber can consume them to buffer these items indefinitely until they can be emitted.

Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., not applying backpressure to it).
Scheduler: onBackpressureBuffer does not operate by default on a particular Scheduler.

This description applies to the no-argument overload. The examples below also use overloads that take a capacity, a delayError flag, an overflow action, and a BackpressureOverflowStrategy.
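The buffering behavior described above can be sketched in plain Java without RxJava on the classpath. This is only an illustrative model: the class UnboundedBufferSketch and its methods are hypothetical names, not part of RxJava, and the real operator also handles threading, cancellation, and terminal events that are left out here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the operator's buffering; not RxJava code. */
class UnboundedBufferSketch<T> {
    private final Queue<T> queue = new ArrayDeque<>();    // grows without limit
    private final List<T> delivered = new ArrayList<>();  // items the downstream has received
    private long requested;                               // outstanding downstream demand

    /** Upstream emits regardless of demand; the item is delivered or parked. */
    void onNext(T item) {
        queue.add(item);
        drain();
    }

    /** Downstream signals that it can accept n more items. */
    void request(long n) {
        requested += n;
        drain();
    }

    /** Hands over buffered items only while there is outstanding demand. */
    private void drain() {
        while (requested > 0 && !queue.isEmpty()) {
            delivered.add(queue.poll());
            requested--;
        }
    }

    List<T> delivered() { return delivered; }

    int buffered() { return queue.size(); }

    public static void main(String[] args) {
        UnboundedBufferSketch<Integer> buf = new UnboundedBufferSketch<>();
        for (int i = 1; i <= 5; i++) {
            buf.onNext(i);                 // no demand yet: everything is parked
        }
        System.out.println(buf.delivered() + " buffered=" + buf.buffered()); // [] buffered=5
        buf.request(2);                    // downstream asks for two items
        System.out.println(buf.delivered() + " buffered=" + buf.buffered()); // [1, 2] buffered=3
    }
}
```

The queue has no upper bound in this model, which is why a fast source paired with a slow consumer can grow memory use without limit; the bounded overloads used in the examples below exist to cap that.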

Code examples

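Code example source: ReactiveX/RxJava

// Excerpt: an anonymous function passed to a test helper; the enclosing call is not shown in the source.
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
  return f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR);
}
});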

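Code example source: ReactiveX/RxJava

// Excerpt: an anonymous function passed to a test helper; the enclosing call is not shown in the source.
@Override
public Object apply(Flowable<Object> f) throws Exception {
  return f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR);
}
}, false, 1, 1, 1);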

Code example source: ReactiveX/RxJava

@Override
public Publisher<Long> createPublisher(long elements) {
  // intervalRange is timer-driven, so buffer its items for slower subscribers.
  return Flowable.intervalRange(0, elements, 0, 1, TimeUnit.MILLISECONDS)
      .onBackpressureBuffer();
}

Code example source: ReactiveX/RxJava

@Override
public Publisher<Integer> createPublisher(long elements) {
  return Flowable.range(0, (int) elements).onBackpressureBuffer();
}

Code example source: ReactiveX/RxJava

@Override
public Publisher<Long> createPublisher(long elements) {
  return Flowable.interval(0, 1, TimeUnit.MILLISECONDS).take(elements)
      .onBackpressureBuffer();
}

Code example source: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(Flowable.just(1)
      .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR));
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.just(1)
      .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR));
}

Code example source: ReactiveX/RxJava

@Test
public void empty() {
  Flowable.empty()
  .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)
  .test(0L)
  .assertResult();
}

Code example source: ReactiveX/RxJava

@Test
public void overflowError() {
  Flowable.range(1, 20)
  .onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)
  .test(0L)
  .assertFailure(MissingBackpressureException.class);
}
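Several of these tests pass a capacity together with a BackpressureOverflowStrategy. What happens once the buffer is full can be sketched in plain Java, under assumed semantics: ERROR terminates the stream and discards what was buffered, DROP_OLDEST evicts the oldest buffered item, and DROP_LATEST evicts the newest buffered item, each of the latter two making room for the incoming value. The class BoundedBufferSketch, its nested Strategy enum, and the feed helper are hypothetical names for illustration only, and the overflow action (Functions.EMPTY_ACTION in the tests) is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a bounded buffer with an overflow strategy; not RxJava code. */
class BoundedBufferSketch {
    /** Names mirror BackpressureOverflowStrategy; the behavior here is an assumption. */
    enum Strategy { ERROR, DROP_OLDEST, DROP_LATEST }

    private final Deque<Integer> queue = new ArrayDeque<>();
    private final int capacity;
    private final Strategy strategy;
    private boolean failed;

    BoundedBufferSketch(int capacity, Strategy strategy) {
        this.capacity = capacity;
        this.strategy = strategy;
    }

    /** Upstream emits with no downstream demand, so every item must be parked. */
    void onNext(int item) {
        if (failed) {
            return;                       // already terminated: ignore further items
        }
        if (queue.size() < capacity) {
            queue.addLast(item);
            return;
        }
        switch (strategy) {
            case DROP_OLDEST:
                queue.pollFirst();        // evict the head, keep the newcomer
                queue.addLast(item);
                break;
            case DROP_LATEST:
                queue.pollLast();         // evict the most recently buffered item
                queue.addLast(item);
                break;
            default:
                failed = true;            // ERROR: fail and discard buffered items
                queue.clear();
                break;
        }
    }

    boolean failed() { return failed; }

    List<Integer> buffered() { return new ArrayList<>(queue); }

    /** Feeds 1..count into a fresh buffer without ever requesting anything. */
    static BoundedBufferSketch feed(int capacity, Strategy strategy, int count) {
        BoundedBufferSketch b = new BoundedBufferSketch(capacity, strategy);
        for (int i = 1; i <= count; i++) {
            b.onNext(i);
        }
        return b;
    }

    public static void main(String[] args) {
        System.out.println(feed(8, Strategy.ERROR, 20).failed());         // true
        System.out.println(feed(8, Strategy.DROP_OLDEST, 20).buffered()); // [13, 14, 15, 16, 17, 18, 19, 20]
        System.out.println(feed(8, Strategy.DROP_LATEST, 20).buffered()); // [1, 2, 3, 4, 5, 6, 7, 20]
    }
}
```

The ERROR case corresponds to the overflowError test above: 20 items arrive at a buffer of 8 with zero demand, so the stream fails without delivering any value.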

Code example source: ReactiveX/RxJava

@Test
public void emptyDelayError() {
  Flowable.empty()
  .onBackpressureBuffer(true)
  .test()
  .assertResult();
}

Code example source: ReactiveX/RxJava

@Test
public void overflowCrashes() {
  Flowable.range(1, 20)
  .onBackpressureBuffer(8, new Action() {
    @Override
    public void run() throws Exception {
      throw new TestException();
    }
  }, BackpressureOverflowStrategy.DROP_OLDEST)
  .test(0L)
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void justTake() {
  Flowable.just(1)
  .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)
  .take(1)
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Flowable
  .error(new TestException())
  .onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void fusionRejected() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);

  Flowable.<Integer>never().onBackpressureBuffer().subscribe(ts);

  SubscriberFusion.assertFusion(ts, QueueFuseable.NONE)
  .assertEmpty();
}

Code example source: ReactiveX/RxJava

@Test
public void maxSize() {
  TestSubscriber<Integer> ts = TestSubscriber.create(0);
  Flowable.range(1, 10).onBackpressureBuffer(1).subscribe(ts);
  ts.assertNoValues();
  ts.assertError(MissingBackpressureException.class);
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void noDelayError() {
  Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
  .onBackpressureBuffer(false)
  .test(0L)
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Publisher<List<Long>> createPublisher(long elements) {
  return Flowable.fromIterable(iterate(elements))
      .window(Flowable.just(1).concatWith(Flowable.<Integer>never()))
      .onBackpressureBuffer()
      .flatMap((Function)Functions.identity());
}

Code example source: ReactiveX/RxJava

@Test
public void delayError() {
  TestSubscriber<Integer> ts = Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
  .onBackpressureBuffer(true)
  .test(0L)
  .assertEmpty();
  ts.request(1);
  ts.assertFailure(TestException.class, 1);
}

Code example source: ReactiveX/RxJava

@Test
public void fusedNormal() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  Flowable.range(1, 10).onBackpressureBuffer().subscribe(ts);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
   .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
   .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

Code example source: ReactiveX/RxJava

@Test
public void delayErrorBuffer() {
  TestSubscriber<Integer> ts = Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
  .onBackpressureBuffer(16, true)
  .test(0L)
  .assertEmpty();
  ts.request(1);
  ts.assertFailure(TestException.class, 1);
}
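The noDelayError, delayError, and delayErrorBuffer tests above differ only in the delayError flag. The difference they assert can be modeled in plain Java as a rough sketch: with delayError set, a pending error waits until buffered items have been delivered, and without it the error cuts ahead and buffered items are discarded. DelayErrorSketch and its string-based event log are hypothetical, chosen only to make the ordering visible; they are not RxJava API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of the delayError flag; not RxJava code. */
class DelayErrorSketch {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final List<String> events = new ArrayList<>(); // what the downstream observes
    private final boolean delayError;
    private long requested;
    private String error;          // pending terminal error, if any
    private boolean terminated;

    DelayErrorSketch(boolean delayError) {
        this.delayError = delayError;
    }

    void onNext(int value) { queue.add(value); drain(); }

    void onError(String message) { error = message; drain(); }

    void request(long n) { requested += n; drain(); }

    private void drain() {
        if (terminated) {
            return;
        }
        if (error != null && !delayError) {
            queue.clear();                       // error cuts ahead of buffered items
            events.add("error:" + error);
            terminated = true;
            return;
        }
        while (requested > 0 && !queue.isEmpty()) {
            events.add("next:" + queue.poll());  // deliver only against demand
            requested--;
        }
        if (error != null && queue.isEmpty()) {
            events.add("error:" + error);        // delayed error, after the buffer drained
            terminated = true;
        }
    }

    List<String> events() { return events; }

    public static void main(String[] args) {
        DelayErrorSketch eager = new DelayErrorSketch(false);
        eager.onNext(1);
        eager.onError("boom");
        System.out.println(eager.events());      // [error:boom]

        DelayErrorSketch delayed = new DelayErrorSketch(true);
        delayed.onNext(1);
        delayed.onError("boom");
        System.out.println(delayed.events());    // []
        delayed.request(1);
        System.out.println(delayed.events());    // [next:1, error:boom]
    }
}
```

The second half of main follows the same sequence as the delayError test: nothing is observed with zero demand, and a single request releases the buffered value followed by the error.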
