io.reactivex.Flowable.doOnEach()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(14.2k)|赞(0)|评价(0)|浏览(239)

本文整理了Java中io.reactivex.Flowable.doOnEach()方法的一些代码示例,展示了Flowable.doOnEach()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doOnEach()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doOnEach

Flowable.doOnEach介绍

[英]Modifies the source Publisher so that it invokes an action for each item it emits.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doOnEach does not operate by default on a particular Scheduler.
[中]修改源发布服务器,使其为发出的每个项调用操作。
背压:操作员不会干扰由源发布者的背压行为确定的背压。调度程序:默认情况下,doOnEach不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<Object> apply(Flowable<Object> f) throws Exception {
  3. return f.doOnEach(new TestSubscriber<Object>());
  4. }
  5. });

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void doOnEachSubscriberNull() {
  3. just1.doOnEach((Subscriber<Integer>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void doOnEachSupplierNull() {
  3. just1.doOnEach((Consumer<Notification<Integer>>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source Publisher so that it invokes an action if it calls {@code onError}.
  3. * <p>
  4. * In case the {@code onError} action throws, the downstream will receive a composite exception containing
  5. * the original exception and the exception thrown by {@code onError}.
  6. * <p>
  7. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.png" alt="">
  8. * <dl>
  9. * <dt><b>Backpressure:</b></dt>
  10. * <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
  11. * backpressure behavior.</dd>
  12. * <dt><b>Scheduler:</b></dt>
  13. * <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
  14. * </dl>
  15. *
  16. * @param onError
  17. * the action to invoke if the source Publisher calls {@code onError}
  18. * @return the source Publisher with the side-effecting behavior applied
  19. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  20. */
  21. @CheckReturnValue
  22. @BackpressureSupport(BackpressureKind.PASS_THROUGH)
  23. @SchedulerSupport(SchedulerSupport.NONE)
  24. public final Flowable<T> doOnError(Consumer<? super Throwable> onError) {
  25. return doOnEach(Functions.emptyConsumer(), onError,
  26. Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source Publisher so that it invokes an action when it calls {@code onNext}.
  3. * <p>
  4. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnNext.png" alt="">
  5. * <dl>
  6. * <dt><b>Backpressure:</b></dt>
  7. * <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
  8. * backpressure behavior.</dd>
  9. * <dt><b>Scheduler:</b></dt>
  10. * <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
  11. * </dl>
  12. *
  13. * @param onNext
  14. * the action to invoke when the source Publisher calls {@code onNext}
  15. * @return the source Publisher with the side-effecting behavior applied
  16. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  17. */
  18. @CheckReturnValue
  19. @BackpressureSupport(BackpressureKind.PASS_THROUGH)
  20. @SchedulerSupport(SchedulerSupport.NONE)
  21. public final Flowable<T> doOnNext(Consumer<? super T> onNext) {
  22. return doOnEach(onNext, Functions.emptyConsumer(),
  23. Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDoOnEach() {
  3. Flowable<String> base = Flowable.just("a", "b", "c");
  4. Flowable<String> doOnEach = base.doOnEach(sideEffectSubscriber);
  5. doOnEach.subscribe(subscribedSubscriber);
  6. // ensure the leaf observer is still getting called
  7. verify(subscribedSubscriber, never()).onError(any(Throwable.class));
  8. verify(subscribedSubscriber, times(1)).onNext("a");
  9. verify(subscribedSubscriber, times(1)).onNext("b");
  10. verify(subscribedSubscriber, times(1)).onNext("c");
  11. verify(subscribedSubscriber, times(1)).onComplete();
  12. // ensure our injected observer is getting called
  13. verify(sideEffectSubscriber, never()).onError(any(Throwable.class));
  14. verify(sideEffectSubscriber, times(1)).onNext("a");
  15. verify(sideEffectSubscriber, times(1)).onNext("b");
  16. verify(sideEffectSubscriber, times(1)).onNext("c");
  17. verify(sideEffectSubscriber, times(1)).onComplete();
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Registers an {@link Action} to be called when this Publisher invokes either
  3. * {@link Subscriber#onComplete onComplete} or {@link Subscriber#onError onError}.
  4. * <p>
  5. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/finallyDo.png" alt="">
  6. * <dl>
  7. * <dt><b>Backpressure:</b></dt>
  8. * <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
  9. * behavior.</dd>
  10. * <dt><b>Scheduler:</b></dt>
  11. * <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
  12. * </dl>
  13. *
  14. * @param onAfterTerminate
  15. * an {@link Action} to be invoked when the source Publisher finishes
  16. * @return a Flowable that emits the same items as the source Publisher, then invokes the
  17. * {@link Action}
  18. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  19. * @see #doOnTerminate(Action)
  20. */
  21. @CheckReturnValue
  22. @BackpressureSupport(BackpressureKind.PASS_THROUGH)
  23. @SchedulerSupport(SchedulerSupport.NONE)
  24. public final Flowable<T> doAfterTerminate(Action onAfterTerminate) {
  25. return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(),
  26. Functions.EMPTY_ACTION, onAfterTerminate);
  27. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source Publisher so that it invokes an action when it calls {@code onComplete}.
  3. * <p>
  4. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnComplete.png" alt="">
  5. * <dl>
  6. * <dt><b>Backpressure:</b></dt>
  7. * <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
  8. * backpressure behavior.</dd>
  9. * <dt><b>Scheduler:</b></dt>
  10. * <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
  11. * </dl>
  12. *
  13. * @param onComplete
  14. * the action to invoke when the source Publisher calls {@code onComplete}
  15. * @return the source Publisher with the side-effecting behavior applied
  16. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  17. */
  18. @CheckReturnValue
  19. @BackpressureSupport(BackpressureKind.PASS_THROUGH)
  20. @SchedulerSupport(SchedulerSupport.NONE)
  21. public final Flowable<T> doOnComplete(Action onComplete) {
  22. return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(),
  23. onComplete, Functions.EMPTY_ACTION);
  24. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDoOnEachWithError() {
  3. Flowable<String> base = Flowable.just("one", "fail", "two", "three", "fail");
  4. Flowable<String> errs = base.map(new Function<String, String>() {
  5. @Override
  6. public String apply(String s) {
  7. if ("fail".equals(s)) {
  8. throw new RuntimeException("Forced Failure");
  9. }
  10. return s;
  11. }
  12. });
  13. Flowable<String> doOnEach = errs.doOnEach(sideEffectSubscriber);
  14. doOnEach.subscribe(subscribedSubscriber);
  15. verify(subscribedSubscriber, times(1)).onNext("one");
  16. verify(subscribedSubscriber, never()).onNext("two");
  17. verify(subscribedSubscriber, never()).onNext("three");
  18. verify(subscribedSubscriber, never()).onComplete();
  19. verify(subscribedSubscriber, times(1)).onError(any(Throwable.class));
  20. verify(sideEffectSubscriber, times(1)).onNext("one");
  21. verify(sideEffectSubscriber, never()).onNext("two");
  22. verify(sideEffectSubscriber, never()).onNext("three");
  23. verify(sideEffectSubscriber, never()).onComplete();
  24. verify(sideEffectSubscriber, times(1)).onError(any(Throwable.class));
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source Publisher so that it invokes an action when it calls {@code onComplete} or
  3. * {@code onError}.
  4. * <p>
  5. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
  6. * <p>
  7. * This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
  8. * {@code onError} notification.
  9. * <dl>
  10. * <dt><b>Backpressure:</b></dt>
  11. * <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
  12. * backpressure behavior.</dd>
  13. * <dt><b>Scheduler:</b></dt>
  14. * <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
  15. * </dl>
  16. *
  17. * @param onTerminate
  18. * the action to invoke when the source Publisher calls {@code onComplete} or {@code onError}
  19. * @return the source Publisher with the side-effecting behavior applied
  20. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  21. * @see #doAfterTerminate(Action)
  22. */
  23. @CheckReturnValue
  24. @BackpressureSupport(BackpressureKind.PASS_THROUGH)
  25. @SchedulerSupport(SchedulerSupport.NONE)
  26. public final Flowable<T> doOnTerminate(final Action onTerminate) {
  27. return doOnEach(Functions.emptyConsumer(), Functions.actionConsumer(onTerminate),
  28. onTerminate, Functions.EMPTY_ACTION);
  29. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void dispose() {
  3. TestHelper.checkDisposed(Flowable.just(1).doOnEach(new TestSubscriber<Integer>()));
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. }).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Modifies the source Publisher so that it invokes an action for each item it emits.
  3. * <p>
  4. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png" alt="">
  5. * <dl>
  6. * <dt><b>Backpressure:</b></dt>
  7. * <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
  8. * backpressure behavior.</dd>
  9. * <dt><b>Scheduler:</b></dt>
  10. * <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
  11. * </dl>
  12. *
  13. * @param onNotification
  14. * the action to invoke for each item emitted by the source Publisher
  15. * @return the source Publisher with the side-effecting behavior applied
  16. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  17. */
  18. @CheckReturnValue
  19. @NonNull
  20. @BackpressureSupport(BackpressureKind.PASS_THROUGH)
  21. @SchedulerSupport(SchedulerSupport.NONE)
  22. public final Flowable<T> doOnEach(final Consumer<? super Notification<T>> onNotification) {
  23. ObjectHelper.requireNonNull(onNotification, "consumer is null");
  24. return doOnEach(
  25. Functions.notificationOnNext(onNotification),
  26. Functions.notificationOnError(onNotification),
  27. Functions.notificationOnComplete(onNotification),
  28. Functions.EMPTY_ACTION
  29. );
  30. }

代码示例来源:origin: ReactiveX/RxJava

  1. }).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. public final Flowable<T> doOnEach(final Subscriber<? super T> subscriber) {
  2. ObjectHelper.requireNonNull(subscriber, "subscriber is null");
  3. return doOnEach(
  4. FlowableInternalHelper.subscriberOnNext(subscriber),
  5. FlowableInternalHelper.subscriberOnError(subscriber),

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testWithCombineLatestIssue1717() throws InterruptedException {
  3. final CountDownLatch latch = new CountDownLatch(1);
  4. final AtomicInteger count = new AtomicInteger();
  5. final int SIZE = 2000;
  6. Flowable<Long> timer = Flowable.interval(0, 1, TimeUnit.MILLISECONDS)
  7. .observeOn(Schedulers.newThread())
  8. .doOnEach(new Consumer<Notification<Long>>() {
  9. @Override
  10. public void accept(Notification<Long> n) {
  11. // System.out.println(n);
  12. if (count.incrementAndGet() >= SIZE) {
  13. latch.countDown();
  14. }
  15. }
  16. }).take(SIZE);
  17. TestSubscriber<Long> ts = new TestSubscriber<Long>();
  18. Flowable.combineLatest(timer, Flowable.<Integer> never(), new BiFunction<Long, Integer, Long>() {
  19. @Override
  20. public Long apply(Long t1, Integer t2) {
  21. return t1;
  22. }
  23. }).subscribe(ts);
  24. if (!latch.await(SIZE + 2000, TimeUnit.MILLISECONDS)) {
  25. fail("timed out");
  26. }
  27. assertEquals(SIZE, count.get());
  28. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testErrorPropagatesWhenNoOutstandingRequests() {
  3. Flowable<Long> timer = Flowable.interval(0, 1, TimeUnit.MICROSECONDS)
  4. .doOnEach(new Consumer<Notification<Long>>() {
  5. .doOnEach(new Consumer<Notification<Long>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDelayEmitsEverything() {
  3. Flowable<Integer> source = Flowable.range(1, 5);
  4. Flowable<Integer> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
  5. delayed = delayed.doOnEach(new Consumer<Notification<Integer>>() {
  6. @Override
  7. public void accept(Notification<Integer> t1) {
  8. System.out.println(t1);
  9. }
  10. });
  11. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  12. delayed.subscribe(ts);
  13. // all will be delivered after 500ms since range does not delay between them
  14. scheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS);
  15. ts.assertValues(1, 2, 3, 4, 5);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. }).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Flowable<String> apply(final GroupedFlowable<Integer, Integer> group) {
  3. if (group.getKey() < 3) {
  4. return group.map(new Function<Integer, String>() {
  5. @Override
  6. public String apply(Integer t1) {
  7. return "first groups: " + t1;
  8. }
  9. })
  10. // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
  11. .take(2).doOnComplete(new Action() {
  12. @Override
  13. public void run() {
  14. first.countDown();
  15. }
  16. });
  17. } else {
  18. return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
  19. @Override
  20. public String apply(Integer t1) {
  21. return "last group: " + t1;
  22. }
  23. }).doOnEach(new Consumer<Notification<String>>() {
  24. @Override
  25. public void accept(Notification<String> t1) {
  26. System.err.println("subscribeOn notification => " + t1);
  27. }
  28. });
  29. }
  30. }

相关文章

Flowable类方法