Usage and code examples for the io.reactivex.Flowable.doOnEach() method

Reposted by x33g5p2x on 2022-01-19, category: Other

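This article collects code examples for the Java method io.reactivex.Flowable.doOnEach() and shows how Flowable.doOnEach() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms, and were extracted from selected projects, so they should be a useful reference. Details of the Flowable.doOnEach() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnEach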

Introduction to Flowable.doOnEach

Modifies the source Publisher so that it invokes an action for each item it emits.

Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnEach does not operate by default on a particular Scheduler.
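To make the description above more concrete, here is a minimal sketch of a doOnEach-style stage written only against the JDK's java.util.concurrent.Flow interfaces. It is a simplified model under stated assumptions, not RxJava's implementation: PeekSketch, fromList, peek and run are hypothetical names, and the source is a synchronous, list-backed publisher. It tries to illustrate two points from the description: the callback observes each signal before it is forwarded, and the upstream Subscription is handed over untouched, so downstream demand still decides how many items are emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Simplified model of a doOnEach-style "peek" stage; NOT RxJava's implementation.
// All names (PeekSketch, fromList, peek, run) are hypothetical.
class PeekSketch {

    // Minimal synchronous source that emits only as many items as were requested.
    static Flow.Publisher<String> fromList(List<String> items) {
        return downstream -> downstream.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    downstream.onNext(items.get(index++));
                }
                if (index == items.size() && !done) {
                    done = true;
                    downstream.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Wraps a source so that `log` sees a description of every signal before it is forwarded.
    static Flow.Publisher<String> peek(Flow.Publisher<String> source, Consumer<String> log) {
        return downstream -> source.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                // The upstream Subscription is passed along unchanged, so request(n)
                // calls reach the source as-is (backpressure passes through).
                downstream.onSubscribe(s);
            }

            @Override
            public void onNext(String item) {
                log.accept("onNext(" + item + ")");
                downstream.onNext(item);
            }

            @Override
            public void onError(Throwable t) {
                log.accept("onError(" + t.getMessage() + ")");
                downstream.onError(t);
            }

            @Override
            public void onComplete() {
                log.accept("onComplete");
                downstream.onComplete();
            }
        });
    }

    // Subscribes with the given initial demand and returns what the peek callback saw.
    static List<String> run(List<String> items, long demand) {
        List<String> seen = new ArrayList<>();
        peek(fromList(items), seen::add).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"), Long.MAX_VALUE)); // [onNext(a), onNext(b), onNext(c), onComplete]
        System.out.println(run(List.of("a", "b", "c"), 2));              // [onNext(a), onNext(b)]
    }
}
```

With unbounded demand the callback sees three onNext signals followed by onComplete. With a demand of 2 it sees only the first two items, because the stage adds no demand of its own.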

Code examples

Code example source: ReactiveX/RxJava

  @Override
  public Flowable<Object> apply(Flowable<Object> f) throws Exception {
    return f.doOnEach(new TestSubscriber<Object>());
  }
});

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnEachSubscriberNull() {
  just1.doOnEach((Subscriber<Integer>)null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnEachSupplierNull() {
  just1.doOnEach((Consumer<Notification<Integer>>)null);
}

Code example source: ReactiveX/RxJava

/**
 * Modifies the source Publisher so that it invokes an action if it calls {@code onError}.
 * <p>
 * In case the {@code onError} action throws, the downstream will receive a composite exception containing
 * the original exception and the exception thrown by {@code onError}.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
 *  backpressure behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onError
 *            the action to invoke if the source Publisher calls {@code onError}
 * @return the source Publisher with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnError(Consumer<? super Throwable> onError) {
  return doOnEach(Functions.emptyConsumer(), onError,
      Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}

Code example source: ReactiveX/RxJava

/**
 * Modifies the source Publisher so that it invokes an action when it calls {@code onNext}.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnNext.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
 *  backpressure behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onNext
 *            the action to invoke when the source Publisher calls {@code onNext}
 * @return the source Publisher with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnNext(Consumer<? super T> onNext) {
  return doOnEach(onNext, Functions.emptyConsumer(),
      Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}

Code example source: ReactiveX/RxJava

@Test
public void testDoOnEach() {
  Flowable<String> base = Flowable.just("a", "b", "c");
  Flowable<String> doOnEach = base.doOnEach(sideEffectSubscriber);
  doOnEach.subscribe(subscribedSubscriber);
  // ensure the leaf observer is still getting called
  verify(subscribedSubscriber, never()).onError(any(Throwable.class));
  verify(subscribedSubscriber, times(1)).onNext("a");
  verify(subscribedSubscriber, times(1)).onNext("b");
  verify(subscribedSubscriber, times(1)).onNext("c");
  verify(subscribedSubscriber, times(1)).onComplete();
  // ensure our injected observer is getting called
  verify(sideEffectSubscriber, never()).onError(any(Throwable.class));
  verify(sideEffectSubscriber, times(1)).onNext("a");
  verify(sideEffectSubscriber, times(1)).onNext("b");
  verify(sideEffectSubscriber, times(1)).onNext("c");
  verify(sideEffectSubscriber, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

/**
 * Registers an {@link Action} to be called when this Publisher invokes either
 * {@link Subscriber#onComplete onComplete} or {@link Subscriber#onError onError}.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/finallyDo.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
 *  behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onAfterTerminate
 *            an {@link Action} to be invoked when the source Publisher finishes
 * @return a Flowable that emits the same items as the source Publisher, then invokes the
 *         {@link Action}
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 * @see #doOnTerminate(Action)
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doAfterTerminate(Action onAfterTerminate) {
  return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(),
      Functions.EMPTY_ACTION, onAfterTerminate);
}

Code example source: ReactiveX/RxJava

/**
 * Modifies the source Publisher so that it invokes an action when it calls {@code onComplete}.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnComplete.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
 *  backpressure behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onComplete
 *            the action to invoke when the source Publisher calls {@code onComplete}
 * @return the source Publisher with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnComplete(Action onComplete) {
  return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(),
      onComplete, Functions.EMPTY_ACTION);
}

Code example source: ReactiveX/RxJava

@Test
public void testDoOnEachWithError() {
  Flowable<String> base = Flowable.just("one", "fail", "two", "three", "fail");
  Flowable<String> errs = base.map(new Function<String, String>() {
    @Override
    public String apply(String s) {
      if ("fail".equals(s)) {
        throw new RuntimeException("Forced Failure");
      }
      return s;
    }
  });
  Flowable<String> doOnEach = errs.doOnEach(sideEffectSubscriber);
  doOnEach.subscribe(subscribedSubscriber);
  verify(subscribedSubscriber, times(1)).onNext("one");
  verify(subscribedSubscriber, never()).onNext("two");
  verify(subscribedSubscriber, never()).onNext("three");
  verify(subscribedSubscriber, never()).onComplete();
  verify(subscribedSubscriber, times(1)).onError(any(Throwable.class));
  verify(sideEffectSubscriber, times(1)).onNext("one");
  verify(sideEffectSubscriber, never()).onNext("two");
  verify(sideEffectSubscriber, never()).onNext("three");
  verify(sideEffectSubscriber, never()).onComplete();
  verify(sideEffectSubscriber, times(1)).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

/**
 * Modifies the source Publisher so that it invokes an action when it calls {@code onComplete} or
 * {@code onError}.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
 * <p>
 * This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
 * {@code onError} notification.
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
 *  backpressure behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onTerminate
 *            the action to invoke when the source Publisher calls {@code onComplete} or {@code onError}
 * @return the source Publisher with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 * @see #doAfterTerminate(Action)
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnTerminate(final Action onTerminate) {
  return doOnEach(Functions.emptyConsumer(), Functions.actionConsumer(onTerminate),
      onTerminate, Functions.EMPTY_ACTION);
}
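The convenience operators quoted in this article (doOnError, doOnNext, doAfterTerminate, doOnComplete, doOnTerminate) all forward to one four-argument doOnEach overload and fill the unused slots with no-op callbacks. The sketch below models that delegation with plain JDK types so the pattern can be run in isolation. It is an illustration under assumptions, not RxJava source: DelegationSketch, Hooks, emit and trace are hypothetical names, Runnable stands in for Action, and emit only imitates the order in which a peek stage would call its hooks relative to the downstream subscriber.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of the delegation pattern; NOT RxJava source code.
// Hooks, emit and trace are hypothetical; Runnable stands in for Action.
class DelegationSketch {

    // The four callbacks accepted by the general (four-argument) form.
    static final class Hooks<T> {
        final Consumer<? super T> onNext;
        final Consumer<? super Throwable> onError;
        final Runnable onComplete;
        final Runnable onAfterTerminate;

        Hooks(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
              Runnable onComplete, Runnable onAfterTerminate) {
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
            this.onAfterTerminate = onAfterTerminate;
        }
    }

    static final Runnable EMPTY_ACTION = () -> { };

    static <T> Consumer<T> emptyConsumer() {
        return value -> { };
    }

    static <T> Hooks<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                                 Runnable onComplete, Runnable onAfterTerminate) {
        return new Hooks<>(onNext, onError, onComplete, onAfterTerminate);
    }

    // Each convenience form fills exactly the slots it cares about.
    static <T> Hooks<T> doOnNext(Consumer<? super T> onNext) {
        return doOnEach(onNext, emptyConsumer(), EMPTY_ACTION, EMPTY_ACTION);
    }

    static <T> Hooks<T> doOnError(Consumer<? super Throwable> onError) {
        return doOnEach(emptyConsumer(), onError, EMPTY_ACTION, EMPTY_ACTION);
    }

    static <T> Hooks<T> doOnComplete(Runnable onComplete) {
        return doOnEach(emptyConsumer(), emptyConsumer(), onComplete, EMPTY_ACTION);
    }

    static <T> Hooks<T> doOnTerminate(Runnable onTerminate) {
        // Runs for both terminal signals, before they are forwarded.
        return doOnEach(emptyConsumer(), error -> onTerminate.run(), onTerminate, EMPTY_ACTION);
    }

    static <T> Hooks<T> doAfterTerminate(Runnable onAfterTerminate) {
        return doOnEach(emptyConsumer(), emptyConsumer(), EMPTY_ACTION, onAfterTerminate);
    }

    // Replays a finite sequence: hook first, then the (logged) downstream call,
    // and the after-terminate hook last.
    static <T> void emit(Hooks<T> hooks, List<T> items, Throwable error, List<String> log) {
        for (T item : items) {
            hooks.onNext.accept(item);
            log.add("downstream onNext(" + item + ")");
        }
        if (error != null) {
            hooks.onError.accept(error);
            log.add("downstream onError");
        } else {
            hooks.onComplete.run();
            log.add("downstream onComplete");
        }
        hooks.onAfterTerminate.run();
    }

    // Records where the hook fires for doOnTerminate (after == false)
    // versus doAfterTerminate (after == true).
    static List<String> trace(boolean after) {
        List<String> log = new ArrayList<>();
        Runnable hook = () -> log.add("hook");
        Hooks<String> hooks;
        if (after) {
            hooks = doAfterTerminate(hook);
        } else {
            hooks = doOnTerminate(hook);
        }
        emit(hooks, Arrays.asList("a"), null, log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // [downstream onNext(a), hook, downstream onComplete]
        System.out.println(trace(true));  // [downstream onNext(a), downstream onComplete, hook]
    }
}
```

The two traces mirror the distinction stated in the doOnTerminate Javadoc: its action happens before the terminal notification, while the doAfterTerminate action happens after it.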

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.just(1).doOnEach(new TestSubscriber<Integer>()));
}

Code example source: ReactiveX/RxJava

}).doOnEach(new Consumer<Notification<String>>() {

Code example source: ReactiveX/RxJava

/**
 * Modifies the source Publisher so that it invokes an action for each item it emits.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
 *  backpressure behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param onNotification
 *            the action to invoke for each item emitted by the source Publisher
 * @return the source Publisher with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnEach(final Consumer<? super Notification<T>> onNotification) {
  ObjectHelper.requireNonNull(onNotification, "consumer is null");
  return doOnEach(
      Functions.notificationOnNext(onNotification),
      Functions.notificationOnError(onNotification),
      Functions.notificationOnComplete(onNotification),
      Functions.EMPTY_ACTION
    );
}
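The overload above turns a single Consumer of Notification into three callbacks through Functions.notificationOnNext, notificationOnError and notificationOnComplete, and leaves the after-terminate slot empty. A rough model of that adaptation, using only JDK types, might look like the sketch below. Signal is a hypothetical stand-in for io.reactivex.Notification, and the adapter bodies are an assumption about what such helpers do, not a copy of RxJava's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model; Signal is a hypothetical stand-in for io.reactivex.Notification.
class NotificationSketch {

    static final class Signal<T> {
        final T value;          // non-null only for an onNext signal
        final Throwable error;  // non-null only for an onError signal

        private Signal(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Signal<T> ofNext(T value) { return new Signal<>(value, null); }
        static <T> Signal<T> ofError(Throwable e) { return new Signal<>(null, e); }
        static <T> Signal<T> ofComplete() { return new Signal<>(null, null); }

        @Override
        public String toString() {
            if (error != null) {
                return "OnError[" + error.getMessage() + "]";
            }
            return value != null ? "OnNext[" + value + "]" : "OnComplete";
        }
    }

    // Adapters: each wraps the raw signal in a Signal and hands it to the same consumer.
    static <T> Consumer<T> asOnNext(Consumer<? super Signal<T>> onNotification) {
        return value -> onNotification.accept(Signal.ofNext(value));
    }

    static <T> Consumer<Throwable> asOnError(Consumer<? super Signal<T>> onNotification) {
        return error -> onNotification.accept(Signal.<T>ofError(error));
    }

    static <T> Runnable asOnComplete(Consumer<? super Signal<T>> onNotification) {
        return () -> onNotification.accept(Signal.<T>ofComplete());
    }

    // Drives the three adapters by hand and returns what the single consumer observed.
    static List<String> trace(boolean fail) {
        List<String> seen = new ArrayList<>();
        Consumer<Signal<String>> onNotification = signal -> seen.add(signal.toString());

        Consumer<String> onNext = asOnNext(onNotification);
        Consumer<Throwable> onError = asOnError(onNotification);
        Runnable onComplete = asOnComplete(onNotification);

        onNext.accept("a");
        if (fail) {
            onError.accept(new RuntimeException("Forced Failure"));
        } else {
            onComplete.run();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // [OnNext[a], OnComplete]
        System.out.println(trace(true));  // [OnNext[a], OnError[Forced Failure]]
    }
}
```

The point of the design is that one callback receives every kind of signal, so the consumer can branch on the signal type instead of registering three separate handlers.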

Code example source: ReactiveX/RxJava

public final Flowable<T> doOnEach(final Subscriber<? super T> subscriber) {
  ObjectHelper.requireNonNull(subscriber, "subscriber is null");
  return doOnEach(
      FlowableInternalHelper.subscriberOnNext(subscriber),
      FlowableInternalHelper.subscriberOnError(subscriber),

Code example source: ReactiveX/RxJava

@Test
public void testWithCombineLatestIssue1717() throws InterruptedException {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicInteger count = new AtomicInteger();
  final int SIZE = 2000;
  Flowable<Long> timer = Flowable.interval(0, 1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.newThread())
      .doOnEach(new Consumer<Notification<Long>>() {
        @Override
        public void accept(Notification<Long> n) {
          // System.out.println(n);
          if (count.incrementAndGet() >= SIZE) {
            latch.countDown();
          }
        }
      }).take(SIZE);
  TestSubscriber<Long> ts = new TestSubscriber<Long>();
  Flowable.combineLatest(timer, Flowable.<Integer> never(), new BiFunction<Long, Integer, Long>() {
    @Override
    public Long apply(Long t1, Integer t2) {
      return t1;
    }
  }).subscribe(ts);
  if (!latch.await(SIZE + 2000, TimeUnit.MILLISECONDS)) {
    fail("timed out");
  }
  assertEquals(SIZE, count.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testErrorPropagatesWhenNoOutstandingRequests() {
  Flowable<Long> timer = Flowable.interval(0, 1, TimeUnit.MICROSECONDS)
      .doOnEach(new Consumer<Notification<Long>>() {

Code example source: ReactiveX/RxJava

@Test
public void testDelayEmitsEverything() {
  Flowable<Integer> source = Flowable.range(1, 5);
  Flowable<Integer> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
  delayed = delayed.doOnEach(new Consumer<Notification<Integer>>() {
    @Override
    public void accept(Notification<Integer> t1) {
      System.out.println(t1);
    }
  });
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  delayed.subscribe(ts);
  // all will be delivered after 500ms since range does not delay between them
  scheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS);
  ts.assertValues(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Override
public Flowable<String> apply(final GroupedFlowable<Integer, Integer> group) {
  if (group.getKey() < 3) {
    return group.map(new Function<Integer, String>() {
      @Override
      public String apply(Integer t1) {
        return "first groups: " + t1;
      }
    })
        // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
        .take(2).doOnComplete(new Action() {
          @Override
          public void run() {
            first.countDown();
          }
        });
  } else {
    return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
      @Override
      public String apply(Integer t1) {
        return "last group: " + t1;
      }
    }).doOnEach(new Consumer<Notification<String>>() {
      @Override
      public void accept(Notification<String> t1) {
        System.err.println("subscribeOn notification => " + t1);
      }
    });
  }
}
