io.reactivex.Observable.doOnEach()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(16.1k)|赞(0)|评价(0)|浏览(120)

本文整理了Java中io.reactivex.Observable.doOnEach()方法的一些代码示例,展示了Observable.doOnEach()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnEach()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnEach

Observable.doOnEach介绍

[英]Modifies the source ObservableSource so that it notifies an Observer for each item and terminal event it emits.

In case the onError of the supplied observer throws, the downstream will receive a composite exception containing the original exception and the exception thrown by onError. If either the onNext or the onComplete method of the supplied observer throws, the downstream will be terminated and will receive this thrown exception.

Scheduler: doOnEach does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便它为其发出的每个项目和终端事件通知观察者。
如果提供的观察者的onError抛出,下游将接收一个包含原始异常和onError抛出的异常的复合异常。如果提供的观察者的onNext或onComplete方法抛出,则下游将被终止,并将接收该抛出的异常。
调度器:默认情况下,doOnEach不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
    return o.doOnEach(new TestObserver<Object>());
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnEachSupplierNull() {
  just1.doOnEach((Consumer<Notification<Integer>>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnEachSubscriberNull() {
  just1.doOnEach((Observer<Integer>)null);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onNext}.
 * <p>
 * <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnNext.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onNext
 *            the action to invoke when the source ObservableSource calls {@code onNext}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
  return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Modifies the source ObservableSource so that it invokes an action if it calls {@code onError}.
 * <p>
 * In case the {@code onError} action throws, the downstream will receive a composite exception containing
 * the original exception and the exception thrown by {@code onError}.
 * <p>
 * <img width="640" height="355" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onError
 *            the action to invoke if the source ObservableSource calls {@code onError}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnError(Consumer<? super Throwable> onError) {
  return doOnEach(Functions.emptyConsumer(), onError, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete}.
 * <p>
 * <img width="640" height="358" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnComplete.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onComplete
 *            the action to invoke when the source ObservableSource calls {@code onComplete}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnComplete(Action onComplete) {
  return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), onComplete, Functions.EMPTY_ACTION);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDoOnEach() {
  Observable<String> base = Observable.just("a", "b", "c");
  Observable<String> doOnEach = base.doOnEach(sideEffectObserver);
  doOnEach.subscribe(subscribedObserver);
  // ensure the leaf Observer is still getting called
  verify(subscribedObserver, never()).onError(any(Throwable.class));
  verify(subscribedObserver, times(1)).onNext("a");
  verify(subscribedObserver, times(1)).onNext("b");
  verify(subscribedObserver, times(1)).onNext("c");
  verify(subscribedObserver, times(1)).onComplete();
  // ensure our injected Observer is getting called
  verify(sideEffectObserver, never()).onError(any(Throwable.class));
  verify(sideEffectObserver, times(1)).onNext("a");
  verify(sideEffectObserver, times(1)).onNext("b");
  verify(sideEffectObserver, times(1)).onNext("c");
  verify(sideEffectObserver, times(1)).onComplete();
}

代码示例来源:origin: redisson/redisson

/**
 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onNext}.
 * <p>
 * <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnNext.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onNext
 *            the action to invoke when the source ObservableSource calls {@code onNext}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
  return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).doOnEach(new TestObserver<Integer>()));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDoOnEachWithError() {
  Observable<String> base = Observable.just("one", "fail", "two", "three", "fail");
  Observable<String> errs = base.map(new Function<String, String>() {
    @Override
    public String apply(String s) {
      if ("fail".equals(s)) {
        throw new RuntimeException("Forced Failure");
      }
      return s;
    }
  });
  Observable<String> doOnEach = errs.doOnEach(sideEffectObserver);
  doOnEach.subscribe(subscribedObserver);
  verify(subscribedObserver, times(1)).onNext("one");
  verify(subscribedObserver, never()).onNext("two");
  verify(subscribedObserver, never()).onNext("three");
  verify(subscribedObserver, never()).onComplete();
  verify(subscribedObserver, times(1)).onError(any(Throwable.class));
  verify(sideEffectObserver, times(1)).onNext("one");
  verify(sideEffectObserver, never()).onNext("two");
  verify(sideEffectObserver, never()).onNext("three");
  verify(sideEffectObserver, never()).onComplete();
  verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Registers an {@link Action} to be called when this ObservableSource invokes either
 * {@link Observer#onComplete onComplete} or {@link Observer#onError onError}.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doAfterTerminate.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onFinally
 *            an {@link Action} to be invoked when the source ObservableSource finishes
 * @return an Observable that emits the same items as the source ObservableSource, then invokes the
 *         {@link Action}
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 * @see #doOnTerminate(Action)
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doAfterTerminate(Action onFinally) {
  ObjectHelper.requireNonNull(onFinally, "onFinally is null");
  return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, onFinally);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete} or
 * {@code onError}.
 * <p>
 * <img width="640" height="327" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.o.png" alt="">
 * <p>
 * This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
 * {@code onError} notification.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onTerminate
 *            the action to invoke when the source ObservableSource calls {@code onComplete} or {@code onError}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 * @see #doAfterTerminate(Action)
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnTerminate(final Action onTerminate) {
  ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
  return doOnEach(Functions.emptyConsumer(),
      Functions.actionConsumer(onTerminate), onTerminate,
      Functions.EMPTY_ACTION);
}

代码示例来源:origin: redisson/redisson

/**
 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete}.
 * <p>
 * <img width="640" height="358" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnComplete.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onComplete
 *            the action to invoke when the source ObservableSource calls {@code onComplete}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnComplete(Action onComplete) {
  return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), onComplete, Functions.EMPTY_ACTION);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Modifies the source ObservableSource so that it invokes an action for each item it emits.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onNotification
 *            the action to invoke for each item emitted by the source ObservableSource
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification) {
  ObjectHelper.requireNonNull(onNotification, "consumer is null");
  return doOnEach(
      Functions.notificationOnNext(onNotification),
      Functions.notificationOnError(onNotification),
      Functions.notificationOnComplete(onNotification),
      Functions.EMPTY_ACTION
    );
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Modifies the source ObservableSource so that it notifies an Observer for each item and terminal event it emits.
 * <p>
 * In case the {@code onError} of the supplied observer throws, the downstream will receive a composite
 * exception containing the original exception and the exception thrown by {@code onError}. If either the
 * {@code onNext} or the {@code onComplete} method of the supplied observer throws, the downstream will be
 * terminated and will receive this thrown exception.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param observer
 *            the observer to be notified about onNext, onError and onComplete events on its
 *            respective methods before the actual downstream Observer gets notified.
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnEach(final Observer<? super T> observer) {
  ObjectHelper.requireNonNull(observer, "observer is null");
  return doOnEach(
      ObservableInternalHelper.observerOnNext(observer),
      ObservableInternalHelper.observerOnError(observer),
      ObservableInternalHelper.observerOnComplete(observer),
      Functions.EMPTY_ACTION);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWithCombineLatestIssue1717() throws InterruptedException {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicInteger count = new AtomicInteger();
  final int SIZE = 2000;
  Observable<Long> timer = Observable.interval(0, 1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.newThread())
      .doOnEach(new Consumer<Notification<Long>>() {
        @Override
        public void accept(Notification<Long> n) {
            //                        System.out.println(n);
            if (count.incrementAndGet() >= SIZE) {
              latch.countDown();
            }
        }
      }).take(SIZE);
  TestObserver<Long> to = new TestObserver<Long>();
  Observable.combineLatest(timer, Observable.<Integer> never(), new BiFunction<Long, Integer, Long>() {
    @Override
    public Long apply(Long t1, Integer t2) {
      return t1;
    }
  }).subscribe(to);
  if (!latch.await(SIZE + 1000, TimeUnit.MILLISECONDS)) {
    fail("timed out");
  }
  assertEquals(SIZE, count.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDelayEmitsEverything() {
  Observable<Integer> source = Observable.range(1, 5);
  Observable<Integer> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
  delayed = delayed.doOnEach(new Consumer<Notification<Integer>>() {
    @Override
    public void accept(Notification<Integer> t1) {
      System.out.println(t1);
    }
  });
  TestObserver<Integer> observer = new TestObserver<Integer>();
  delayed.subscribe(observer);
  // all will be delivered after 500ms since range does not delay between them
  scheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS);
  observer.assertValues(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

}).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

}).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

@Override
public Observable<String> apply(final GroupedObservable<Integer, Integer> group) {
  if (group.getKey() < 3) {
    return group.map(new Function<Integer, String>() {
      @Override
      public String apply(Integer t1) {
        return "first groups: " + t1;
      }
    })
        // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
        .take(2).doOnComplete(new Action() {
          @Override
          public void run() {
            first.countDown();
          }
        });
  } else {
    return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
      @Override
      public String apply(Integer t1) {
        return "last group: " + t1;
      }
    }).doOnEach(new Consumer<Notification<String>>() {
      @Override
      public void accept(Notification<String> t1) {
        System.err.println("subscribeOn notification => " + t1);
      }
    });
  }
}

相关文章

Observable类方法