io.reactivex.Observable.doOnEach()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(16.1k)|赞(0)|评价(0)|浏览(142)

本文整理了Java中io.reactivex.Observable.doOnEach()方法的一些代码示例,展示了Observable.doOnEach()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnEach()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnEach

Observable.doOnEach介绍

[英]Modifies the source ObservableSource so that it notifies an Observer for each item and terminal event it emits.

In case the onError of the supplied observer throws, the downstream will receive a composite exception containing the original exception and the exception thrown by onError. If either the onNext or the onComplete method of the supplied observer throws, the downstream will be terminated and will receive this thrown exception.

Scheduler: doOnEach does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便它为其发出的每个项目和终端事件通知观察者。
如果提供的观察者的onError抛出,下游将接收一个包含原始异常和onError抛出的异常的复合异常。如果提供的观察者的onNext或onComplete方法抛出,则下游将被终止,并将接收该抛出的异常。
调度器:默认情况下,doOnEach不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
  3. return o.doOnEach(new TestObserver<Object>());
  4. }
  5. });

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void doOnEachSupplierNull() {
  3. just1.doOnEach((Consumer<Notification<Integer>>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void doOnEachSubscriberNull() {
  3. just1.doOnEach((Observer<Integer>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source ObservableSource so that it invokes an action when it calls {@code onNext}.
  3. * <p>
  4. * <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnNext.o.png" alt="">
  5. * <dl>
  6. * <dt><b>Scheduler:</b></dt>
  7. * <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
  8. * </dl>
  9. *
  10. * @param onNext
  11. * the action to invoke when the source ObservableSource calls {@code onNext}
  12. * @return the source ObservableSource with the side-effecting behavior applied
  13. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  14. */
  15. @CheckReturnValue
  16. @SchedulerSupport(SchedulerSupport.NONE)
  17. public final Observable<T> doOnNext(Consumer<? super T> onNext) {
  18. return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source ObservableSource so that it invokes an action if it calls {@code onError}.
  3. * <p>
  4. * In case the {@code onError} action throws, the downstream will receive a composite exception containing
  5. * the original exception and the exception thrown by {@code onError}.
  6. * <p>
  7. * <img width="640" height="355" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.o.png" alt="">
  8. * <dl>
  9. * <dt><b>Scheduler:</b></dt>
  10. * <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
  11. * </dl>
  12. *
  13. * @param onError
  14. * the action to invoke if the source ObservableSource calls {@code onError}
  15. * @return the source ObservableSource with the side-effecting behavior applied
  16. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  17. */
  18. @CheckReturnValue
  19. @SchedulerSupport(SchedulerSupport.NONE)
  20. public final Observable<T> doOnError(Consumer<? super Throwable> onError) {
  21. return doOnEach(Functions.emptyConsumer(), onError, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
  22. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete}.
  3. * <p>
  4. * <img width="640" height="358" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnComplete.o.png" alt="">
  5. * <dl>
  6. * <dt><b>Scheduler:</b></dt>
  7. * <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
  8. * </dl>
  9. *
  10. * @param onComplete
  11. * the action to invoke when the source ObservableSource calls {@code onComplete}
  12. * @return the source ObservableSource with the side-effecting behavior applied
  13. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  14. */
  15. @CheckReturnValue
  16. @SchedulerSupport(SchedulerSupport.NONE)
  17. public final Observable<T> doOnComplete(Action onComplete) {
  18. return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), onComplete, Functions.EMPTY_ACTION);
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDoOnEach() {
  3. Observable<String> base = Observable.just("a", "b", "c");
  4. Observable<String> doOnEach = base.doOnEach(sideEffectObserver);
  5. doOnEach.subscribe(subscribedObserver);
  6. // ensure the leaf Observer is still getting called
  7. verify(subscribedObserver, never()).onError(any(Throwable.class));
  8. verify(subscribedObserver, times(1)).onNext("a");
  9. verify(subscribedObserver, times(1)).onNext("b");
  10. verify(subscribedObserver, times(1)).onNext("c");
  11. verify(subscribedObserver, times(1)).onComplete();
  12. // ensure our injected Observer is getting called
  13. verify(sideEffectObserver, never()).onError(any(Throwable.class));
  14. verify(sideEffectObserver, times(1)).onNext("a");
  15. verify(sideEffectObserver, times(1)).onNext("b");
  16. verify(sideEffectObserver, times(1)).onNext("c");
  17. verify(sideEffectObserver, times(1)).onComplete();
  18. }

代码示例来源:origin: redisson/redisson

  1. /**
  2. * Modifies the source ObservableSource so that it invokes an action when it calls {@code onNext}.
  3. * <p>
  4. * <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnNext.o.png" alt="">
  5. * <dl>
  6. * <dt><b>Scheduler:</b></dt>
  7. * <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
  8. * </dl>
  9. *
  10. * @param onNext
  11. * the action to invoke when the source ObservableSource calls {@code onNext}
  12. * @return the source ObservableSource with the side-effecting behavior applied
  13. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  14. */
  15. @CheckReturnValue
  16. @SchedulerSupport(SchedulerSupport.NONE)
  17. public final Observable<T> doOnNext(Consumer<? super T> onNext) {
  18. return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void dispose() {
  3. TestHelper.checkDisposed(Observable.just(1).doOnEach(new TestObserver<Integer>()));
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDoOnEachWithError() {
  3. Observable<String> base = Observable.just("one", "fail", "two", "three", "fail");
  4. Observable<String> errs = base.map(new Function<String, String>() {
  5. @Override
  6. public String apply(String s) {
  7. if ("fail".equals(s)) {
  8. throw new RuntimeException("Forced Failure");
  9. }
  10. return s;
  11. }
  12. });
  13. Observable<String> doOnEach = errs.doOnEach(sideEffectObserver);
  14. doOnEach.subscribe(subscribedObserver);
  15. verify(subscribedObserver, times(1)).onNext("one");
  16. verify(subscribedObserver, never()).onNext("two");
  17. verify(subscribedObserver, never()).onNext("three");
  18. verify(subscribedObserver, never()).onComplete();
  19. verify(subscribedObserver, times(1)).onError(any(Throwable.class));
  20. verify(sideEffectObserver, times(1)).onNext("one");
  21. verify(sideEffectObserver, never()).onNext("two");
  22. verify(sideEffectObserver, never()).onNext("three");
  23. verify(sideEffectObserver, never()).onComplete();
  24. verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Registers an {@link Action} to be called when this ObservableSource invokes either
  3. * {@link Observer#onComplete onComplete} or {@link Observer#onError onError}.
  4. * <p>
  5. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doAfterTerminate.png" alt="">
  6. * <dl>
  7. * <dt><b>Scheduler:</b></dt>
  8. * <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
  9. * </dl>
  10. *
  11. * @param onFinally
  12. * an {@link Action} to be invoked when the source ObservableSource finishes
  13. * @return an Observable that emits the same items as the source ObservableSource, then invokes the
  14. * {@link Action}
  15. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  16. * @see #doOnTerminate(Action)
  17. */
  18. @CheckReturnValue
  19. @SchedulerSupport(SchedulerSupport.NONE)
  20. public final Observable<T> doAfterTerminate(Action onFinally) {
  21. ObjectHelper.requireNonNull(onFinally, "onFinally is null");
  22. return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, onFinally);
  23. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete} or
  3. * {@code onError}.
  4. * <p>
  5. * <img width="640" height="327" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.o.png" alt="">
  6. * <p>
  7. * This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
  8. * {@code onError} notification.
  9. * <dl>
  10. * <dt><b>Scheduler:</b></dt>
  11. * <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
  12. * </dl>
  13. *
  14. * @param onTerminate
  15. * the action to invoke when the source ObservableSource calls {@code onComplete} or {@code onError}
  16. * @return the source ObservableSource with the side-effecting behavior applied
  17. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  18. * @see #doAfterTerminate(Action)
  19. */
  20. @CheckReturnValue
  21. @SchedulerSupport(SchedulerSupport.NONE)
  22. public final Observable<T> doOnTerminate(final Action onTerminate) {
  23. ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
  24. return doOnEach(Functions.emptyConsumer(),
  25. Functions.actionConsumer(onTerminate), onTerminate,
  26. Functions.EMPTY_ACTION);
  27. }

代码示例来源:origin: redisson/redisson

  1. /**
  2. * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete}.
  3. * <p>
  4. * <img width="640" height="358" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnComplete.o.png" alt="">
  5. * <dl>
  6. * <dt><b>Scheduler:</b></dt>
  7. * <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
  8. * </dl>
  9. *
  10. * @param onComplete
  11. * the action to invoke when the source ObservableSource calls {@code onComplete}
  12. * @return the source ObservableSource with the side-effecting behavior applied
  13. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  14. */
  15. @CheckReturnValue
  16. @SchedulerSupport(SchedulerSupport.NONE)
  17. public final Observable<T> doOnComplete(Action onComplete) {
  18. return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), onComplete, Functions.EMPTY_ACTION);
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source ObservableSource so that it invokes an action for each item it emits.
  3. * <p>
  4. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png" alt="">
  5. * <dl>
  6. * <dt><b>Scheduler:</b></dt>
  7. * <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
  8. * </dl>
  9. *
  10. * @param onNotification
  11. * the action to invoke for each item emitted by the source ObservableSource
  12. * @return the source ObservableSource with the side-effecting behavior applied
  13. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  14. */
  15. @CheckReturnValue
  16. @SchedulerSupport(SchedulerSupport.NONE)
  17. public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification) {
  18. ObjectHelper.requireNonNull(onNotification, "consumer is null");
  19. return doOnEach(
  20. Functions.notificationOnNext(onNotification),
  21. Functions.notificationOnError(onNotification),
  22. Functions.notificationOnComplete(onNotification),
  23. Functions.EMPTY_ACTION
  24. );
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source ObservableSource so that it notifies an Observer for each item and terminal event it emits.
  3. * <p>
  4. * In case the {@code onError} of the supplied observer throws, the downstream will receive a composite
  5. * exception containing the original exception and the exception thrown by {@code onError}. If either the
  6. * {@code onNext} or the {@code onComplete} method of the supplied observer throws, the downstream will be
  7. * terminated and will receive this thrown exception.
  8. * <p>
  9. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.o.png" alt="">
  10. * <dl>
  11. * <dt><b>Scheduler:</b></dt>
  12. * <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
  13. * </dl>
  14. *
  15. * @param observer
  16. * the observer to be notified about onNext, onError and onComplete events on its
  17. * respective methods before the actual downstream Observer gets notified.
  18. * @return the source ObservableSource with the side-effecting behavior applied
  19. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  20. */
  21. @CheckReturnValue
  22. @SchedulerSupport(SchedulerSupport.NONE)
  23. public final Observable<T> doOnEach(final Observer<? super T> observer) {
  24. ObjectHelper.requireNonNull(observer, "observer is null");
  25. return doOnEach(
  26. ObservableInternalHelper.observerOnNext(observer),
  27. ObservableInternalHelper.observerOnError(observer),
  28. ObservableInternalHelper.observerOnComplete(observer),
  29. Functions.EMPTY_ACTION);
  30. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithCombineLatestIssue1717() throws InterruptedException {
  3. final CountDownLatch latch = new CountDownLatch(1);
  4. final AtomicInteger count = new AtomicInteger();
  5. final int SIZE = 2000;
  6. Observable<Long> timer = Observable.interval(0, 1, TimeUnit.MILLISECONDS)
  7. .observeOn(Schedulers.newThread())
  8. .doOnEach(new Consumer<Notification<Long>>() {
  9. @Override
  10. public void accept(Notification<Long> n) {
  11. // System.out.println(n);
  12. if (count.incrementAndGet() >= SIZE) {
  13. latch.countDown();
  14. }
  15. }
  16. }).take(SIZE);
  17. TestObserver<Long> to = new TestObserver<Long>();
  18. Observable.combineLatest(timer, Observable.<Integer> never(), new BiFunction<Long, Integer, Long>() {
  19. @Override
  20. public Long apply(Long t1, Integer t2) {
  21. return t1;
  22. }
  23. }).subscribe(to);
  24. if (!latch.await(SIZE + 1000, TimeUnit.MILLISECONDS)) {
  25. fail("timed out");
  26. }
  27. assertEquals(SIZE, count.get());
  28. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelayEmitsEverything() {
  3. Observable<Integer> source = Observable.range(1, 5);
  4. Observable<Integer> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
  5. delayed = delayed.doOnEach(new Consumer<Notification<Integer>>() {
  6. @Override
  7. public void accept(Notification<Integer> t1) {
  8. System.out.println(t1);
  9. }
  10. });
  11. TestObserver<Integer> observer = new TestObserver<Integer>();
  12. delayed.subscribe(observer);
  13. // all will be delivered after 500ms since range does not delay between them
  14. scheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS);
  15. observer.assertValues(1, 2, 3, 4, 5);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. }).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. }).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<String> apply(final GroupedObservable<Integer, Integer> group) {
  3. if (group.getKey() < 3) {
  4. return group.map(new Function<Integer, String>() {
  5. @Override
  6. public String apply(Integer t1) {
  7. return "first groups: " + t1;
  8. }
  9. })
  10. // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
  11. .take(2).doOnComplete(new Action() {
  12. @Override
  13. public void run() {
  14. first.countDown();
  15. }
  16. });
  17. } else {
  18. return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
  19. @Override
  20. public String apply(Integer t1) {
  21. return "last group: " + t1;
  22. }
  23. }).doOnEach(new Consumer<Notification<String>>() {
  24. @Override
  25. public void accept(Notification<String> t1) {
  26. System.err.println("subscribeOn notification => " + t1);
  27. }
  28. });
  29. }
  30. }

相关文章

Observable类方法