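This article collects code examples for the Java method io.reactivex.Flowable.retryWhen() and shows how Flowable.retryWhen() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference.
Details of the Flowable.retryWhen() method are as follows: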
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: retryWhen
Description: Returns a Flowable that emits the same values as the source Publisher with the exception of an onError. An onError notification from the source will result in the emission of a Throwable item to the Publisher provided as an argument to the notificationHandler function. If that Publisher calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this Publisher will resubscribe to the source Publisher.
Example: This retries 3 times, each time incrementing the number of seconds it waits.
Flowable.create((FlowableEmitter<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}, BackpressureStrategy.BUFFER).retryWhen(attempts -> {
    return attempts.zipWith(Flowable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("delay retry by " + i + " second(s)");
        return Flowable.timer(i, TimeUnit.SECONDS);
    });
}).blockingForEach(System.out::println);
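Output is:
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing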
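The schedule implied by zipWith(Flowable.range(1, 3)) can be traced by hand. The following is a minimal plain-JDK sketch, not RxJava code: the class RetryScheduleSketch and its methods are hypothetical names introduced only for illustration, and the sketch assumes a source that fails synchronously on every subscription, as in the example. It does not sleep; it only records the lines the example would print.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model (no RxJava) of the retry schedule in the example above.
// All names here are hypothetical and exist only for this illustration.
final class RetryScheduleSketch {

    // Returns the lines printed for an always-failing source when the handler
    // pairs each error with one value of range(1, maxRetries).
    static List<String> simulate(int maxRetries) {
        List<String> log = new ArrayList<>();
        log.add("subscribing");                  // initial subscription
        for (int i = 1; i <= maxRetries; i++) {  // one range value per error
            log.add("delay retry by " + i + " second(s)");
            log.add("subscribing");              // resubscription after the delay
        }
        // Once the last range value is consumed, the handler's Publisher
        // completes, so the final failure is not retried.
        return log;
    }

    // Sum of the delays 1 + 2 + ... + maxRetries, in seconds.
    static int totalDelaySeconds(int maxRetries) {
        return maxRetries * (maxRetries + 1) / 2;
    }

    public static void main(String[] args) {
        simulate(3).forEach(System.out::println);
        System.out.println("total delay: " + totalDelaySeconds(3) + " s");
    }
}
```

With three retries the source is subscribed four times in total (one initial attempt plus three retries), and the delays add up to 1 + 2 + 3 = 6 seconds.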
Note that the inner Publisher returned by the handler function should signal either onNext, onError or onComplete in response to the received Throwable to indicate whether the operator should retry or terminate. If the upstream to the operator is asynchronous, signaling onNext followed by onComplete immediately may cause the sequence to complete immediately. Similarly, if this inner Publisher signals onError or onComplete while the upstream is active, the sequence is terminated with the same signal immediately.
The following example demonstrates how to retry an asynchronous source with a delay:
Flowable.timer(1, TimeUnit.SECONDS)
    .doOnSubscribe(s -> System.out.println("subscribing"))
    .map(v -> { throw new RuntimeException(); })
    .retryWhen(errors -> {
        AtomicInteger counter = new AtomicInteger();
        return errors
            .takeWhile(e -> counter.getAndIncrement() != 3)
            .flatMap(e -> {
                System.out.println("delay retry by " + counter.get() + " second(s)");
                return Flowable.timer(counter.get(), TimeUnit.SECONDS);
            });
    })
    .blockingSubscribe(System.out::println, System.out::println);
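The note on handler signals (onNext means retry, onComplete or onError means terminate with that signal) can be pictured with a small plain-JDK analogy. This is only a sketch under simplifying assumptions: it is not RxJava code, the class HandlerSignalSketch, the Signal enum, and the safetyLimit parameter are hypothetical, the source is assumed to fail on every subscription, and timing, asynchrony, and backpressure are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-JDK analogy (no RxJava) for how the handler's signals steer a retry.
// All names here are hypothetical and exist only for this illustration.
final class HandlerSignalSketch {

    // What the handler's Publisher may signal in response to a Throwable.
    enum Signal { NEXT, COMPLETE, ERROR }

    // Runs an always-failing source and records what happens. The handler
    // receives the 1-based index of the failure it is responding to.
    static List<String> run(Function<Integer, Signal> handler, int safetyLimit) {
        List<String> trace = new ArrayList<>();
        for (int failure = 1; failure <= safetyLimit; failure++) {
            trace.add("subscribing");            // (re)subscribe to the source
            Signal signal = handler.apply(failure);
            if (signal == Signal.COMPLETE) {     // terminate without an error
                trace.add("onComplete");
                return trace;
            }
            if (signal == Signal.ERROR) {        // terminate with an error
                trace.add("onError");
                return trace;
            }
            // Signal.NEXT: fall through and loop, which models a resubscription.
        }
        trace.add("safety limit reached");
        return trace;
    }

    public static void main(String[] args) {
        // Retry after the first two failures, then complete.
        System.out.println(run(n -> n <= 2 ? Signal.NEXT : Signal.COMPLETE, 10));
        // Give up with an error on the first failure.
        System.out.println(run(n -> Signal.ERROR, 10));
    }
}
```

The loop bound safetyLimit exists only so the model always terminates. In the real operator a handler that keeps signaling onNext keeps resubscribing indefinitely.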
Backpressure: The operator honors downstream backpressure and expects both the source and inner Publishers to honor backpressure as well. If this expectation is violated, the operator may throw an IllegalStateException.
Scheduler: retryWhen does not operate by default on a particular Scheduler.
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void retryWhenFunctionNull() {
    just1.retryWhen(null);
}
Code example source: ReactiveX/RxJava
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
    return toSingle(toFlowable().retryWhen(handler));
}
Code example source: ReactiveX/RxJava
@Test
public void testOnErrorFromNotificationHandler() {
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(2));
    origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
        @Override
        public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
            return Flowable.error(new RuntimeException());
        }
    }).subscribe(subscriber);
    InOrder inOrder = inOrder(subscriber);
    inOrder.verify(subscriber).onSubscribe((Subscription)notNull());
    inOrder.verify(subscriber, never()).onNext("beginningEveryTime");
    inOrder.verify(subscriber, never()).onNext("onSuccessOnly");
    inOrder.verify(subscriber, never()).onComplete();
    inOrder.verify(subscriber, times(1)).onError(any(RuntimeException.class));
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
    return fromPublisher(toFlowable().retryWhen(handler));
}
Code example source: ReactiveX/RxJava
@Test
public void testOnCompletedFromNotificationHandler() {
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(1));
    TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
    origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
        @Override
        public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
            return Flowable.empty();
        }
    }).subscribe(ts);
    InOrder inOrder = inOrder(subscriber);
    inOrder.verify(subscriber).onSubscribe((Subscription)notNull());
    inOrder.verify(subscriber, never()).onNext("beginningEveryTime");
    inOrder.verify(subscriber, never()).onNext("onSuccessOnly");
    inOrder.verify(subscriber, times(1)).onComplete();
    inOrder.verify(subscriber, never()).onError(any(Exception.class));
    inOrder.verifyNoMoreInteractions();
}
Code example source: kaushikgopal/RxJava-Android-Samples
@OnClick(R.id.btn_eb_retry)
public void startRetryingWithExponentialBackoffStrategy() {
    _logs = new ArrayList<>();
    _adapter.clear();
    DisposableSubscriber<Object> disposableSubscriber =
        new DisposableSubscriber<Object>() {
            @Override
            public void onNext(Object aVoid) {
                Timber.d("on Next");
            }
            @Override
            public void onComplete() {
                Timber.d("on Completed");
            }
            @Override
            public void onError(Throwable e) {
                _log("Error: I give up!");
            }
        };
    Flowable.error(new RuntimeException("testing")) // always fails
        .retryWhen(new RetryWithDelay(5, 1000)) // notice this is called only onError (onNext
        // values sent are ignored)
        .doOnSubscribe(subscription -> _log("Attempting the impossible 5 times in intervals of 1s"))
        .subscribe(disposableSubscriber);
    _disposables.add(disposableSubscriber);
}
Code example source: ReactiveX/RxJava
public final Maybe<T> retryWhen(
        final Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
    return toFlowable().retryWhen(handler).singleElement();
}
Code example source: ReactiveX/RxJava
@Test
public void testOnNextFromNotificationHandler() {
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    int numRetries = 2;
    Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(numRetries));
    origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
        @Override
        public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
            return t1.map(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable t1) {
                    return 0;
                }
            }).startWith(0).cast(Object.class);
        }
    }).subscribe(subscriber);
    InOrder inOrder = inOrder(subscriber);
    // should show 3 attempts
    inOrder.verify(subscriber, times(numRetries + 1)).onNext("beginningEveryTime");
    // should have no errors
    inOrder.verify(subscriber, never()).onError(any(Throwable.class));
    // should have a single success
    inOrder.verify(subscriber, times(1)).onNext("onSuccessOnly");
    // should have a single successful onComplete
    inOrder.verify(subscriber, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void retryWhenFunctionReturnsNull() {
    Flowable.error(new TestException()).retryWhen(new Function<Flowable<? extends Throwable>, Publisher<Object>>() {
        @Override
        public Publisher<Object> apply(Flowable<? extends Throwable> f) {
            return null;
        }
    }).blockingSubscribe();
}
Code example source: redisson/redisson
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
    return fromPublisher(toFlowable().retryWhen(handler));
}
Code example source: ReactiveX/RxJava
@Test
public void testSingleSubscriptionOnFirst() throws Exception {
    final AtomicInteger inc = new AtomicInteger(0);
    Publisher<Integer> onSubscribe = new Publisher<Integer>() {
        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new BooleanSubscription());
            final int emit = inc.incrementAndGet();
            subscriber.onNext(emit);
            subscriber.onComplete();
        }
    };
    int first = Flowable.unsafeCreate(onSubscribe)
        .retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
            @Override
            public Flowable<Object> apply(Flowable<? extends Throwable> attempt) {
                return attempt.zipWith(Flowable.just(1), new BiFunction<Throwable, Integer, Object>() {
                    @Override
                    public Object apply(Throwable o, Integer integer) {
                        return 0;
                    }
                });
            }
        })
        .blockingFirst();
    assertEquals("Observer did not receive the expected output", 1, first);
    assertEquals("Subscribe was not called once", 1, inc.get());
}
Code example source: redisson/redisson
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
    return toSingle(toFlowable().retryWhen(handler));
}
Code example source: ReactiveX/RxJava
@Test
public void noCancelPreviousRepeatWhen2() {
    final AtomicInteger counter = new AtomicInteger();
    final AtomicInteger times = new AtomicInteger();
    Flowable<Integer> source = Flowable.<Integer>error(new TestException())
        .doOnCancel(new Action() {
            @Override
            public void run() throws Exception {
                counter.getAndIncrement();
            }
        });
    source.retryWhen(new Function<Flowable<Throwable>, Flowable<?>>() {
        @Override
        public Flowable<?> apply(Flowable<Throwable> e) throws Exception {
            return e.takeWhile(new Predicate<Object>() {
                @Override
                public boolean test(Object v) throws Exception {
                    return times.getAndIncrement() < 4;
                }
            });
        }
    })
    .test()
    .assertResult();
    assertEquals(0, counter.get());
}
Code example source: ReactiveX/RxJava
producer.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
Code example source: ReactiveX/RxJava
source.retryWhen(new Function<Flowable<Throwable>, Flowable<?>>() {
    @Override
    public Flowable<?> apply(Flowable<Throwable> e) throws Exception {
Code example source: ReactiveX/RxJava
@Test
public void shouldDisposeInnerFlowable() {
    final PublishProcessor<Object> processor = PublishProcessor.create();
    final Disposable disposable = Flowable.error(new RuntimeException("Leak"))
        .retryWhen(new Function<Flowable<Throwable>, Flowable<Object>>() {
            @Override
            public Flowable<Object> apply(Flowable<Throwable> errors) throws Exception {
                return errors.switchMap(new Function<Throwable, Flowable<Object>>() {
                    @Override
                    public Flowable<Object> apply(Throwable ignore) throws Exception {
                        return processor;
                    }
                });
            }
        })
        .subscribe();
    assertTrue(processor.hasSubscribers());
    disposable.dispose();
    assertFalse(processor.hasSubscribers());
}
Code example source: ReactiveX/RxJava
Flowable<String> origin = Flowable.unsafeCreate(new FuncWithErrors(numRetries));
TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
origin.retryWhen(new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
    @Override
    public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
Code example source: ReactiveX/RxJava
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void retryWhenDefaultScheduler() {
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.just(1)
        .concatWith(Flowable.<Integer>error(new TestException()))
        .retryWhen((Function)new Function<Flowable, Flowable>() {
            @Override
            public Flowable apply(Flowable f) {
                return f.take(2);
            }
        }).subscribe(ts);
    ts.assertValues(1, 1);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
.retryWhen(new Function<Flowable<Throwable>, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(Flowable<Throwable> v)
Code example source: ReactiveX/RxJava
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void retryWhenTrampolineScheduler() {
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Flowable.just(1)
        .concatWith(Flowable.<Integer>error(new TestException()))
        .subscribeOn(Schedulers.trampoline())
        .retryWhen((Function)new Function<Flowable, Flowable>() {
            @Override
            public Flowable apply(Flowable f) {
                return f.take(2);
            }
        }).subscribe(ts);
    ts.assertValues(1, 1);
    ts.assertNoErrors();
    ts.assertComplete();
}