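This article collects code examples for the Java method io.reactivex.Flowable.doOnEach() and shows how Flowable.doOnEach() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flowable.doOnEach() method are as follows: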
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnEach
Modifies the source Publisher so that it invokes an action for each item it emits. With the Consumer<Notification<T>> overload, the same action is also invoked for the terminal onError or onComplete notification, as the notificationOnNext, notificationOnError, and notificationOnComplete adapters in the implementation excerpt show.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior. Scheduler: doOnEach does not operate by default on a particular Scheduler.
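To make the description above concrete, here is a minimal plain-Java sketch of the idea, not RxJava code. It assumes a hypothetical Signal type standing in for RxJava's Notification and a hypothetical emitWithDoOnEach helper; both names are invented for illustration. It only models the behavior described: the side-effect action sees every signal, and the downstream still receives the same signals unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class DoOnEachModel {

    // Hypothetical stand-in for RxJava's Notification: a value, an error, or completion.
    static final class Signal<T> {
        final T value;          // non-null for an onNext signal
        final Throwable error;  // non-null for an onError signal

        Signal(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        boolean isOnNext() { return value != null; }
        boolean isOnError() { return error != null; }
        boolean isOnComplete() { return value == null && error == null; }

        @Override
        public String toString() {
            if (isOnNext()) return "onNext(" + value + ")";
            if (isOnError()) return "onError(" + error.getMessage() + ")";
            return "onComplete";
        }
    }

    // Hypothetical helper: hands every signal to the side effect first,
    // then passes the same signal on to the downstream, unchanged.
    static <T> void emitWithDoOnEach(List<Signal<T>> source,
                                     Consumer<Signal<T>> sideEffect,
                                     Consumer<Signal<T>> downstream) {
        for (Signal<T> signal : source) {
            sideEffect.accept(signal);
            downstream.accept(signal);
        }
    }

    // Replays "a", "b", then completion, and records the order of the callbacks.
    static List<String> demo() {
        List<Signal<String>> source = Arrays.asList(
                new Signal<String>("a", null),
                new Signal<String>("b", null),
                new Signal<String>(null, null));
        List<String> log = new ArrayList<>();
        emitWithDoOnEach(source,
                s -> log.add("side " + s),
                s -> log.add("down " + s));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the log alternates "side ..." and "down ..." entries and ends with the completion signal reaching both consumers. Real RxJava code would instead pass a Consumer<Notification<T>> or a Subscriber to Flowable.doOnEach, as the examples that follow do.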
Code example source: ReactiveX/RxJava
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
    return f.doOnEach(new TestSubscriber<Object>());
}
});
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnEachSubscriberNull() {
    just1.doOnEach((Subscriber<Integer>)null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnEachSupplierNull() {
    just1.doOnEach((Consumer<Notification<Integer>>)null);
}
Code example source: ReactiveX/RxJava
/**
* Modifies the source Publisher so that it invokes an action if it calls {@code onError}.
* <p>
* In case the {@code onError} action throws, the downstream will receive a composite exception containing
* the original exception and the exception thrown by {@code onError}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onError
* the action to invoke if the source Publisher calls {@code onError}
* @return the source Publisher with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnError(Consumer<? super Throwable> onError) {
    return doOnEach(Functions.emptyConsumer(), onError,
            Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
Code example source: ReactiveX/RxJava
/**
* Modifies the source Publisher so that it invokes an action when it calls {@code onNext}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnNext.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onNext
* the action to invoke when the source Publisher calls {@code onNext}
* @return the source Publisher with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnNext(Consumer<? super T> onNext) {
    return doOnEach(onNext, Functions.emptyConsumer(),
            Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
Code example source: ReactiveX/RxJava
@Test
public void testDoOnEach() {
    Flowable<String> base = Flowable.just("a", "b", "c");
    Flowable<String> doOnEach = base.doOnEach(sideEffectSubscriber);
    doOnEach.subscribe(subscribedSubscriber);
    // ensure the leaf observer is still getting called
    verify(subscribedSubscriber, never()).onError(any(Throwable.class));
    verify(subscribedSubscriber, times(1)).onNext("a");
    verify(subscribedSubscriber, times(1)).onNext("b");
    verify(subscribedSubscriber, times(1)).onNext("c");
    verify(subscribedSubscriber, times(1)).onComplete();
    // ensure our injected observer is getting called
    verify(sideEffectSubscriber, never()).onError(any(Throwable.class));
    verify(sideEffectSubscriber, times(1)).onNext("a");
    verify(sideEffectSubscriber, times(1)).onNext("b");
    verify(sideEffectSubscriber, times(1)).onNext("c");
    verify(sideEffectSubscriber, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
/**
* Registers an {@link Action} to be called when this Publisher invokes either
* {@link Subscriber#onComplete onComplete} or {@link Subscriber#onError onError}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/finallyDo.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onAfterTerminate
* an {@link Action} to be invoked when the source Publisher finishes
* @return a Flowable that emits the same items as the source Publisher, then invokes the
* {@link Action}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @see #doOnTerminate(Action)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doAfterTerminate(Action onAfterTerminate) {
    return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(),
            Functions.EMPTY_ACTION, onAfterTerminate);
}
Code example source: ReactiveX/RxJava
/**
* Modifies the source Publisher so that it invokes an action when it calls {@code onComplete}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnComplete.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onComplete
* the action to invoke when the source Publisher calls {@code onComplete}
* @return the source Publisher with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnComplete(Action onComplete) {
    return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(),
            onComplete, Functions.EMPTY_ACTION);
}
Code example source: ReactiveX/RxJava
@Test
public void testDoOnEachWithError() {
    Flowable<String> base = Flowable.just("one", "fail", "two", "three", "fail");
    Flowable<String> errs = base.map(new Function<String, String>() {
        @Override
        public String apply(String s) {
            if ("fail".equals(s)) {
                throw new RuntimeException("Forced Failure");
            }
            return s;
        }
    });
    Flowable<String> doOnEach = errs.doOnEach(sideEffectSubscriber);
    doOnEach.subscribe(subscribedSubscriber);
    verify(subscribedSubscriber, times(1)).onNext("one");
    verify(subscribedSubscriber, never()).onNext("two");
    verify(subscribedSubscriber, never()).onNext("three");
    verify(subscribedSubscriber, never()).onComplete();
    verify(subscribedSubscriber, times(1)).onError(any(Throwable.class));
    verify(sideEffectSubscriber, times(1)).onNext("one");
    verify(sideEffectSubscriber, never()).onNext("two");
    verify(sideEffectSubscriber, never()).onNext("three");
    verify(sideEffectSubscriber, never()).onComplete();
    verify(sideEffectSubscriber, times(1)).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
/**
* Modifies the source Publisher so that it invokes an action when it calls {@code onComplete} or
* {@code onError}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
* <p>
* This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
* {@code onError} notification.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onTerminate
* the action to invoke when the source Publisher calls {@code onComplete} or {@code onError}
* @return the source Publisher with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @see #doAfterTerminate(Action)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnTerminate(final Action onTerminate) {
    return doOnEach(Functions.emptyConsumer(), Functions.actionConsumer(onTerminate),
            onTerminate, Functions.EMPTY_ACTION);
}
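The Javadoc above states that doOnTerminate runs before the terminal notification, while doAfterTerminate (shown earlier) invokes its Action after the source finishes. The following plain-Java sketch models that ordering for a completion signal. It is not RxJava code: the class name, the complete helper, and the three Runnable parameters are hypothetical, and the order in which they run is an assumption drawn from the Javadoc wording rather than from the operator's source.

```java
import java.util.ArrayList;
import java.util.List;

class TerminalHookOrderModel {

    // Hypothetical model of a completion signal passing through the hooks:
    // the "on" hook runs before the downstream is notified, the "after" hook runs last.
    static void complete(Runnable onComplete,
                         Runnable downstreamOnComplete,
                         Runnable onAfterTerminate) {
        onComplete.run();           // slot used by doOnComplete / doOnTerminate
        downstreamOnComplete.run(); // the subscriber receives onComplete
        onAfterTerminate.run();     // slot used by doAfterTerminate
    }

    // Records the order in which the three callbacks fire.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        complete(
                () -> log.add("doOnTerminate"),
                () -> log.add("downstream onComplete"),
                () -> log.add("doAfterTerminate"));
        return log;
    }

    public static void main(String[] args) {
        // prints [doOnTerminate, downstream onComplete, doAfterTerminate]
        System.out.println(demo());
    }
}
```

The sketch mirrors how the convenience methods above each fill one slot of the multi-argument doOnEach call and leave the remaining slots as empty consumers or actions.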
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Flowable.just(1).doOnEach(new TestSubscriber<Integer>()));
}
Code example source: ReactiveX/RxJava
}).doOnEach(new Consumer<Notification<String>>() {
Code example source: ReactiveX/RxJava
/**
* Modifies the source Publisher so that it invokes an action for each item it emits.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onNotification
* the action to invoke for each item emitted by the source Publisher
* @return the source Publisher with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> doOnEach(final Consumer<? super Notification<T>> onNotification) {
    ObjectHelper.requireNonNull(onNotification, "consumer is null");
    return doOnEach(
            Functions.notificationOnNext(onNotification),
            Functions.notificationOnError(onNotification),
            Functions.notificationOnComplete(onNotification),
            Functions.EMPTY_ACTION
    );
}
Code example source: ReactiveX/RxJava
public final Flowable<T> doOnEach(final Subscriber<? super T> subscriber) {
    ObjectHelper.requireNonNull(subscriber, "subscriber is null");
    return doOnEach(
            FlowableInternalHelper.subscriberOnNext(subscriber),
            FlowableInternalHelper.subscriberOnError(subscriber),
Code example source: ReactiveX/RxJava
@Test
public void testWithCombineLatestIssue1717() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicInteger count = new AtomicInteger();
    final int SIZE = 2000;
    Flowable<Long> timer = Flowable.interval(0, 1, TimeUnit.MILLISECONDS)
            .observeOn(Schedulers.newThread())
            .doOnEach(new Consumer<Notification<Long>>() {
                @Override
                public void accept(Notification<Long> n) {
                    // System.out.println(n);
                    if (count.incrementAndGet() >= SIZE) {
                        latch.countDown();
                    }
                }
            }).take(SIZE);
    TestSubscriber<Long> ts = new TestSubscriber<Long>();
    Flowable.combineLatest(timer, Flowable.<Integer> never(), new BiFunction<Long, Integer, Long>() {
        @Override
        public Long apply(Long t1, Integer t2) {
            return t1;
        }
    }).subscribe(ts);
    if (!latch.await(SIZE + 2000, TimeUnit.MILLISECONDS)) {
        fail("timed out");
    }
    assertEquals(SIZE, count.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testErrorPropagatesWhenNoOutstandingRequests() {
    Flowable<Long> timer = Flowable.interval(0, 1, TimeUnit.MICROSECONDS)
            .doOnEach(new Consumer<Notification<Long>>() {
            .doOnEach(new Consumer<Notification<Long>>() {
Code example source: ReactiveX/RxJava
@Test
public void testDelayEmitsEverything() {
    Flowable<Integer> source = Flowable.range(1, 5);
    Flowable<Integer> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
    delayed = delayed.doOnEach(new Consumer<Notification<Integer>>() {
        @Override
        public void accept(Notification<Integer> t1) {
            System.out.println(t1);
        }
    });
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    delayed.subscribe(ts);
    // all will be delivered after 500ms since range does not delay between them
    scheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS);
    ts.assertValues(1, 2, 3, 4, 5);
}
Code example source: ReactiveX/RxJava
@Override
public Flowable<String> apply(final GroupedFlowable<Integer, Integer> group) {
    if (group.getKey() < 3) {
        return group.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer t1) {
                return "first groups: " + t1;
            }
        })
        // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
        .take(2).doOnComplete(new Action() {
            @Override
            public void run() {
                first.countDown();
            }
        });
    } else {
        return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer t1) {
                return "last group: " + t1;
            }
        }).doOnEach(new Consumer<Notification<String>>() {
            @Override
            public void accept(Notification<String> t1) {
                System.err.println("subscribeOn notification => " + t1);
            }
        });
    }
}