本文整理了Java中io.reactivex.Flowable.onBackpressureBuffer()
方法的一些代码示例,展示了Flowable.onBackpressureBuffer()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.onBackpressureBuffer()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:onBackpressureBuffer
[英]Instructs a Publisher that is emitting items faster than its Subscriber can consume them to buffer these items indefinitely until they can be emitted.
Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., not applying backpressure to it). Scheduler: onBackpressureBuffer does not operate by default on a particular Scheduler.
[中]
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
return f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR);
}
});
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Flowable<Object> f) throws Exception {
return f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR);
}
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Long> createPublisher(long elements) {
return
Flowable.intervalRange(0, elements, 0, 1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
return
Flowable.range(0, (int)elements).onBackpressureBuffer()
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Long> createPublisher(long elements) {
return
Flowable.interval(0, 1, TimeUnit.MILLISECONDS).take(elements)
.onBackpressureBuffer()
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void badRequest() {
TestHelper.assertBadRequestReported(Flowable.just(1)
.onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose() {
TestHelper.checkDisposed(Flowable.just(1)
.onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void empty() {
Flowable.empty()
.onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)
.test(0L)
.assertResult();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void overflowError() {
Flowable.range(1, 20)
.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)
.test(0L)
.assertFailure(MissingBackpressureException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void emptyDelayError() {
Flowable.empty()
.onBackpressureBuffer(true)
.test()
.assertResult();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void overflowCrashes() {
Flowable.range(1, 20)
.onBackpressureBuffer(8, new Action() {
@Override
public void run() throws Exception {
throw new TestException();
}
}, BackpressureOverflowStrategy.DROP_OLDEST)
.test(0L)
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void justTake() {
Flowable.just(1)
.onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)
.take(1)
.test()
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void error() {
Flowable
.error(new TestException())
.onBackpressureBuffer(16, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fusionRejected() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);
Flowable.<Integer>never().onBackpressureBuffer().subscribe(ts);
SubscriberFusion.assertFusion(ts, QueueFuseable.NONE)
.assertEmpty();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void maxSize() {
TestSubscriber<Integer> ts = TestSubscriber.create(0);
Flowable.range(1, 10).onBackpressureBuffer(1).subscribe(ts);
ts.assertNoValues();
ts.assertError(MissingBackpressureException.class);
ts.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void noDelayError() {
Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
.onBackpressureBuffer(false)
.test(0L)
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Publisher<List<Long>> createPublisher(long elements) {
return
Flowable.fromIterable(iterate(elements))
.window(Flowable.just(1).concatWith(Flowable.<Integer>never()))
.onBackpressureBuffer()
.flatMap((Function)Functions.identity())
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void delayError() {
TestSubscriber<Integer> ts = Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
.onBackpressureBuffer(true)
.test(0L)
.assertEmpty();
ts.request(1);
ts.assertFailure(TestException.class, 1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fusedNormal() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.range(1, 10).onBackpressureBuffer().subscribe(ts);
ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
.assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void delayErrorBuffer() {
TestSubscriber<Integer> ts = Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
.onBackpressureBuffer(16, true)
.test(0L)
.assertEmpty();
ts.request(1);
ts.assertFailure(TestException.class, 1);
}
内容来源于网络,如有侵权,请联系作者删除!