
x33g5p2x  于2022-01-19 转载在 其他  



[英]Requests n initially from the upstream and then 75% of n subsequently after 75% of n values have been emitted to the downstream.

This operator allows preventing the downstream from triggering unbounded mode via request(Long.MAX_VALUE), or compensating for the per-item overhead of small and frequent requests. Backpressure: The operator expects backpressure from upstream and honors backpressure from downstream. Scheduler: rebatchRequests does not operate by default on a particular Scheduler.


代码示例来源:origin: ReactiveX/RxJava

  public Publisher<Integer> createPublisher(long elements) {
        Flowable.range(0, (int)elements).rebatchRequests(2)

代码示例来源:origin: ReactiveX/RxJava

public void skipBackpressure() {
  Flowable.range(1, 10)
  .buffer(2, 3)
  .assertResult(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8), Arrays.asList(10));

代码示例来源:origin: ReactiveX/RxJava

public void rebatchRequestsArgumentCheck() {
  try {
    fail("Didn't throw IAE");
  } catch (IllegalArgumentException ex) {
    assertEquals("bufferSize > 0 required but it was -99", ex.getMessage());

代码示例来源:origin: ReactiveX/RxJava

public void slowPathRebatch() {
  Flowable.rangeLong(1L, 5L)
  .assertResult(1L, 2L, 3L, 4L, 5L);

代码示例来源:origin: ReactiveX/RxJava

public void slowPathRebatch() {
  Flowable.range(1, 5)
  .assertResult(1, 2, 3, 4, 5);

代码示例来源:origin: ReactiveX/RxJava

public void normalMaxConcurrent1Backpressured() {
  Flowable.range(1, 10)
  .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    public MaybeSource<Integer> apply(Integer v) throws Exception {
      return Maybe.just(v);
  }, false, 1)
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

代码示例来源:origin: ReactiveX/RxJava

public void normalBackpressured() {
  Flowable.range(1, 10)
  .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    public MaybeSource<Integer> apply(Integer v) throws Exception {
      return Maybe.just(v);
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

代码示例来源:origin: ReactiveX/RxJava

public void oneByOne() {
  Flowable.range(1, 3).hide()
  .flatMapIterable(Functions.justFunction(Arrays.asList(1)), 1)
  .assertResult(1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

public void normalBackpressured() {
  Flowable.range(1, 10)
  .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    public SingleSource<Integer> apply(Integer v) throws Exception {
      return Single.just(v);
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

代码示例来源:origin: ReactiveX/RxJava

public void normalMaxConcurrent2Backpressured() {
  Flowable.range(1, 10)
  .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    public SingleSource<Integer> apply(Integer v) throws Exception {
      return Single.just(v);
  }, false, 2)
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

代码示例来源:origin: ReactiveX/RxJava

public void normalMaxConcurrent2Backpressured() {
  Flowable.range(1, 10)
  .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    public MaybeSource<Integer> apply(Integer v) throws Exception {
      return Maybe.just(v);
  }, false, 2)
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

代码示例来源:origin: ReactiveX/RxJava

public void rangeDelayErrorBackpressure2() {
  Flowable.range(1, 3)
  .assertResult(1, 2, 3);

代码示例来源:origin: ReactiveX/RxJava

public void conditionalOneByOne() {
  Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 })
  .assertResult(1, 2, 3, 4, 5);

代码示例来源:origin: ReactiveX/RxJava

public void conditionalSlowPathRebatch() {
  Flowable.rangeLong(1L, 5L)
  .assertResult(1L, 2L, 3L, 4L, 5L);

代码示例来源:origin: ReactiveX/RxJava

public void oneByOne() {
  Flowable.range(1, 10)
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

代码示例来源:origin: ReactiveX/RxJava

public void conditionalSlowPathRebatch() {
  Flowable.range(1, 5)
  .assertResult(1, 2, 3, 4, 5);

代码示例来源:origin: ReactiveX/RxJava

public void rangeDelayErrorBackpressure() {
  Flowable.range(1, 3)
  .assertResult(1, 2);

代码示例来源:origin: ReactiveX/RxJava

public void syncFusedRequestOneByOneConditional() {
  Flowable.range(1, 5)
  .assertResult(1, 2, 3, 4, 5);

代码示例来源:origin: ReactiveX/RxJava

public void normalConditionalLong2() {
  Flowable.fromIterable(new CrashingIterable(100, 10 * 1000 * 1000, 10 * 1000 * 1000))
  .take(1000 * 1000)
  .assertValueCount(1000 * 1000)

代码示例来源:origin: ReactiveX/RxJava

public void innerLong() {
  int n = Flowable.bufferSize() * 2;
  .concatMapEager(Functions.justFunction(Flowable.range(1, n).hide()))

