Returns a Flowable that emits the same values as the source Publisher, except for an onError notification. An onError notification from the source will result in the emission of a Throwable item to the Publisher provided as an argument to the notificationHandler function. If that Publisher calls onComplete or onError, then retry will call onComplete or onError on the child subscription. Otherwise, this Publisher will resubscribe to the source Publisher.
Example: This retries 3 times, each time incrementing the number of seconds it waits.
    Flowable.create((FlowableEmitter<? super String> s) -> {
        s.onError(new RuntimeException("always fails"));
    }, BackpressureStrategy.BUFFER).retryWhen(attempts -> {
        return attempts.zipWith(Flowable.range(1, 3), (n, i) -> i).flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
            return Flowable.timer(i, TimeUnit.SECONDS);
        });
    }).blockingForEach(System.out::println);
Output is:
    delay retry by 1 second(s)
    delay retry by 2 second(s)
    delay retry by 3 second(s)
Note that the inner Publisher returned by the handler function should signal either onNext, onError or onComplete in response to the received Throwable to indicate whether the operator should retry or terminate. If the upstream to the operator is asynchronous, signaling onNext followed immediately by onComplete may cause the sequence to complete immediately. Similarly, if this inner Publisher signals onError or onComplete while the upstream is active, the sequence is terminated with the same signal immediately.
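The three possible responses described above can be pictured with a small synchronous analogy in plain Java. This is only a sketch under stated assumptions, not how RxJava implements retryWhen: the names RetrySignalSketch, Signal, Outcome and run are hypothetical, the source is modeled as a Supplier that either returns a value or throws, and the handler answers each failure with one signal.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class RetrySignalSketch {

    /** Hypothetical stand-ins for the three signals the inner Publisher can send. */
    public enum Signal { NEXT, ERROR, COMPLETE }

    /** How the run ended and how many times the source was subscribed to. */
    public static final class Outcome {
        public final String terminal;
        public final int subscriptions;

        Outcome(String terminal, int subscriptions) {
            this.terminal = terminal;
            this.subscriptions = subscriptions;
        }
    }

    /** Runs the source, asking the handler what to do after each failure. */
    public static <T> Outcome run(Supplier<T> source, Function<Throwable, Signal> handler) {
        int subscriptions = 0;
        while (true) {
            subscriptions++;
            try {
                // Success: the value is passed through and the run ends.
                return new Outcome("value: " + source.get(), subscriptions);
            } catch (RuntimeException failure) {
                Signal signal = handler.apply(failure);
                if (signal == Signal.ERROR) {
                    return new Outcome("onError", subscriptions);
                }
                if (signal == Signal.COMPLETE) {
                    return new Outcome("onComplete", subscriptions);
                }
                // Signal.NEXT: fall through and subscribe to the source again.
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Assumed demo source: fails twice, then succeeds.
        Outcome outcome = RetrySignalSketch.<String>run(() -> {
            if (calls[0]++ < 2) {
                throw new RuntimeException("transient failure");
            }
            return "ok";
        }, failure -> Signal.NEXT);
        System.out.println(outcome.terminal + " after " + outcome.subscriptions + " subscription(s)");
    }
}
```

The real operator differs in one important way: the handler is invoked once with a Flowable of errors, and the signals may arrive asynchronously, which is why the timing caveats in the note above matter.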
The following example demonstrates how to retry an asynchronous source with a delay:
    Flowable.timer(1, TimeUnit.SECONDS)
        .doOnSubscribe(s -> System.out.println("subscribing"))
        .map(v -> { throw new RuntimeException(); })
        .retryWhen(errors -> {
            AtomicInteger counter = new AtomicInteger();
            return errors
                .takeWhile(e -> counter.getAndIncrement() != 3)
                .flatMap(e -> {
                    System.out.println("delay retry by " + counter.get() + " second(s)");
                    return Flowable.timer(counter.get(), TimeUnit.SECONDS);
                });
        })
        .blockingSubscribe(System.out::println, System.out::println);
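Both examples express the same policy: retry a bounded number of times, waiting one second longer before each new attempt, then stop without an error. A minimal plain-Java sketch of that policy, without RxJava, might look like the following. The class LinearBackoffSketch, the method callWithRetry and the pauseSeconds callback are hypothetical names chosen for illustration; giving up is modeled as an empty Optional to mirror the handler completing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

public class LinearBackoffSketch {

    /**
     * Calls the task; after the n-th failure, pauses n seconds and tries again.
     * Once maxRetries retries have also failed, gives up and returns an empty Optional.
     * The task is assumed to return a non-null value.
     */
    public static <T> Optional<T> callWithRetry(
            Supplier<T> task, int maxRetries, LongConsumer pauseSeconds) {
        int failures = 0;
        while (true) {
            T value;
            try {
                value = task.get();
            } catch (RuntimeException failure) {
                failures++;
                if (failures > maxRetries) {
                    return Optional.empty(); // retries exhausted
                }
                System.out.println("delay retry by " + failures + " second(s)");
                pauseSeconds.accept(failures); // linear backoff: 1s, 2s, 3s, ...
                continue;
            }
            return Optional.of(value);
        }
    }

    public static void main(String[] args) {
        // The demo records the requested pauses instead of sleeping so it finishes quickly;
        // a real caller would sleep for the given number of seconds here.
        List<Long> requestedPauses = new ArrayList<>();
        Optional<String> result = LinearBackoffSketch.<String>callWithRetry(() -> {
            throw new RuntimeException("always fails");
        }, 3, seconds -> requestedPauses.add(seconds));
        System.out.println("gave up: " + !result.isPresent() + ", pauses: " + requestedPauses);
    }
}
```

Passing the pause as a callback keeps the waiting strategy separate from the retry decision, loosely analogous to how the handler in retryWhen returns a timer instead of blocking.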
Backpressure: The operator honors downstream backpressure and expects both the source and inner Publishers to honor backpressure as well. If this expectation is violated, the operator may throw an IllegalStateException.
Scheduler: retryWhen does not operate by default on a particular Scheduler.
@Test(expected = NullPointerException.class)
public void retryWhenFunctionNull() {
Code example source: ReactiveX/RxJava
    public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
        return toSingle(toFlowable().retryWhen(handler));
    }
Code example source: ReactiveX/RxJava
    public void testOnErrorFromNotificationHandler() {
        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(2));
        origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
            public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
                return Flowable.error(new RuntimeException());
            }
        }).subscribe(subscriber);
        InOrder inOrder = inOrder(subscriber);
        inOrder.verify(subscriber, never()).onNext("beginningEveryTime");
        inOrder.verify(subscriber, never()).onNext("onSuccessOnly");
        inOrder.verify(subscriber, never()).onComplete();
        inOrder.verify(subscriber, times(1)).onError(any(RuntimeException.class));
    }
Code example source: ReactiveX/RxJava
    public final Completable retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
        return fromPublisher(toFlowable().retryWhen(handler));
    }
Code example source: ReactiveX/RxJava
    public void testOnCompletedFromNotificationHandler() {
        Subscriber<String> subscriber = TestHelper.mockSubscriber();
        Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(1));
        TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
        origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
            public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
                return Flowable.empty();
            }
        }).subscribe(ts);
        InOrder inOrder = inOrder(subscriber);
        inOrder.verify(subscriber, never()).onNext("beginningEveryTime");
        inOrder.verify(subscriber, never()).onNext("onSuccessOnly");
        inOrder.verify(subscriber, times(1)).onComplete();
        inOrder.verify(subscriber, never()).onError(any(Exception.class));
    }
Code example source: kaushikgopal/RxJava-Android-Samples
public void startRetryingWithExponentialBackoffStrategy() {
_logs = new ArrayList<>();
DisposableSubscriber<Object> disposableSubscriber =
new DisposableSubscriber<Object>() {
public void onNext(Object aVoid) {
Timber.d("on Next");
public void onComplete() {
Timber.d("on Completed");
public void onError(Throwable e) {
_log("Error: I give up!");
Flowable.error(new RuntimeException("testing")) // always fails
.retryWhen(new RetryWithDelay(5, 1000)) // notice this is called only onError (onNext
// values sent are ignored)
.doOnSubscribe(subscription -> _log("Attempting the impossible 5 times in intervals of 1s"))
Code example source: ReactiveX/RxJava
    public final Maybe<T> retryWhen(
            final Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
        return toFlowable().retryWhen(handler).singleElement();
    }
Code example source: ReactiveX/RxJava
public void testOnNextFromNotificationHandler() {
Subscriber<String> subscriber = TestHelper.mockSubscriber();
int numRetries = 2;
Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(numRetries));
origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
return t1.map(new Function<Throwable, Integer>() {
public Integer apply(Throwable t1) {
return 0;
InOrder inOrder = inOrder(subscriber);
// should show 3 attempts
inOrder.verify(subscriber, times(numRetries + 1)).onNext("beginningEveryTime");
// should have no errors
inOrder.verify(subscriber, never()).onError(any(Throwable.class));
// should have a single success
inOrder.verify(subscriber, times(1)).onNext("onSuccessOnly");
// should have a single successful onComplete
inOrder.verify(subscriber, times(1)).onComplete();
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void retryWhenFunctionReturnsNull() {
Flowable.error(new TestException()).retryWhen(new Function<Flowable<? extends Throwable>, Publisher<Object>>() {
public Publisher<Object> apply(Flowable<? extends Throwable> f) {
return null;
Code example source: redisson/redisson
    public final Completable retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
        return fromPublisher(toFlowable().retryWhen(handler));
    }
Code example source: ReactiveX/RxJava
public void testSingleSubscriptionOnFirst() throws Exception {
final AtomicInteger inc = new AtomicInteger(0);
Publisher<Integer> onSubscribe = new Publisher<Integer>() {
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
final int emit = inc.incrementAndGet();
int first = Flowable.unsafeCreate(onSubscribe)
.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
public Flowable<Object> apply(Flowable<? extends Throwable> attempt) {
return attempt.zipWith(Flowable.just(1), new BiFunction<Throwable, Integer, Object>() {
public Object apply(Throwable o, Integer integer) {
return 0;
assertEquals("Observer did not receive the expected output", 1, first);
assertEquals("Subscribe was not called once", 1, inc.get());
Code example source: redisson/redisson
    public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
        return toSingle(toFlowable().retryWhen(handler));
    }
Code example source: ReactiveX/RxJava
public void noCancelPreviousRepeatWhen2() {
final AtomicInteger counter = new AtomicInteger();
final AtomicInteger times = new AtomicInteger();
Flowable<Integer> source = Flowable.<Integer>error(new TestException())
.doOnCancel(new Action() {
public void run() throws Exception {
source.retryWhen(new Function<Flowable<Throwable>, Flowable<?>>() {
public Flowable<?> apply(Flowable<Throwable> e) throws Exception {
return e.takeWhile(new Predicate<Object>() {
public boolean test(Object v) throws Exception {
return times.getAndIncrement() < 4;
assertEquals(0, counter.get());
Code example source: ReactiveX/RxJava
producer.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
Code example source: ReactiveX/RxJava
source.retryWhen(new Function<Flowable<Throwable>, Flowable<?>>() {
public Flowable<?> apply(Flowable<Throwable> e) throws Exception {
Code example source: ReactiveX/RxJava
public void shouldDisposeInnerFlowable() {
final PublishProcessor<Object> processor = PublishProcessor.create();
final Disposable disposable = Flowable.error(new RuntimeException("Leak"))
.retryWhen(new Function<Flowable<Throwable>, Flowable<Object>>() {
public Flowable<Object> apply(Flowable<Throwable> errors) throws Exception {
return errors.switchMap(new Function<Throwable, Flowable<Object>>() {
public Flowable<Object> apply(Throwable ignore) throws Exception {
return processor;
Code example source: ReactiveX/RxJava
Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(numRetries));
TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
Code example source: ReactiveX/RxJava
@SuppressWarnings({ "rawtypes", "unchecked" })
public void retryWhenDefaultScheduler() {
TestSubscriber<Integer> ts = TestSubscriber.create();
.concatWith(Flowable.<Integer>error(new TestException()))
.retryWhen((Function)new Function<Flowable, Flowable>() {
public Flowable apply(Flowable f) {
return f.take(2);
ts.assertValues(1, 1);
Code example source: ReactiveX/RxJava
.retryWhen(new Function<Flowable<Throwable>, Flowable<Integer>>() {
public Flowable<Integer> apply(Flowable<Throwable> v)
Code example source: ReactiveX/RxJava
@SuppressWarnings({ "rawtypes", "unchecked" })
public void retryWhenTrampolineScheduler() {
TestSubscriber<Integer> ts = TestSubscriber.create();
.concatWith(Flowable.<Integer>error(new TestException()))
.retryWhen((Function)new Function<Flowable, Flowable>() {
public Flowable apply(Flowable f) {
return f.take(2);
ts.assertValues(1, 1);