本文整理了Java中io.reactivex.Flowable.rebatchRequests()
方法的一些代码示例,展示了Flowable.rebatchRequests()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.rebatchRequests()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:rebatchRequests
[英]Requests n initially from the upstream and then 75% of n subsequently after 75% of n values have been emitted to the downstream.
This operator allows preventing the downstream to trigger unbounded mode via request(Long.MAX_VALUE)or compensate for the per-item overhead of small and frequent requests. Backpressure: The operator expects backpressure from upstream and honors backpressure from downstream. Scheduler: rebatchRequests does not operate by default on a particular Scheduler.
[中]首先从上游请求n,然后在n值的75%被发送到下游之后,请求n的75%。
此运算符允许阻止下游通过请求(Long.MAX_值)触发无界模式,或补偿小请求和频繁请求的每项开销。背压:操作员期望来自上游的背压,并尊重来自下游的背压。调度程序:默认情况下,rebatchRequests不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
return
Flowable.range(0, (int)elements).rebatchRequests(2)
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void skipBackpressure() {
Flowable.range(1, 10)
.buffer(2, 3)
.rebatchRequests(1)
.test()
.assertResult(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8), Arrays.asList(10));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void rebatchRequestsArgumentCheck() {
try {
Flowable.never().rebatchRequests(-99);
fail("Didn't throw IAE");
} catch (IllegalArgumentException ex) {
assertEquals("bufferSize > 0 required but it was -99", ex.getMessage());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void slowPathRebatch() {
Flowable.rangeLong(1L, 5L)
.rebatchRequests(1)
.test()
.assertResult(1L, 2L, 3L, 4L, 5L);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void slowPathRebatch() {
Flowable.range(1, 5)
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalMaxConcurrent1Backpressured() {
Flowable.range(1, 10)
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v) throws Exception {
return Maybe.just(v);
}
}, false, 1)
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalBackpressured() {
Flowable.range(1, 10)
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v) throws Exception {
return Maybe.just(v);
}
})
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void oneByOne() {
Flowable.range(1, 3).hide()
.flatMapIterable(Functions.justFunction(Arrays.asList(1)), 1)
.rebatchRequests(1)
.test()
.assertResult(1, 1, 1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalBackpressured() {
Flowable.range(1, 10)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(v);
}
})
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalMaxConcurrent2Backpressured() {
Flowable.range(1, 10)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(v);
}
}, false, 2)
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalMaxConcurrent2Backpressured() {
Flowable.range(1, 10)
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v) throws Exception {
return Maybe.just(v);
}
}, false, 2)
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void rangeDelayErrorBackpressure2() {
Flowable.range(1, 3)
.parallel(1)
.sequentialDelayError(1)
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void conditionalOneByOne() {
Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 })
.filter(Functions.alwaysTrue())
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void conditionalSlowPathRebatch() {
Flowable.rangeLong(1L, 5L)
.filter(Functions.alwaysTrue())
.rebatchRequests(1)
.test()
.assertResult(1L, 2L, 3L, 4L, 5L);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void oneByOne() {
Flowable.range(1, 10)
.publish(Functions.<Flowable<Integer>>identity())
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void conditionalSlowPathRebatch() {
Flowable.range(1, 5)
.filter(Functions.alwaysTrue())
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void rangeDelayErrorBackpressure() {
Flowable.range(1, 3)
.parallel(1)
.sequentialDelayError(1)
.take(2)
.rebatchRequests(1)
.test()
.assertResult(1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void syncFusedRequestOneByOneConditional() {
Flowable.range(1, 5)
.observeOn(ImmediateThinScheduler.INSTANCE)
.filter(Functions.alwaysTrue())
.rebatchRequests(1)
.test()
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalConditionalLong2() {
Flowable.fromIterable(new CrashingIterable(100, 10 * 1000 * 1000, 10 * 1000 * 1000))
.filter(Functions.alwaysTrue())
.rebatchRequests(128)
.take(1000 * 1000)
.test()
.assertSubscribed()
.assertValueCount(1000 * 1000)
.assertNoErrors()
.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void innerLong() {
int n = Flowable.bufferSize() * 2;
Flowable.just(1).hide()
.concatMapEager(Functions.justFunction(Flowable.range(1, n).hide()))
.rebatchRequests(1)
.test()
.assertValueCount(n)
.assertComplete()
.assertNoErrors();
}
内容来源于网络,如有侵权,请联系作者删除!