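This article collects code examples for the Java method io.reactivex.Flowable.doOnNext() and shows how Flowable.doOnNext() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they make useful reference material. Details of Flowable.doOnNext() are as follows: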
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnNext
Modifies the source Publisher so that it invokes an action when it calls onNext.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnNext does not operate by default on a particular Scheduler.
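The description above can be illustrated with a small model built only on the JDK's java.util.concurrent.Flow interfaces. This is a minimal sketch, not RxJava's implementation: the class DoOnNextSketch and the helpers peek and range are hypothetical names introduced for illustration. It shows the two properties the description names: the action runs on each onNext before the unchanged item is forwarded, and the upstream Subscription is passed through as-is, so downstream demand reaches the source untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Stdlib-only model of a doOnNext-style operator; NOT RxJava's implementation.
final class DoOnNextSketch {

    // Hypothetical helper: runs `action` on each item, then forwards the item unchanged.
    static <T> Flow.Publisher<T> peek(Flow.Publisher<T> source, Consumer<? super T> action) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                // The upstream Subscription is handed over as-is, so request(n)
                // from downstream reaches the source untouched.
                downstream.onSubscribe(s);
            }
            @Override public void onNext(T item) {
                action.accept(item);      // side effect first
                downstream.onNext(item);  // then the unchanged item
            }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    // Hypothetical synchronous source that emits only as many items as requested.
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            final int end = start + count;
            int next = start;
            boolean done;
            @Override public void request(long n) {
                for (long i = 0; i < n && !done && next < end; i++) {
                    subscriber.onNext(next++);
                }
                if (!done && next >= end) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    // Subscribes, requests two items out of ten, and returns the event log.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscription[] sub = new Flow.Subscription[1];
        peek(range(1, 10), v -> log.add("peek " + v)).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { sub[0] = s; }
            @Override public void onNext(Integer v) { log.add("next " + v); }
            @Override public void onError(Throwable t) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });
        sub[0].request(2); // demand of 2: the action runs exactly twice
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [peek 1, next 1, peek 2, next 2]
    }
}
```

Because only two items are requested, the action fires twice and no completion is signalled. In this model the wrapper adds no buffering or demand of its own.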
Code example source: ReactiveX/RxJava
// Passing a null Consumer is rejected with a NullPointerException.
@Test(expected = NullPointerException.class)
public void doOnNextNull() {
    just1.doOnNext(null);
}
Code example source: ReactiveX/RxJava
@Override
public Flowable<List<Integer>> apply(List<Integer> v)
        throws Exception {
    return Flowable.just(v)
            .subscribeOn(Schedulers.io())
            .doOnNext(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> v)
                        throws Exception {
                    // Simulate variable per-item work with a random delay of 0 to 19 ms.
                    Thread.sleep(new Random().nextInt(20));
                }
            });
}
Code example source: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
    // A no-op consumer: the items pass through unchanged.
    return Flowable.range(0, (int)elements).doOnNext(Functions.emptyConsumer());
}
Code example source: ReactiveX/RxJava
// Fragment: the enclosing operator call is cut off in the source.
    @Override
    public Flowable<Integer> apply(Integer t) {
        return Flowable.range(1, Flowable.bufferSize() * 2)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer t) {
                        // Count every item emitted by the inner range.
                        count.getAndIncrement();
                    }
                }).hide();
    }
}).subscribe(ts);
Code example source: ReactiveX/RxJava
@Test
public void testDoOnEach() {
    final AtomicReference<String> r = new AtomicReference<String>();
    String output = Flowable.just("one").doOnNext(new Consumer<String>() {
        @Override
        public void accept(String v) {
            // Record the item as a side effect; the value itself is passed on unchanged.
            r.set(v);
        }
    }).blockingSingle();
    assertEquals("one", output);
    assertEquals("one", r.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testTakeLastZeroProcessesAllItemsButIgnoresThem() {
    final AtomicInteger upstreamCount = new AtomicInteger();
    final int num = 10;
    long count = Flowable.range(1, num).doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer t) {
            // Count the items seen upstream of takeLast(0).
            upstreamCount.incrementAndGet();
        }
    })
    .takeLast(0).count().blockingGet();
    assertEquals(num, upstreamCount.get());
    assertEquals(0L, count);
}
Code example source: ReactiveX/RxJava
@Test
public void testUpstreamIsProcessedButIgnored() {
    final int num = 10;
    final AtomicInteger upstreamCount = new AtomicInteger();
    // blockingGet() on the Completable returns null when it completes without error.
    Object count = Flowable.range(1, num)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer t) {
                    upstreamCount.incrementAndGet();
                }
            })
            .ignoreElements()
            .blockingGet();
    assertEquals(num, upstreamCount.get());
    assertNull(count);
}
Code example source: ReactiveX/RxJava
@Test
public void switchOnNextPrefetch() {
    final List<Integer> list = new ArrayList<Integer>();
    Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            // Record each item the source actually emits.
            list.add(v);
        }
    });
    Flowable.switchOnNext(Flowable.just(source).hide(), 2)
            .test(1);
    assertEquals(Arrays.asList(1, 2, 3), list);
}
Code example source: ReactiveX/RxJava
@Test
public void switchOnNextDelayError() {
    final List<Integer> list = new ArrayList<Integer>();
    Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            // Record each item the source actually emits.
            list.add(v);
        }
    });
    Flowable.switchOnNextDelayError(Flowable.just(source).hide())
            .test(1);
    assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), list);
}
Code example source: ReactiveX/RxJava
@Test
public void switchOnNextDelayErrorPrefetch() {
    final List<Integer> list = new ArrayList<Integer>();
    Flowable<Integer> source = Flowable.range(1, 10).hide().doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) throws Exception {
            // Record each item the source actually emits.
            list.add(v);
        }
    });
    Flowable.switchOnNextDelayError(Flowable.just(source).hide(), 2)
            .test(1);
    assertEquals(Arrays.asList(1, 2, 3), list);
}
Code example source: ReactiveX/RxJava
@Test
public void testUpstreamIsProcessedButIgnoredFlowable() {
    final int num = 10;
    final AtomicInteger upstreamCount = new AtomicInteger();
    long count = Flowable.range(1, num)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer t) {
                    // Count the items seen upstream of ignoreElements().
                    upstreamCount.incrementAndGet();
                }
            })
            .ignoreElements()
            .toFlowable()
            .count().blockingGet();
    assertEquals(num, upstreamCount.get());
    assertEquals(0, count);
}
Code example source: ReactiveX/RxJava
@Test
public void testWithCompletionCausingError() {
    TestSubscriber<Notification<Integer>> ts = new TestSubscriber<Notification<Integer>>();
    final RuntimeException ex = new RuntimeException("boo");
    Flowable.<Integer>empty().materialize().doOnNext(new Consumer<Object>() {
        @Override
        public void accept(Object t) {
            // The exception thrown here is delivered to the subscriber as an error.
            throw ex;
        }
    }).subscribe(ts);
    ts.assertError(ex);
    ts.assertNoValues();
    ts.assertTerminated();
}
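The test above asserts that an exception thrown inside the doOnNext callback reaches the subscriber as an error and that no value is delivered. A minimal stdlib-only sketch of that routing follows. It is an illustrative model, not RxJava's source: PeekErrorSketch and PeekSubscriber are hypothetical names, and the cancel-then-onError order is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Stdlib-only model of error routing in a doOnNext-style operator; NOT RxJava's code.
final class PeekErrorSketch {

    // Hypothetical pass-through subscriber: a throwing action cancels upstream
    // and is converted into a single onError for the downstream subscriber.
    static final class PeekSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        final Consumer<? super T> action;
        Flow.Subscription upstream;
        boolean done;

        PeekSubscriber(Flow.Subscriber<? super T> downstream, Consumer<? super T> action) {
            this.downstream = downstream;
            this.action = action;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }
        @Override public void onNext(T item) {
            if (done) { return; }          // ignore items after termination
            try {
                action.accept(item);
            } catch (RuntimeException ex) {
                upstream.cancel();         // stop the source (assumed order)
                onError(ex);               // the failing item is not forwarded
                return;
            }
            downstream.onNext(item);
        }
        @Override public void onError(Throwable t) {
            if (done) { return; }
            done = true;
            downstream.onError(t);
        }
        @Override public void onComplete() {
            if (done) { return; }
            done = true;
            downstream.onComplete();
        }
    }

    // Drives the subscriber by hand with items 1, 2, 3; the action throws on 2.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<Integer> end = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Integer v) { log.add("next " + v); }
            @Override public void onError(Throwable t) { log.add("error " + t.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        };
        Consumer<Integer> action = v -> {
            if (v == 2) { throw new IllegalStateException("boo"); }
        };
        PeekSubscriber<Integer> peek = new PeekSubscriber<>(end, action);
        peek.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { log.add("cancelled"); }
        });
        peek.onNext(1);
        peek.onNext(2); // action throws: upstream is cancelled, downstream gets onError
        peek.onNext(3); // ignored, the sequence has already terminated
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [next 1, cancelled, error boo]
    }
}
```

The done flag keeps the model from signalling anything after the terminal event, which is why the third item never reaches the downstream subscriber.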
Code example source: ReactiveX/RxJava
@Test
public void testWindowUnsubscribeOverlapping() {
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    final AtomicInteger count = new AtomicInteger();
    Flowable.merge(Flowable.range(1, 10000).doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer t1) {
            // Count how many items the source emits before it is cancelled.
            count.incrementAndGet();
        }
    }).window(5, 4).take(2)).subscribe(ts);
    ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
    ts.assertTerminated();
    // System.out.println(ts.getOnNextEvents());
    ts.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9);
    assertEquals(9, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testWindowUnsubscribeNonOverlapping() {
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    final AtomicInteger count = new AtomicInteger();
    Flowable.merge(Flowable.range(1, 10000).doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer t1) {
            // Count how many items the source emits before it is cancelled.
            count.incrementAndGet();
        }
    }).window(5).take(2)).subscribe(ts);
    ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
    ts.assertTerminated();
    ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    // System.out.println(ts.getOnNextEvents());
    assertEquals(10, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Flowable.just(1).groupBy(Functions.justFunction(1)));
    Flowable.just(1)
            .groupBy(Functions.justFunction(1))
            .doOnNext(new Consumer<GroupedFlowable<Integer, Integer>>() {
                @Override
                public void accept(GroupedFlowable<Integer, Integer> g) throws Exception {
                    // Run the dispose check on each emitted group.
                    TestHelper.checkDisposed(g);
                }
            })
            .test();
}
Code example source: ReactiveX/RxJava
@Test
public void testReentrantTake() {
    final PublishProcessor<Integer> source = PublishProcessor.create();
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    source.take(1).doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer v) {
            // Re-entrant emission from inside the callback; take(1) must still deliver only one item.
            source.onNext(2);
        }
    }).subscribe(ts);
    source.onNext(1);
    ts.assertValue(1);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void errorSkipInner() {
    @SuppressWarnings("rawtypes")
    final TestSubscriber[] to = { null };
    Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
            .window(2, 3)
            .doOnNext(new Consumer<Flowable<Integer>>() {
                @Override
                public void accept(Flowable<Integer> w) throws Exception {
                    // Subscribe to the inner window as soon as it is emitted.
                    to[0] = w.test();
                }
            })
            .test()
            .assertError(TestException.class);
    to[0].assertFailure(TestException.class, 1);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void errorExactInner() {
    @SuppressWarnings("rawtypes")
    final TestSubscriber[] to = { null };
    Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
            .window(2)
            .doOnNext(new Consumer<Flowable<Integer>>() {
                @Override
                public void accept(Flowable<Integer> w) throws Exception {
                    // Subscribe to the inner window as soon as it is emitted.
                    to[0] = w.test();
                }
            })
            .test()
            .assertError(TestException.class);
    to[0].assertFailure(TestException.class, 1);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void errorOverlapInner() {
    @SuppressWarnings("rawtypes")
    final TestSubscriber[] to = { null };
    Flowable.just(1).concatWith(Flowable.<Integer>error(new TestException()))
            .window(3, 2)
            .doOnNext(new Consumer<Flowable<Integer>>() {
                @Override
                public void accept(Flowable<Integer> w) throws Exception {
                    // Subscribe to the inner window as soon as it is emitted.
                    to[0] = w.test();
                }
            })
            .test()
            .assertError(TestException.class);
    to[0].assertFailure(TestException.class, 1);
}
Code example source: ReactiveX/RxJava
@Test
public void delayError() {
    Flowable.range(1, 5).concatWith(Flowable.<Integer>error(new TestException()))
            .observeOn(Schedulers.computation(), true)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer v) throws Exception {
                    // Slow down the first item; with delayError = true all five
                    // values must still arrive before the error.
                    if (v == 1) {
                        Thread.sleep(100);
                    }
                }
            })
            .test()
            .awaitDone(5, TimeUnit.SECONDS)
            .assertFailure(TestException.class, 1, 2, 3, 4, 5);
}