本文整理了Java中io.reactivex.Flowable.repeatWhen()
方法的一些代码示例,展示了Flowable.repeatWhen()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.repeatWhen()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:repeatWhen
[英]Returns a Flowable that emits the same values as the source Publisher with the exception of an onComplete. An onComplete notification from the source will result in the emission of a void item to the Publisher provided as an argument to the notificationHandlerfunction. If that Publisher calls onComplete or onError then repeatWhen will call onComplete or onError on the child subscription. Otherwise, this Publisher will resubscribe to the source Publisher.
Backpressure: The operator honors downstream backpressure and expects the source Publisher to honor backpressure as well. If this expectation is violated, the operator may throw an IllegalStateException. Scheduler: repeatWhen does not operate by default on a particular Scheduler.
[中]返回与源发布服务器发出相同值的Flowable,但onComplete除外。源发出的未完成通知将导致向发布者发送一个无效项,该项作为NotificationHandler函数的参数提供。如果发布者调用onComplete或onError,则repeatWhen将对子订阅调用onComplete或onError。否则,此发布服务器将重新订阅源发布服务器。
背压:操作员接受下游背压,并希望源发布者也接受背压。如果违反此期望,运算符可能抛出一个非法状态异常。调度程序:repeatWhen默认情况下不在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void repeatWhenNull() {
just1.repeatWhen(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void repeatWhenFunctionReturnsNull() {
just1.repeatWhen(new Function<Flowable<Object>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Object> v) {
return null;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Returns a Completable instance that repeats when the Publisher returned by the handler
* emits an item or completes when this Publisher emits a completed event.
* <p>
* <img width="640" height="586" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.repeatWhen.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param handler the function that transforms the stream of values indicating the completion of
* this Completable and returns a Publisher that emits items for repeating or completes to indicate the
* repetition should stop
* @return the new Completable instance
* @throws NullPointerException if stop is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return fromPublisher(toFlowable().repeatWhen(handler));
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Re-subscribes to the current Single if
* the Publisher returned by the handler function signals a value in response to a
* value signalled through the Flowable the handle receives.
* <p>
* <img width="640" height="1478" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.repeatWhen.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.
* The {@code Publisher} returned by the handler function is expected to honor backpressure as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param handler the function that is called with a Flowable that signals a value when the Single
* signalled a success value and returns a Publisher that has to signal a value to
* trigger a resubscription to the current Single, otherwise the terminal signal of
* the Publisher will be the terminal signal of the sequence as well.
* @return the new Flowable instance
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return toFlowable().repeatWhen(handler);
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Returns a Flowable that emits the same values as the source Publisher with the exception of an
* {@code onComplete}. An {@code onComplete} notification from the source will result in the emission of
* a {@code void} item to the Publisher provided as an argument to the {@code notificationHandler}
* function. If that Publisher calls {@code onComplete} or {@code onError} then {@code repeatWhen} will
* call {@code onComplete} or {@code onError} on the child subscription. Otherwise, this Publisher will
* resubscribe to the source Publisher.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well.
* If this expectation is violated, the operator <em>may</em> throw an {@code IllegalStateException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param handler
* receives a Publisher of notifications with which a user can complete or error, aborting the repeat.
* @return the source Publisher modified with repeat logic
* @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> repeatWhen(final Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return toFlowable().repeatWhen(handler);
}
代码示例来源:origin: redisson/redisson
/**
* Returns a Completable instance that repeats when the Publisher returned by the handler
* emits an item or completes when this Publisher emits a completed event.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param handler the function that transforms the stream of values indicating the completion of
* this Completable and returns a Publisher that emits items for repeating or completes to indicate the
* repetition should stop
* @return the new Completable instance
* @throws NullPointerException if stop is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return fromPublisher(toFlowable().repeatWhen(handler));
}
代码示例来源:origin: redisson/redisson
/**
* Re-subscribes to the current Single if
* the Publisher returned by the handler function signals a value in response to a
* value signalled through the Flowable the handle receives.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.
* The {@code Publisher} returned by the handler function is expected to honor backpressure as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param handler the function that is called with a Flowable that signals a value when the Single
* signalled a success value and returns a Publisher that has to signal a value to
* trigger a resubscription to the current Single, otherwise the terminal signal of
* the Publisher will be the terminal signal of the sequence as well.
* @return the new Flowable instance
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return toFlowable().repeatWhen(handler);
}
代码示例来源:origin: redisson/redisson
/**
* Returns a Flowable that emits the same values as the source Publisher with the exception of an
* {@code onComplete}. An {@code onComplete} notification from the source will result in the emission of
* a {@code void} item to the Publisher provided as an argument to the {@code notificationHandler}
* function. If that Publisher calls {@code onComplete} or {@code onError} then {@code repeatWhen} will
* call {@code onComplete} or {@code onError} on the child subscription. Otherwise, this Publisher will
* resubscribe to the source Publisher.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well.
* If this expectation is violated, the operator <em>may</em> throw an {@code IllegalStateException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param handler
* receives a Publisher of notifications with which a user can complete or error, aborting the repeat.
* @return the source Publisher modified with repeat logic
* @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> repeatWhen(final Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return toFlowable().repeatWhen(handler);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void whenTake() {
Flowable.range(1, 3).repeatWhen(new Function<Flowable<Object>, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Flowable<Object> handler) throws Exception {
return handler.take(2);
}
})
.test()
.assertResult(1, 2, 3, 1, 2, 3);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void noCancelPreviousRepeatWhen() {
final AtomicInteger counter = new AtomicInteger();
Flowable<Integer> source = Flowable.just(1).doOnCancel(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
});
final AtomicInteger times = new AtomicInteger();
source.repeatWhen(new Function<Flowable<Object>, Flowable<?>>() {
@Override
public Flowable<?> apply(Flowable<Object> e) throws Exception {
return e.takeWhile(new Predicate<Object>() {
@Override
public boolean test(Object v) throws Exception {
return times.getAndIncrement() < 4;
}
});
}
})
.test()
.assertResult(1, 1, 1, 1, 1);
assertEquals(0, counter.get());
}
代码示例来源:origin: kaushikgopal/RxJava-Android-Samples
@OnClick(R.id.btn_start_increasingly_delayed_polling)
public void onStartIncreasinglyDelayedPolling() {
_setupLogger();
final int pollingInterval = POLLING_INTERVAL;
final int pollCount = POLL_COUNT;
_log(
String.format(
Locale.US, "Start increasingly delayed polling now time: [xx:%02d]", _getSecondHand()));
_disposables.add(
Flowable.just(1L)
.repeatWhen(new RepeatWithDelay(pollCount, pollingInterval))
.subscribe(
o ->
_log(
String.format(
Locale.US,
"Executing polled task now time : [xx:%02d]",
_getSecondHand())),
e -> Timber.d(e, "arrrr. Error")));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testRepeatWhen() {
Flowable.error(new TestException())
.repeatWhen(new Function<Flowable<Object>, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Flowable<Object> v) throws Exception {
return v.delay(10, TimeUnit.SECONDS);
}
})
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void shouldDisposeInnerObservable() {
final PublishProcessor<Object> processor = PublishProcessor.create();
final Disposable disposable = Flowable.just("Leak")
.repeatWhen(new Function<Flowable<Object>, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Flowable<Object> completions) throws Exception {
return completions.switchMap(new Function<Object, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Object ignore) throws Exception {
return processor;
}
});
}
})
.subscribe();
assertTrue(processor.hasSubscribers());
disposable.dispose();
assertFalse(processor.hasSubscribers());
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void repeatWhenDefaultScheduler() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.just(1).repeatWhen((Function)new Function<Flowable, Flowable>() {
@Override
public Flowable apply(Flowable f) {
return f.take(2);
}
}).subscribe(ts);
ts.assertValues(1, 1);
ts.assertNoErrors();
ts.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
.repeatWhen(new Function<Flowable<Object>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Flowable<Object> v)
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void repeatWhenTrampolineScheduler() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.just(1).subscribeOn(Schedulers.trampoline())
.repeatWhen((Function)new Function<Flowable, Flowable>() {
@Override
public Flowable apply(Flowable f) {
return f.take(2);
}
}).subscribe(ts);
ts.assertValues(1, 1);
ts.assertNoErrors();
ts.assertComplete();
}
代码示例来源:origin: akarnokd/RxJava2Extensions
/**
* Repeats this Perhaps when the Publisher returned by the handler function emits
* an item or terminates if this Publisher terminates.
* @param handler the function that receives Flowable that emits an item
* when this Perhaps completes and returns a Publisher that should emit an item
* to trigger a repeat or terminate to trigger a termination.
* @return the new Flowable instance
*/
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return Flowable.fromPublisher(this).repeatWhen(handler);
}
代码示例来源:origin: com.github.akarnokd/rxjava2-extensions
/**
* Repeats this Solo when the Publisher returned by the handler function emits
* an item or terminates if this Publisher terminates.
* @param handler the function that receives Flowable that emits an item
* when this Solo completes and returns a Publisher that should emit an item
* to trigger a repeat or terminate to trigger a termination.
* @return the new Flowable instance
*/
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return Flowable.fromPublisher(this).repeatWhen(handler);
}
代码示例来源:origin: com.github.akarnokd/rxjava2-extensions
/**
* Repeats this Perhaps when the Publisher returned by the handler function emits
* an item or terminates if this Publisher terminates.
* @param handler the function that receives Flowable that emits an item
* when this Perhaps completes and returns a Publisher that should emit an item
* to trigger a repeat or terminate to trigger a termination.
* @return the new Flowable instance
*/
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return Flowable.fromPublisher(this).repeatWhen(handler);
}
代码示例来源:origin: akarnokd/RxJava2Extensions
/**
* Repeats this Solo when the Publisher returned by the handler function emits
* an item or terminates if this Publisher terminates.
* @param handler the function that receives Flowable that emits an item
* when this Solo completes and returns a Publisher that should emit an item
* to trigger a repeat or terminate to trigger a termination.
* @return the new Flowable instance
*/
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return Flowable.fromPublisher(this).repeatWhen(handler);
}
内容来源于网络,如有侵权,请联系作者删除!