io.reactivex.Observable.doOnNext()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(150)

本文整理了Java中io.reactivex.Observable.doOnNext()方法的一些代码示例,展示了Observable.doOnNext()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnNext()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnNext

Observable.doOnNext介绍

[英]Modifies the source ObservableSource so that it invokes an action when it calls onNext.

Scheduler: doOnNext does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便在调用onNext时调用操作。
调度器:默认情况下,doOnNext不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<List<Integer>> apply(List<Integer> v)
      throws Exception {
    return Observable.just(v)
        .subscribeOn(Schedulers.io())
        .doOnNext(new Consumer<List<Integer>>() {
          @Override
          public void accept(List<Integer> v)
              throws Exception {
            Thread.sleep(new Random().nextInt(20));
          }
        });
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnNextNull() {
  just1.doOnNext(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDoOnEach() {
  final AtomicReference<String> r = new AtomicReference<String>();
  String output = Observable.just("one").doOnNext(new Consumer<String>() {
    @Override
    public void accept(String v) {
      r.set(v);
    }
  }).blockingSingle();
  assertEquals("one", output);
  assertEquals("one", r.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t) {
    return Observable.range(1, Observable.bufferSize() * 2)
        .doOnNext(new Consumer<Integer>() {
          @Override
          public void accept(Integer t) {
            count.getAndIncrement();
          }
        }).hide();
  }
}).subscribe(to);

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testEagerness2() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  }).hide();
  Observable.concatArrayEager(source, source).subscribe(to);
  Assert.assertEquals(2, count.get());
  to.assertValueCount(count.get());
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testEagerness3() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  }).hide();
  Observable.concatArrayEager(source, source, source).subscribe(to);
  Assert.assertEquals(3, count.get());
  to.assertValueCount(count.get());
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testEagerness6() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  }).hide();
  Observable.concatArrayEager(source, source, source, source, source, source).subscribe(to);
  Assert.assertEquals(6, count.get());
  to.assertValueCount(count.get());
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testEagerness5() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  }).hide();
  Observable.concatArrayEager(source, source, source, source, source).subscribe(to);
  Assert.assertEquals(5, count.get());
  to.assertValueCount(count.get());
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testEagerness7() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  }).hide();
  Observable.concatArrayEager(source, source, source, source, source, source, source).subscribe(to);
  Assert.assertEquals(7, count.get());
  to.assertValueCount(count.get());
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testEagerness4() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  }).hide();
  Observable.concatArrayEager(source, source, source, source).subscribe(to);
  Assert.assertEquals(4, count.get());
  to.assertValueCount(count.get());
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testEagerness8() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  }).hide();
  Observable.concatArrayEager(source, source, source, source, source, source, source, source).subscribe(to);
  Assert.assertEquals(8, count.get());
  to.assertValueCount(count.get());
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testEagerness9() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  }).hide();
  Observable.concatArrayEager(source, source, source, source, source, source, source, source, source).subscribe(to);
  Assert.assertEquals(9, count.get());
  to.assertValueCount(count.get());
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUpstreamIsProcessedButIgnored() {
  final int num = 10;
  final AtomicInteger upstreamCount = new AtomicInteger();
  Object count = Observable.range(1, num)
      .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer t) {
          upstreamCount.incrementAndGet();
        }
      })
      .ignoreElements()
      .blockingGet();
  assertEquals(num, upstreamCount.get());
  assertNull(count);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).groupBy(Functions.justFunction(1)));
  Observable.just(1)
  .groupBy(Functions.justFunction(1))
  .doOnNext(new Consumer<GroupedObservable<Integer, Integer>>() {
    @Override
    public void accept(GroupedObservable<Integer, Integer> g) throws Exception {
      TestHelper.checkDisposed(g);
    }
  })
  .test();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeLastZeroProcessesAllItemsButIgnoresThem() {
  final AtomicInteger upstreamCount = new AtomicInteger();
  final int num = 10;
  long count = Observable.range(1, num).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      upstreamCount.incrementAndGet();
    }})
    .takeLast(0).count().blockingGet();
  assertEquals(num, upstreamCount.get());
  assertEquals(0L, count);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWithCompletionCausingError() {
  TestObserver<Notification<Integer>> to = new TestObserver<Notification<Integer>>();
  final RuntimeException ex = new RuntimeException("boo");
  Observable.<Integer>empty().materialize().doOnNext(new Consumer<Object>() {
    @Override
    public void accept(Object t) {
      throw ex;
    }
  }).subscribe(to);
  to.assertError(ex);
  to.assertNoValues();
  to.assertTerminated();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWindowUnsubscribeOverlapping() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  final AtomicInteger count = new AtomicInteger();
  Observable.merge(Observable.range(1, 10000).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t1) {
      count.incrementAndGet();
    }
  }).window(5, 4).take(2)).subscribe(to);
  to.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
  to.assertTerminated();
  //        System.out.println(ts.getOnNextEvents());
  to.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9);
  assertEquals(9, count.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWindowUnsubscribeNonOverlapping() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  final AtomicInteger count = new AtomicInteger();
  Observable.merge(Observable.range(1, 10000).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t1) {
      count.incrementAndGet();
    }
  }).window(5).take(2)).subscribe(to);
  to.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
  to.assertTerminated();
  to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  //        System.out.println(ts.getOnNextEvents());
  assertEquals(10, count.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testReentrantTake() {
  final PublishSubject<Integer> source = PublishSubject.create();
  TestObserver<Integer> to = new TestObserver<Integer>();
  source.take(1).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) {
      source.onNext(2);
    }
  }).subscribe(to);
  source.onNext(1);
  to.assertValue(1);
  to.assertNoErrors();
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void delayError() {
  Observable.range(1, 5).concatWith(Observable.<Integer>error(new TestException()))
  .observeOn(Schedulers.computation(), true)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      if (v == 1) {
        Thread.sleep(100);
      }
    }
  })
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TestException.class, 1, 2, 3, 4, 5);
}

相关文章

Observable类方法