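This article collects code examples for the Java method io.reactivex.Observable.doOnNext() and shows how Observable.doOnNext() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of the Observable.doOnNext() method are as follows: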
Package path: io.reactivex.Observable
Class name: Observable
Method name: doOnNext
Modifies the source ObservableSource so that it invokes an action when it calls onNext.
Scheduler: doOnNext does not operate by default on a particular Scheduler.
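To make the description above concrete, here is a minimal, dependency-free sketch of what an operator like doOnNext does conceptually: it wraps the downstream observer, runs the side-effect action for each item, and then forwards the unchanged item. The names MiniObserver, MiniSource, fromList, and DoOnNextSketch are hypothetical stand-ins invented for this illustration and are not RxJava types. Error routing from a throwing action and disposal, which the real operator handles, are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class DoOnNextSketch {

    /** Hypothetical minimal observer: receives items, then an error or completion. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    /** Hypothetical minimal source: pushes its items to the subscribing observer. */
    interface MiniSource<T> {
        void subscribe(MiniObserver<T> observer);
    }

    /** Emits every list element synchronously on the calling thread, then completes. */
    static <T> MiniSource<T> fromList(List<T> items) {
        return observer -> {
            for (T item : items) {
                observer.onNext(item);
            }
            observer.onComplete();
        };
    }

    /**
     * Sketch of the doOnNext idea: returns a new source whose observer runs
     * the action before passing each item downstream. No scheduler is involved,
     * so the action runs on whichever thread the source emits on.
     */
    static <T> MiniSource<T> doOnNext(MiniSource<T> source, Consumer<T> action) {
        return downstream -> source.subscribe(new MiniObserver<T>() {
            @Override
            public void onNext(T item) {
                action.accept(item);      // side effect first
                downstream.onNext(item);  // then the unchanged item is forwarded
            }

            @Override
            public void onError(Throwable error) {
                downstream.onError(error);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        });
    }

    /** Runs the sketch and returns the order in which callbacks fired. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSource<String> source = fromList(Arrays.asList("one", "two"));
        doOnNext(source, v -> log.add("peek:" + v))
            .subscribe(new MiniObserver<String>() {
                @Override
                public void onNext(String item) {
                    log.add("next:" + item);
                }

                @Override
                public void onError(Throwable error) {
                    log.add("error");
                }

                @Override
                public void onComplete() {
                    log.add("done");
                }
            });
        return log;
    }

    public static void main(String[] args) {
        // prints [peek:one, next:one, peek:two, next:two, done]
        System.out.println(run());
    }
}
```

The action sees each item before the downstream observer does, and the item itself is not transformed, which is why the RxJava snippets that follow use doOnNext mainly for counting, logging, or injecting delays in tests.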
Code example source: ReactiveX/RxJava
@Override
public ObservableSource<List<Integer>> apply(List<Integer> v)
throws Exception {
return Observable.just(v)
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> v)
throws Exception {
Thread.sleep(new Random().nextInt(20));
}
});
}
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnNextNull() {
just1.doOnNext(null);
}
Code example source: ReactiveX/RxJava
@Test
public void testDoOnEach() {
final AtomicReference<String> r = new AtomicReference<String>();
String output = Observable.just("one").doOnNext(new Consumer<String>() {
@Override
public void accept(String v) {
r.set(v);
}
}).blockingSingle();
assertEquals("one", output);
assertEquals("one", r.get());
}
Code example source: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer t) {
return Observable.range(1, Observable.bufferSize() * 2)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
}
}).subscribe(to);
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness2() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source).subscribe(to);
Assert.assertEquals(2, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness3() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source).subscribe(to);
Assert.assertEquals(3, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness6() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source, source).subscribe(to);
Assert.assertEquals(6, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness5() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source).subscribe(to);
Assert.assertEquals(5, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness7() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source, source, source).subscribe(to);
Assert.assertEquals(7, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness4() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source).subscribe(to);
Assert.assertEquals(4, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness8() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source, source, source, source).subscribe(to);
Assert.assertEquals(8, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness9() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source, source, source, source, source).subscribe(to);
Assert.assertEquals(9, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testUpstreamIsProcessedButIgnored() {
final int num = 10;
final AtomicInteger upstreamCount = new AtomicInteger();
Object count = Observable.range(1, num)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
upstreamCount.incrementAndGet();
}
})
.ignoreElements()
.blockingGet();
assertEquals(num, upstreamCount.get());
assertNull(count);
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.just(1).groupBy(Functions.justFunction(1)));
Observable.just(1)
.groupBy(Functions.justFunction(1))
.doOnNext(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(GroupedObservable<Integer, Integer> g) throws Exception {
TestHelper.checkDisposed(g);
}
})
.test();
}
Code example source: ReactiveX/RxJava
@Test
public void testTakeLastZeroProcessesAllItemsButIgnoresThem() {
final AtomicInteger upstreamCount = new AtomicInteger();
final int num = 10;
long count = Observable.range(1, num).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
upstreamCount.incrementAndGet();
}
})
.takeLast(0).count().blockingGet();
assertEquals(num, upstreamCount.get());
assertEquals(0L, count);
}
Code example source: ReactiveX/RxJava
@Test
public void testWithCompletionCausingError() {
TestObserver<Notification<Integer>> to = new TestObserver<Notification<Integer>>();
final RuntimeException ex = new RuntimeException("boo");
Observable.<Integer>empty().materialize().doOnNext(new Consumer<Object>() {
@Override
public void accept(Object t) {
throw ex;
}
}).subscribe(to);
to.assertError(ex);
to.assertNoValues();
to.assertTerminated();
}
Code example source: ReactiveX/RxJava
@Test
public void testWindowUnsubscribeOverlapping() {
TestObserver<Integer> to = new TestObserver<Integer>();
final AtomicInteger count = new AtomicInteger();
Observable.merge(Observable.range(1, 10000).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t1) {
count.incrementAndGet();
}
}).window(5, 4).take(2)).subscribe(to);
to.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
to.assertTerminated();
// Overlapping windows (size 5, skip 4): item 5 appears in both windows; upstream emits only 9 items
to.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9);
assertEquals(9, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testWindowUnsubscribeNonOverlapping() {
TestObserver<Integer> to = new TestObserver<Integer>();
final AtomicInteger count = new AtomicInteger();
Observable.merge(Observable.range(1, 10000).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t1) {
count.incrementAndGet();
}
}).window(5).take(2)).subscribe(to);
to.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
to.assertTerminated();
to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Two non-overlapping windows of 5: upstream emits exactly 10 items before being disposed
assertEquals(10, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testReentrantTake() {
final PublishSubject<Integer> source = PublishSubject.create();
TestObserver<Integer> to = new TestObserver<Integer>();
source.take(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) {
source.onNext(2);
}
}).subscribe(to);
source.onNext(1);
to.assertValue(1);
to.assertNoErrors();
to.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void delayError() {
Observable.range(1, 5).concatWith(Observable.<Integer>error(new TestException()))
.observeOn(Schedulers.computation(), true)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
if (v == 1) {
Thread.sleep(100);
}
}
})
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class, 1, 2, 3, 4, 5);
}