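This article collects code examples for the Java method io.reactivex.Flowable.onBackpressureDrop() and shows how Flowable.onBackpressureDrop() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.onBackpressureDrop() method are as follows: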
Fully qualified class: io.reactivex.Flowable
Class name: Flowable
Method name: onBackpressureDrop
Description: Instructs a Publisher that is emitting items faster than its Subscriber can consume them to discard, rather than emit, those items that its Subscriber is not prepared to observe.
If the downstream request count hits 0, the Publisher refrains from calling onNext until the Subscriber invokes request(n) again to increase the request count.
Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., not applying backpressure to it).
Scheduler: onBackpressureDrop does not operate by default on a particular Scheduler.
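The request accounting described above can be modeled in a few lines of plain Java. This is a minimal sketch, not RxJava's implementation: `DropGate`, `offer`, and the two callbacks are hypothetical names chosen for illustration, and the model is single-threaded, whereas the real operator has to cope with concurrent request and onNext calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical single-threaded model of the drop rule; not RxJava code. */
class DropGate<T> {
    private long requested;                // outstanding downstream demand
    private final Consumer<T> downstream;  // stands in for Subscriber.onNext
    private final Consumer<T> onDrop;      // invoked for every discarded item

    DropGate(Consumer<T> downstream, Consumer<T> onDrop) {
        this.downstream = downstream;
        this.onDrop = onDrop;
    }

    /** Models Subscription.request(n): adds demand, saturating at Long.MAX_VALUE. */
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        long sum = requested + n;
        requested = sum < 0 ? Long.MAX_VALUE : sum;
    }

    /** The source calls this for every item, whether or not there is demand. */
    void offer(T item) {
        if (requested > 0) {
            if (requested != Long.MAX_VALUE) {
                requested--;               // Long.MAX_VALUE is treated as unbounded
            }
            downstream.accept(item);
        } else {
            onDrop.accept(item);           // no demand: discard instead of buffering
        }
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        List<Integer> dropped = new ArrayList<>();
        DropGate<Integer> gate = new DropGate<Integer>(received::add, dropped::add);
        gate.request(2);
        for (int i = 1; i <= 5; i++) {
            gate.offer(i);                 // 1 and 2 pass, 3..5 are dropped
        }
        gate.request(1);
        gate.offer(6);                     // demand is back, so 6 passes
        System.out.println("received=" + received + " dropped=" + dropped);
        // prints: received=[1, 2, 6] dropped=[3, 4, 5]
    }
}
```

The source is never slowed down in this model, which corresponds to the "unbounded manner" wording in the description: the gate only decides, per item, whether to forward or discard.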
Code example source: ReactiveX/RxJava
@Override
public Publisher<Object> apply(Flowable<Object> f) throws Exception {
    return f.onBackpressureDrop();
}
});
Code example source: ReactiveX/RxJava
@Override
public Object apply(Flowable<Integer> f) throws Exception {
    return f.onBackpressureDrop();
}
}, false, 1, 1, 1);
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void onBackpressureDropActionNull() {
    just1.onBackpressureDrop(null);
}
Code example source: ReactiveX/RxJava
@Test
public void badRequest() {
    TestHelper.assertBadRequestReported(Flowable.just(1).onBackpressureDrop());
}
Code example source: ReactiveX/RxJava
@Test
public void testRequestOverflow() throws InterruptedException {
    final AtomicInteger count = new AtomicInteger();
    int n = 10;
    range(n).onBackpressureDrop().subscribe(new DefaultSubscriber<Long>() {
        @Override
        public void onStart() {
            request(10);
        }
        @Override
        public void onComplete() {
        }
        @Override
        public void onError(Throwable e) {
            throw new RuntimeException(e);
        }
        @Override
        public void onNext(Long t) {
            count.incrementAndGet();
            // would overflow the requested count if not handled properly in the onBackpressureDrop operator
            request(Long.MAX_VALUE - 1);
        }
    });
    assertEquals(n, count.get());
}
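The overflow that this test guards against comes from adding a huge request(n) to an existing count: 10 + (Long.MAX_VALUE - 1) wraps to a negative number in plain long arithmetic. One common remedy is a saturating add, sketched below with an AtomicLong and a compare-and-set loop. `RequestCap` and `addCap` are hypothetical names for this illustration, and the code is an assumption about the general technique, not a copy of what the operator does internally.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper showing a saturating, thread-safe add of demand. */
class RequestCap {

    /** Adds n to the counter, capping at Long.MAX_VALUE instead of wrapping negative. */
    static long addCap(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;     // already unbounded, nothing to add
            }
            long sum = current + n;
            long next = sum < 0L ? Long.MAX_VALUE : sum;
            if (requested.compareAndSet(current, next)) {
                return next;               // retry only if another thread raced us
            }
        }
    }

    public static void main(String[] args) {
        // Plain addition wraps around and yields a negative demand.
        long naive = 10L + (Long.MAX_VALUE - 1);
        System.out.println(naive < 0);     // prints: true

        // The capped version saturates instead.
        AtomicLong requested = new AtomicLong(10);
        long capped = addCap(requested, Long.MAX_VALUE - 1);
        System.out.println(capped == Long.MAX_VALUE); // prints: true
    }
}
```

Saturating at Long.MAX_VALUE fits the Reactive Streams convention that such a demand is effectively unbounded, so the subscriber in the test above keeps receiving all n items.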
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testTakeObserveOn() {
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
    INFINITE_OBSERVABLE.onBackpressureDrop()
        .observeOn(Schedulers.newThread()).take(1).subscribe(ts);
    ts.awaitTerminalEvent();
    ts.assertNoErrors();
    verify(subscriber).onNext(1L);
    verify(subscriber, never()).onNext(2L);
    verify(subscriber).onComplete();
    verify(subscriber, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Flowable.range(1, 5).window(1, TimeUnit.DAYS, Schedulers.single()).onBackpressureDrop());
    TestHelper.checkDisposed(Flowable.range(1, 5).window(2, 1, TimeUnit.DAYS, Schedulers.single()).onBackpressureDrop());
    TestHelper.checkDisposed(Flowable.range(1, 5).window(1, 2, TimeUnit.DAYS, Schedulers.single()).onBackpressureDrop());
    TestHelper.checkDisposed(Flowable.never()
        .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true).onBackpressureDrop());
}
Code example source: ReactiveX/RxJava
@Test(timeout = 10000)
public void testOnBackpressureDropSynchronous() {
    for (int i = 0; i < 100; i++) {
        int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
        AtomicInteger c = new AtomicInteger();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        firehose(c).onBackpressureDrop()
            .map(SLOW_PASS_THRU).take(num).subscribe(ts);
        ts.awaitTerminalEvent();
        ts.assertNoErrors();
        List<Integer> onNextEvents = ts.values();
        assertEquals(num, onNextEvents.size());
        Integer lastEvent = onNextEvents.get(num - 1);
        System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
        // it drops, so we should get some number far higher than what would have sequentially incremented
        assertTrue(num - 1 <= lastEvent.intValue());
    }
}
Code example source: ReactiveX/RxJava
@Test(timeout = 10000)
public void testOnBackpressureDropSynchronousWithAction() {
    for (int i = 0; i < 100; i++) {
        final AtomicInteger dropCount = new AtomicInteger();
        int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
        AtomicInteger c = new AtomicInteger();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        firehose(c).onBackpressureDrop(new Consumer<Integer>() {
            @Override
            public void accept(Integer j) {
                dropCount.incrementAndGet();
            }
        })
            .map(SLOW_PASS_THRU).take(num).subscribe(ts);
        ts.awaitTerminalEvent();
        ts.assertNoErrors();
        List<Integer> onNextEvents = ts.values();
        assertEquals(num, onNextEvents.size());
        Integer lastEvent = onNextEvents.get(num - 1);
        System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Dropped: " + dropCount.get() + " Emitted: " + c.get() + " Last value: " + lastEvent);
        // it drops, so we should get some number far higher than what would have sequentially incremented
        assertTrue(num - 1 <= lastEvent.intValue());
        // no drop in synchronous mode
        assertEquals(0, dropCount.get());
        assertEquals(c.get(), onNextEvents.size());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void testNonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator() {
    final AtomicBoolean errorOccurred = new AtomicBoolean(false);
    // request 0
    TestSubscriber<Long> ts = TestSubscriber.create(0);
    // the range method emits regardless of requests, so it should trigger the onBackpressureDrop action
    range(2)
        // if the exception were not caught in the onBackpressureDrop operator, it would
        // incorrectly be picked up by this call to doOnError
        .doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable t) {
                errorOccurred.set(true);
            }
        })
        .onBackpressureDrop(THROW_NON_FATAL)
        .subscribe(ts);
    assertFalse(errorOccurred.get());
}
Code example source: ReactiveX/RxJava
@Test(timeout = 10000)
public void testOnBackpressureDrop() {
    long t = System.currentTimeMillis();
    for (int i = 0; i < 100; i++) {
        // stop the test if we are getting close to the timeout because slow machines
        // may not get through 100 iterations
        if (System.currentTimeMillis() - t > TimeUnit.SECONDS.toMillis(9)) {
            break;
        }
        int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
        AtomicInteger c = new AtomicInteger();
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        firehose(c).onBackpressureDrop()
            .observeOn(Schedulers.computation())
            .map(SLOW_PASS_THRU).take(num).subscribe(ts);
        ts.awaitTerminalEvent();
        ts.assertNoErrors();
        List<Integer> onNextEvents = ts.values();
        assertEquals(num, onNextEvents.size());
        Integer lastEvent = onNextEvents.get(num - 1);
        System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
        // it drops, so we should get some number far higher than what would have sequentially incremented
        assertTrue(num - 1 <= lastEvent.intValue());
    }
}
Code example source: ReactiveX/RxJava
@Test(timeout = 500)
public void testWithObserveOn() throws InterruptedException {
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    Flowable.range(0, Flowable.bufferSize() * 10).onBackpressureDrop().observeOn(Schedulers.io()).subscribe(ts);
    ts.awaitTerminalEvent();
}
Code example source: ReactiveX/RxJava
.onBackpressureDrop(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) {
Code example source: ReactiveX/RxJava
    return f.onBackpressureDrop();
case LATEST:
    return f.onBackpressureLatest();
Code example source: ReactiveX/RxJava
infinite.subscribeOn(Schedulers.computation()).onBackpressureDrop().take(500).subscribe(ts);
Code example source: redisson/redisson
    return o.onBackpressureDrop();
case LATEST:
    return o.onBackpressureLatest();
Code example source: vert-x3/vertx-examples
@Override
public void start() throws Exception {
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());
    router.route().handler(req -> req.response().putHeader("content-type", "text/html")
        .end("<html><body><h1>Hello from vert.x!</h1></body></html>"));
    HttpServer server = vertx.createHttpServer();
    server.requestStream()
        .toFlowable()
        .map(HttpServerRequest::pause)
        // answer 503 for requests that arrive while downstream has no outstanding demand
        .onBackpressureDrop(req -> req.response().setStatusCode(503).end())
        .observeOn(RxHelper.scheduler(vertx.getDelegate()))
        .subscribe(req -> {
            req.resume();
            router.accept(req);
        });
    server.rxListen(PORT).subscribe(res -> generateRequests());
}
Code example source: crazysunj/MultiTypeRecyclerViewAdapter
@SuppressLint("CheckResult")
@Override
protected void startRefresh(HandleBase<MultiHeaderEntity> refreshData) {
    Flowable.just(refreshData)
        .onBackpressureDrop()
        .observeOn(Schedulers.io())
        .map(handleBase -> handleRefresh(handleBase.getNewData(), handleBase.getNewHeader(),
            handleBase.getNewFooter(), handleBase.getLevel(), handleBase.getRefreshType()))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::handleResult);
}
Code example source: crazysunj/MultiTypeRecyclerViewAdapter
@SuppressLint("CheckResult")
@Override
protected void startRefresh(HandleBase<StickyItem> refreshData) {
    Flowable.just(refreshData)
        .onBackpressureDrop()
        .observeOn(Schedulers.computation())
        .map(handleBase -> handleRefresh(handleBase.getNewData(), handleBase.getNewHeader(),
            handleBase.getNewFooter(), handleBase.getLevel(), handleBase.getRefreshType()))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::handleResult);
}
Code example source: crazysunj/MultiTypeRecyclerViewAdapter
@SuppressLint("CheckResult")
@Override
protected void startRefresh(HandleBase<T> refreshData) {
    Flowable.just(refreshData)
        .onBackpressureDrop()
        .observeOn(Schedulers.computation())
        .map(handleBase -> handleRefresh(handleBase.getNewData(), handleBase.getNewHeader(),
            handleBase.getNewFooter(), handleBase.getLevel(), handleBase.getRefreshType()))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::handleResult);
}