io.reactivex.Flowable.rebatchRequests()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(5.6k)|赞(0)|评价(0)|浏览(191)

本文整理了Java中io.reactivex.Flowable.rebatchRequests()方法的一些代码示例,展示了Flowable.rebatchRequests()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.rebatchRequests()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:rebatchRequests

Flowable.rebatchRequests介绍

[英]Requests n initially from the upstream and then 75% of n subsequently after 75% of n values have been emitted to the downstream.

This operator allows preventing the downstream to trigger unbounded mode via request(Long.MAX_VALUE)or compensate for the per-item overhead of small and frequent requests. Backpressure: The operator expects backpressure from upstream and honors backpressure from downstream. Scheduler: rebatchRequests does not operate by default on a particular Scheduler.
[中]首先从上游请求n,然后在n值的75%被发送到下游之后,请求n的75%。
此运算符允许阻止下游通过请求(Long.MAX_值)触发无界模式,或补偿小请求和频繁请求的每项开销。背压:操作员期望来自上游的背压,并尊重来自下游的背压。调度程序:默认情况下,rebatchRequests不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements).rebatchRequests(2)
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void skipBackpressure() {
  Flowable.range(1, 10)
  .buffer(2, 3)
  .rebatchRequests(1)
  .test()
  .assertResult(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8), Arrays.asList(10));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rebatchRequestsArgumentCheck() {
  try {
    Flowable.never().rebatchRequests(-99);
    fail("Didn't throw IAE");
  } catch (IllegalArgumentException ex) {
    assertEquals("bufferSize > 0 required but it was -99", ex.getMessage());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void slowPathRebatch() {
  Flowable.rangeLong(1L, 5L)
  .rebatchRequests(1)
  .test()
  .assertResult(1L, 2L, 3L, 4L, 5L);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void slowPathRebatch() {
  Flowable.range(1, 5)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normalMaxConcurrent1Backpressured() {
  Flowable.range(1, 10)
  .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v) throws Exception {
      return Maybe.just(v);
    }
  }, false, 1)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normalBackpressured() {
  Flowable.range(1, 10)
  .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v) throws Exception {
      return Maybe.just(v);
    }
  })
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void oneByOne() {
  Flowable.range(1, 3).hide()
  .flatMapIterable(Functions.justFunction(Arrays.asList(1)), 1)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 1, 1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normalBackpressured() {
  Flowable.range(1, 10)
  .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> apply(Integer v) throws Exception {
      return Single.just(v);
    }
  })
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normalMaxConcurrent2Backpressured() {
  Flowable.range(1, 10)
  .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> apply(Integer v) throws Exception {
      return Single.just(v);
    }
  }, false, 2)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normalMaxConcurrent2Backpressured() {
  Flowable.range(1, 10)
  .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v) throws Exception {
      return Maybe.just(v);
    }
  }, false, 2)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rangeDelayErrorBackpressure2() {
  Flowable.range(1, 3)
  .parallel(1)
  .sequentialDelayError(1)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void conditionalOneByOne() {
  Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 })
  .filter(Functions.alwaysTrue())
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void conditionalSlowPathRebatch() {
  Flowable.rangeLong(1L, 5L)
  .filter(Functions.alwaysTrue())
  .rebatchRequests(1)
  .test()
  .assertResult(1L, 2L, 3L, 4L, 5L);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void oneByOne() {
  Flowable.range(1, 10)
  .publish(Functions.<Flowable<Integer>>identity())
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void conditionalSlowPathRebatch() {
  Flowable.range(1, 5)
  .filter(Functions.alwaysTrue())
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rangeDelayErrorBackpressure() {
  Flowable.range(1, 3)
  .parallel(1)
  .sequentialDelayError(1)
  .take(2)
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void syncFusedRequestOneByOneConditional() {
  Flowable.range(1, 5)
  .observeOn(ImmediateThinScheduler.INSTANCE)
  .filter(Functions.alwaysTrue())
  .rebatchRequests(1)
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normalConditionalLong2() {
  Flowable.fromIterable(new CrashingIterable(100, 10 * 1000 * 1000, 10 * 1000 * 1000))
  .filter(Functions.alwaysTrue())
  .rebatchRequests(128)
  .take(1000 * 1000)
  .test()
  .assertSubscribed()
  .assertValueCount(1000 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void innerLong() {
  int n = Flowable.bufferSize() * 2;
  Flowable.just(1).hide()
  .concatMapEager(Functions.justFunction(Flowable.range(1, n).hide()))
  .rebatchRequests(1)
  .test()
  .assertValueCount(n)
  .assertComplete()
  .assertNoErrors();
}

相关文章

Flowable类方法