io.reactivex.Observable.doOnNext()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(173)

本文整理了Java中io.reactivex.Observable.doOnNext()方法的一些代码示例,展示了Observable.doOnNext()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnNext()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnNext

Observable.doOnNext介绍

[英]Modifies the source ObservableSource so that it invokes an action when it calls onNext.

Scheduler: doOnNext does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便在调用onNext时调用操作。
调度器:默认情况下,doOnNext不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<List<Integer>> apply(List<Integer> v)
  3. throws Exception {
  4. return Observable.just(v)
  5. .subscribeOn(Schedulers.io())
  6. .doOnNext(new Consumer<List<Integer>>() {
  7. @Override
  8. public void accept(List<Integer> v)
  9. throws Exception {
  10. Thread.sleep(new Random().nextInt(20));
  11. }
  12. });
  13. }
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void doOnNextNull() {
  3. just1.doOnNext(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDoOnEach() {
  3. final AtomicReference<String> r = new AtomicReference<String>();
  4. String output = Observable.just("one").doOnNext(new Consumer<String>() {
  5. @Override
  6. public void accept(String v) {
  7. r.set(v);
  8. }
  9. }).blockingSingle();
  10. assertEquals("one", output);
  11. assertEquals("one", r.get());
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<Integer> apply(Integer t) {
  3. return Observable.range(1, Observable.bufferSize() * 2)
  4. .doOnNext(new Consumer<Integer>() {
  5. @Override
  6. public void accept(Integer t) {
  7. count.getAndIncrement();
  8. }
  9. }).hide();
  10. }
  11. }).subscribe(to);

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void testEagerness2() {
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. count.getAndIncrement();
  9. }
  10. }).hide();
  11. Observable.concatArrayEager(source, source).subscribe(to);
  12. Assert.assertEquals(2, count.get());
  13. to.assertValueCount(count.get());
  14. to.assertNoErrors();
  15. to.assertComplete();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void testEagerness3() {
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. count.getAndIncrement();
  9. }
  10. }).hide();
  11. Observable.concatArrayEager(source, source, source).subscribe(to);
  12. Assert.assertEquals(3, count.get());
  13. to.assertValueCount(count.get());
  14. to.assertNoErrors();
  15. to.assertComplete();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void testEagerness6() {
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. count.getAndIncrement();
  9. }
  10. }).hide();
  11. Observable.concatArrayEager(source, source, source, source, source, source).subscribe(to);
  12. Assert.assertEquals(6, count.get());
  13. to.assertValueCount(count.get());
  14. to.assertNoErrors();
  15. to.assertComplete();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void testEagerness5() {
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. count.getAndIncrement();
  9. }
  10. }).hide();
  11. Observable.concatArrayEager(source, source, source, source, source).subscribe(to);
  12. Assert.assertEquals(5, count.get());
  13. to.assertValueCount(count.get());
  14. to.assertNoErrors();
  15. to.assertComplete();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void testEagerness7() {
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. count.getAndIncrement();
  9. }
  10. }).hide();
  11. Observable.concatArrayEager(source, source, source, source, source, source, source).subscribe(to);
  12. Assert.assertEquals(7, count.get());
  13. to.assertValueCount(count.get());
  14. to.assertNoErrors();
  15. to.assertComplete();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void testEagerness4() {
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. count.getAndIncrement();
  9. }
  10. }).hide();
  11. Observable.concatArrayEager(source, source, source, source).subscribe(to);
  12. Assert.assertEquals(4, count.get());
  13. to.assertValueCount(count.get());
  14. to.assertNoErrors();
  15. to.assertComplete();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void testEagerness8() {
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. count.getAndIncrement();
  9. }
  10. }).hide();
  11. Observable.concatArrayEager(source, source, source, source, source, source, source, source).subscribe(to);
  12. Assert.assertEquals(8, count.get());
  13. to.assertValueCount(count.get());
  14. to.assertNoErrors();
  15. to.assertComplete();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void testEagerness9() {
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. count.getAndIncrement();
  9. }
  10. }).hide();
  11. Observable.concatArrayEager(source, source, source, source, source, source, source, source, source).subscribe(to);
  12. Assert.assertEquals(9, count.get());
  13. to.assertValueCount(count.get());
  14. to.assertNoErrors();
  15. to.assertComplete();
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testUpstreamIsProcessedButIgnored() {
  3. final int num = 10;
  4. final AtomicInteger upstreamCount = new AtomicInteger();
  5. Object count = Observable.range(1, num)
  6. .doOnNext(new Consumer<Integer>() {
  7. @Override
  8. public void accept(Integer t) {
  9. upstreamCount.incrementAndGet();
  10. }
  11. })
  12. .ignoreElements()
  13. .blockingGet();
  14. assertEquals(num, upstreamCount.get());
  15. assertNull(count);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void dispose() {
  3. TestHelper.checkDisposed(Observable.just(1).groupBy(Functions.justFunction(1)));
  4. Observable.just(1)
  5. .groupBy(Functions.justFunction(1))
  6. .doOnNext(new Consumer<GroupedObservable<Integer, Integer>>() {
  7. @Override
  8. public void accept(GroupedObservable<Integer, Integer> g) throws Exception {
  9. TestHelper.checkDisposed(g);
  10. }
  11. })
  12. .test();
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testTakeLastZeroProcessesAllItemsButIgnoresThem() {
  3. final AtomicInteger upstreamCount = new AtomicInteger();
  4. final int num = 10;
  5. long count = Observable.range(1, num).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t) {
  8. upstreamCount.incrementAndGet();
  9. }})
  10. .takeLast(0).count().blockingGet();
  11. assertEquals(num, upstreamCount.get());
  12. assertEquals(0L, count);
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithCompletionCausingError() {
  3. TestObserver<Notification<Integer>> to = new TestObserver<Notification<Integer>>();
  4. final RuntimeException ex = new RuntimeException("boo");
  5. Observable.<Integer>empty().materialize().doOnNext(new Consumer<Object>() {
  6. @Override
  7. public void accept(Object t) {
  8. throw ex;
  9. }
  10. }).subscribe(to);
  11. to.assertError(ex);
  12. to.assertNoValues();
  13. to.assertTerminated();
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWindowUnsubscribeOverlapping() {
  3. TestObserver<Integer> to = new TestObserver<Integer>();
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable.merge(Observable.range(1, 10000).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t1) {
  8. count.incrementAndGet();
  9. }
  10. }).window(5, 4).take(2)).subscribe(to);
  11. to.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
  12. to.assertTerminated();
  13. // System.out.println(ts.getOnNextEvents());
  14. to.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9);
  15. assertEquals(9, count.get());
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWindowUnsubscribeNonOverlapping() {
  3. TestObserver<Integer> to = new TestObserver<Integer>();
  4. final AtomicInteger count = new AtomicInteger();
  5. Observable.merge(Observable.range(1, 10000).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer t1) {
  8. count.incrementAndGet();
  9. }
  10. }).window(5).take(2)).subscribe(to);
  11. to.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
  12. to.assertTerminated();
  13. to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  14. // System.out.println(ts.getOnNextEvents());
  15. assertEquals(10, count.get());
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testReentrantTake() {
  3. final PublishSubject<Integer> source = PublishSubject.create();
  4. TestObserver<Integer> to = new TestObserver<Integer>();
  5. source.take(1).doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer v) {
  8. source.onNext(2);
  9. }
  10. }).subscribe(to);
  11. source.onNext(1);
  12. to.assertValue(1);
  13. to.assertNoErrors();
  14. to.assertComplete();
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void delayError() {
  3. Observable.range(1, 5).concatWith(Observable.<Integer>error(new TestException()))
  4. .observeOn(Schedulers.computation(), true)
  5. .doOnNext(new Consumer<Integer>() {
  6. @Override
  7. public void accept(Integer v) throws Exception {
  8. if (v == 1) {
  9. Thread.sleep(100);
  10. }
  11. }
  12. })
  13. .test()
  14. .awaitDone(5, TimeUnit.SECONDS)
  15. .assertFailure(TestException.class, 1, 2, 3, 4, 5);
  16. }

相关文章

Observable类方法