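This article collects code examples for the io.reactivex.Flowable.doOnRequest() method in Java and shows how Flowable.doOnRequest() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.doOnRequest() method are as follows: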
Package path: io.reactivex (fully qualified class: io.reactivex.Flowable)
Class name: Flowable
Method name: doOnRequest
Modifies the source Publisher so that it invokes the given action when it receives a request for more items.
Note: This operator is for tracing the internal behavior of backpressure request patterns and is generally intended for debugging use.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnRequest does not operate by default on a particular Scheduler.
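To make the description above concrete, here is a minimal sketch of the idea using only the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than RxJava. The names DoOnRequestSketch, range, doOnRequest, and demo are hypothetical, and the callback type is java.util.function.LongConsumer, not RxJava's io.reactivex.functions.LongConsumer. It is not how RxJava implements the operator; it only illustrates that the callback sees each request(n) before the amount is forwarded unchanged to the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.LongConsumer;

class DoOnRequestSketch {

    /** Hypothetical synchronous source emitting start .. start+count-1 on demand. */
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            final int end = start + count;
            long requested;    // outstanding demand
            boolean emitting;  // guards against reentrant request() calls
            boolean done;

            @Override
            public void request(long n) {
                // Simplification: non-positive requests are ignored, not signalled as errors.
                if (done || n <= 0) return;
                requested += n;
                if (requested < 0) requested = Long.MAX_VALUE; // cap on overflow
                if (emitting) return;
                emitting = true;
                while (!done && requested > 0 && next < end) {
                    requested--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next == end) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Wraps the Subscription so onRequest observes every request(n); n is forwarded as-is. */
    static <T> Flow.Publisher<T> doOnRequest(Flow.Publisher<T> source, LongConsumer onRequest) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription upstream) {
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override
                    public void request(long n) {
                        onRequest.accept(n);  // observe the request
                        upstream.request(n);  // forward it unchanged
                    }

                    @Override
                    public void cancel() {
                        upstream.cancel();
                    }
                });
            }

            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Requests 2 and then 3 items; fills `received` and returns the recorded request amounts. */
    static List<Long> demo(List<Integer> received) {
        List<Long> requests = new ArrayList<>();
        doOnRequest(range(1, 10), n -> requests.add(n)).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(2);
                s.request(3);
            }

            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new AssertionError(t); }
            @Override public void onComplete() { }
        });
        return requests;
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        System.out.println(demo(received)); // [2, 3]
        System.out.println(received);       // [1, 2, 3, 4, 5]
    }
}
```

The recorded amounts are exactly what the subscriber asked for, which is why the tests below can use doOnRequest as a probe for the request pattern that a downstream operator generates.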
Code example source: ReactiveX/RxJava
        // Fragment as extracted: the enclosing call and the declarations of
        // `requests` and `ts` are not part of this excerpt.
        @Override
        public Flowable<Long> apply(Long t) {
            return Flowable.fromIterable(Arrays.asList(1L, 2L, 3L))
                    .doOnRequest(new LongConsumer() {
                        @Override
                        public void accept(long v) {
                            requests.add(v);
                        }
                    });
        }
    }).take(3)).subscribe(ts);
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnRequestNull() {
    just1.doOnRequest(null);
}
Code example source: ReactiveX/RxJava
@Test
public void testSingleDoesNotRequestMoreThanItNeedsToEmitItem() {
    final AtomicLong request = new AtomicLong();
    Flowable.just(1).doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) {
            request.addAndGet(n);
        }
    }).blockingSingle();
    // FIXME single now triggers fast-path
    assertEquals(Long.MAX_VALUE, request.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromEmpty() {
    final AtomicLong request = new AtomicLong();
    try {
        Flowable.empty().doOnRequest(new LongConsumer() {
            @Override
            public void accept(long n) {
                request.addAndGet(n);
            }
        }).blockingSingle();
    } catch (NoSuchElementException e) {
        // FIXME single now triggers fast-path
        assertEquals(Long.MAX_VALUE, request.get());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromMoreThanOne() {
    final AtomicLong request = new AtomicLong();
    try {
        Flowable.just(1, 2).doOnRequest(new LongConsumer() {
            @Override
            public void accept(long n) {
                request.addAndGet(n);
            }
        }).blockingSingle();
    } catch (IllegalArgumentException e) {
        // FIXME single now triggers fast-path
        assertEquals(Long.MAX_VALUE, request.get());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void longerSequence() {
    Flowable.range(1, 6)
            .doOnRequest(this)
            .limit(5)
            .test()
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(5, requests.get(0).intValue());
}
Code example source: ReactiveX/RxJava
@Test
public void shorterSequence() {
    Flowable.range(1, 5)
            .doOnRequest(this)
            .limit(6)
            .test()
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(6, requests.get(0).intValue());
}
Code example source: ReactiveX/RxJava
@Test
public void exactSequence() {
    Flowable.range(1, 5)
            .doOnRequest(this)
            .doOnCancel(this)
            .limit(5)
            .test()
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(2, requests.size());
    assertEquals(5, requests.get(0).intValue());
    assertEquals(CANCELLED, requests.get(1));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 5000)
public void mergeDelayErrorObservableMaxConcurrent() {
    final List<Long> requested = new ArrayList<Long>();
    Flowable<Completable> cs = Flowable
            .just(normal.completable)
            .repeat(10)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long v) {
                    requested.add(v);
                }
            });
    Completable c = Completable.mergeDelayError(cs, 5);
    c.blockingAwait();
    // FIXME this request pattern looks odd because all 10 completions trigger 1 requests
    Assert.assertEquals(Arrays.asList(5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), requested);
}
Code example source: ReactiveX/RxJava
@Test(timeout = 5000)
public void concatObservablePrefetch() {
    final List<Long> requested = new ArrayList<Long>();
    Flowable<Completable> cs = Flowable
            .just(normal.completable)
            .repeat(10)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long v) {
                    requested.add(v);
                }
            });
    Completable c = Completable.concat(cs, 5);
    c.blockingAwait();
    Assert.assertEquals(Arrays.asList(5L, 4L, 4L), requested);
}
Code example source: ReactiveX/RxJava
@Test(timeout = 5000)
public void mergeObservableMaxConcurrent() {
    final List<Long> requested = new ArrayList<Long>();
    Flowable<Completable> cs = Flowable
            .just(normal.completable)
            .repeat(10)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long v) {
                    requested.add(v);
                }
            });
    Completable c = Completable.merge(cs, 5);
    c.blockingAwait();
    // FIXME this request pattern looks odd because all 10 completions trigger 1 requests
    Assert.assertEquals(Arrays.asList(5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), requested);
}
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribeHappensAgainstParent() {
    final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    Flowable.just(1).concatWith(Flowable.<Integer>never())
            //
            .doOnCancel(new Action() {
                @Override
                public void run() {
                    unsubscribed.set(true);
                }
            })
            //
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long n) {
                    // do nothing
                }
            })
            //
            .subscribe().dispose();
    assertTrue(unsubscribed.get());
}
Code example source: ReactiveX/RxJava
@Test
public void limitZero() {
    Flowable.range(1, 5)
            .doOnCancel(this)
            .doOnRequest(this)
            .limit(0)
            .test()
            .assertResult();
    assertEquals(1, requests.size());
    assertEquals(CANCELLED, requests.get(0));
}
Code example source: ReactiveX/RxJava
@Test
public void testMaxConcurrent5() {
    final List<Long> requests = new ArrayList<Long>();
    Flowable.range(1, 100).doOnRequest(new LongConsumer() {
        @Override
        public void accept(long reqCount) {
            requests.add(reqCount);
        }
    }).concatMapEager(toJust, 5, Flowable.bufferSize()).subscribe(ts);
    ts.assertNoErrors();
    ts.assertValueCount(100);
    ts.assertComplete();
    Assert.assertEquals(5, (long) requests.get(0));
    Assert.assertEquals(1, (long) requests.get(1));
    Assert.assertEquals(1, (long) requests.get(2));
    Assert.assertEquals(1, (long) requests.get(3));
    Assert.assertEquals(1, (long) requests.get(4));
    Assert.assertEquals(1, (long) requests.get(5));
}
Code example source: ReactiveX/RxJava
@Test
public void testBackpressureMultipleSmallAsyncRequests() throws InterruptedException {
    final AtomicLong requests = new AtomicLong(0);
    TestSubscriber<Long> ts = new TestSubscriber<Long>(0L);
    Flowable.interval(100, TimeUnit.MILLISECONDS)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long n) {
                    requests.addAndGet(n);
                }
            }).skip(4).subscribe(ts);
    Thread.sleep(100);
    ts.request(1);
    ts.request(1);
    Thread.sleep(100);
    ts.dispose();
    // FIXME not assertable anymore
    // ts.assertUnsubscribed();
    ts.assertNoErrors();
    assertEquals(6, requests.get());
}
Code example source: ReactiveX/RxJava
@Test
public void limitAndTake() {
    Flowable.range(1, 5)
            .doOnCancel(this)
            .doOnRequest(this)
            .limit(6)
            .take(5)
            .test()
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(Arrays.asList(6L, CANCELLED), requests);
}
Code example source: ReactiveX/RxJava
@Test
public void synchronousRebatching() {
    final List<Long> requests = new ArrayList<Long>();
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    Flowable.range(1, 50)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long r) {
                    requests.add(r);
                }
            })
            .rebatchRequests(20)
            .subscribe(ts);
    ts.assertValueCount(50);
    ts.assertNoErrors();
    ts.assertComplete();
    assertEquals(Arrays.asList(20L, 15L, 15L, 15L), requests);
}
Code example source: ReactiveX/RxJava
@Test
public void limitStep() {
    TestSubscriber<Integer> ts = Flowable.range(1, 6)
            .doOnRequest(this)
            .limit(5)
            .test(0L);
    assertEquals(0, requests.size());
    ts.request(1);
    ts.assertValue(1);
    ts.request(2);
    ts.assertValues(1, 2, 3);
    ts.request(3);
    ts.assertResult(1, 2, 3, 4, 5);
    assertEquals(Arrays.asList(1L, 2L, 2L), requests);
}
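The request amounts asserted in the limit tests above (5 in longerSequence, 6 in shorterSequence, and 1, 2, 2 in limitStep) are consistent with capping each downstream request at the number of items still allowed. Below is a minimal sketch of that arithmetic in plain Java, with hypothetical names (LimitRequestSketch, upstreamRequests) and no RxJava dependency. It models only the numbers that doOnRequest would record, not RxJava's actual implementation of limit, and it assumes every downstream request is positive.

```java
import java.util.ArrayList;
import java.util.List;

class LimitRequestSketch {

    /**
     * Returns the request amounts an upstream probe would record when each
     * downstream request is capped at the remaining allowance of `limit` items.
     * Assumes every downstream request is positive.
     */
    static List<Long> upstreamRequests(long limit, long... downstreamRequests) {
        List<Long> upstream = new ArrayList<>();
        long remaining = limit;
        for (long r : downstreamRequests) {
            if (remaining == 0) {
                break; // allowance exhausted: nothing more is requested upstream
            }
            long n = Math.min(r, remaining);
            remaining -= n;
            upstream.add(n);
        }
        return upstream;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests(5, 1, 2, 3));        // [1, 2, 2]
        System.out.println(upstreamRequests(6, Long.MAX_VALUE)); // [6]
        System.out.println(upstreamRequests(0, Long.MAX_VALUE)); // []
    }
}
```

The third call mirrors limitZero, where no request amount is recorded and only the cancellation marker appears in the list.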
Code example source: ReactiveX/RxJava
@Test
public void testDelayErrorMaxConcurrent() {
    final List<Long> requests = new ArrayList<Long>();
    Flowable<Integer> source = Flowable.mergeDelayError(Flowable.just(
            Flowable.just(1).hide(),
            Flowable.<Integer>error(new TestException()))
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long t1) {
                    requests.add(t1);
                }
            }), 1);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    source.subscribe(ts);
    ts.assertValue(1);
    ts.assertTerminated();
    ts.assertError(TestException.class);
    assertEquals(Arrays.asList(1L, 1L, 1L), requests);
}
Code example source: ReactiveX/RxJava
@Test
public void testBackpressureBounded() {
    final AtomicLong requested = new AtomicLong();
    Flowable<Integer> source = Flowable.range(1, 1000)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long t) {
                    requested.addAndGet(t);
                }
            });
    ConnectableFlowable<Integer> cf = source.replay(50);
    TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>(10L);
    TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>(90L);
    cf.subscribe(ts1);
    cf.subscribe(ts2);
    ts2.request(10);
    cf.connect();
    ts1.assertValueCount(10);
    ts1.assertNotTerminated();
    ts2.assertValueCount(100);
    ts2.assertNotTerminated();
    Assert.assertEquals(100, requested.get());
}