本文整理了Java中io.reactivex.Observable.retryWhen()
方法的一些代码示例,展示了Observable.retryWhen()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.retryWhen()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:retryWhen
[英]Returns an Observable that emits the same values as the source ObservableSource with the exception of an onError. An onError notification from the source will result in the emission of a Throwable item to the ObservableSource provided as an argument to the notificationHandlerfunction. If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.
Example: This retries 3 times, each time incrementing the number of seconds it waits.
Observable.create((ObservableEmitter<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).blockingForEach(System.out::println);
Output is:
subscribing
Note that the inner ObservableSource returned by the handler function should signal either onNext, onError or onComplete in response to the received Throwable to indicate the operator should retry or terminate. If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner ObservableSource signals onError or onComplete while the upstream is active, the sequence is terminated with the same signal immediately.
The following example demonstrates how to retry an asynchronous source with a delay:
Observable.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.map(v -> { throw new RuntimeException(); })
.retryWhen(errors -> {
AtomicInteger counter = new AtomicInteger();
return errors
.takeWhile(e -> counter.getAndIncrement() != 3)
.flatMap(e -> {
System.out.println("delay retry by " + counter.get() + " second(s)");
return Observable.timer(counter.get(), TimeUnit.SECONDS);
});
})
.blockingSubscribe(System.out::println, System.out::println);
Scheduler: retryWhen does not operate by default on a particular Scheduler.
[中]返回与源ObservableSource发出相同值的Observable,但onError除外。源发出的一个错误通知将导致一个可丢弃的项目发送到作为NotificationHandler函数参数提供的ObservableSource。如果该ObservableSource调用onComplete或onError,则retry将调用子订阅上的onComplete或onError。否则,此ObservableSource将重新订阅源ObservableSource。
示例:重试3次,每次增加等待的秒数。
Observable.create((ObservableEmitter<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).blockingForEach(System.out::println);
输出为:
subscribing
请注意,处理程序函数返回的内部ObservableSource应向onNext、onError或onComplete发送信号,以响应接收到的Throwable,指示操作员应重试或终止。如果操作员的上游是异步的,则发出onNext后紧接着onComplete的信号可能会导致序列立即完成。类似地,如果在上游处于活动状态时,该内部可观测源发出onError或onComplete信号,则序列立即以相同的信号终止。
以下示例演示如何延迟重试异步源:
Observable.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.map(v -> { throw new RuntimeException(); })
.retryWhen(errors -> {
AtomicInteger counter = new AtomicInteger();
return errors
.takeWhile(e -> counter.getAndIncrement() != 3)
.flatMap(e -> {
System.out.println("delay retry by " + counter.get() + " second(s)");
return Observable.timer(counter.get(), TimeUnit.SECONDS);
});
})
.blockingSubscribe(System.out::println, System.out::println);
调度程序:retryWhen默认情况下不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void retryWhenFunctionNull() {
just1.retryWhen(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void managerThrows() {
Observable.just(1)
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Observable<Throwable> v) throws Exception {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}
}
代码示例来源:origin: rengwuxian/RxJavaSamples
.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> apply(Observable<? extends Throwable> observable) {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testOnErrorFromNotificationHandler() {
Observer<String> observer = TestHelper.mockObserver();
Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(2));
origin.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> apply(Observable<? extends Throwable> t1) {
return Observable.error(new RuntimeException());
}
}).subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer).onSubscribe((Disposable)notNull());
inOrder.verify(observer, never()).onNext("beginningEveryTime");
inOrder.verify(observer, never()).onNext("onSuccessOnly");
inOrder.verify(observer, never()).onComplete();
inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));
inOrder.verifyNoMoreInteractions();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testOnCompletedFromNotificationHandler() {
Observer<String> observer = TestHelper.mockObserver();
Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(1));
TestObserver<String> to = new TestObserver<String>(observer);
origin.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> apply(Observable<? extends Throwable> t1) {
return Observable.empty();
}
}).subscribe(to);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer).onSubscribe((Disposable)notNull());
inOrder.verify(observer, never()).onNext("beginningEveryTime");
inOrder.verify(observer, never()).onNext("onSuccessOnly");
inOrder.verify(observer, times(1)).onComplete();
inOrder.verify(observer, never()).onError(any(Exception.class));
inOrder.verifyNoMoreInteractions();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testOnNextFromNotificationHandler() {
Observer<String> observer = TestHelper.mockObserver();
int numRetries = 2;
Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(numRetries));
origin.retryWhen(new Function<Observable<? extends Throwable>, Observable<Object>>() {
@Override
public Observable<Object> apply(Observable<? extends Throwable> t1) {
return t1.map(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable t1) {
return 0;
}
}).startWith(0).cast(Object.class);
}
}).subscribe(observer);
InOrder inOrder = inOrder(observer);
// should show 3 attempts
inOrder.verify(observer, times(numRetries + 1)).onNext("beginningEveryTime");
// should have no errors
inOrder.verify(observer, never()).onError(any(Throwable.class));
// should have a single success
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
// should have a single successful onComplete
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void retryWhenFunctionReturnsNull() {
Observable.error(new TestException()).retryWhen(new Function<Observable<? extends Throwable>, Observable<Object>>() {
@Override
public Observable<Object> apply(Observable<? extends Throwable> f) {
return null;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void shouldDisposeInnerObservable() {
final PublishSubject<Object> subject = PublishSubject.create();
final Disposable disposable = Observable.error(new RuntimeException("Leak"))
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Observable<Throwable> errors) throws Exception {
return errors.switchMap(new Function<Throwable, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Throwable ignore) throws Exception {
return subject;
}
});
}
})
.subscribe();
assertTrue(subject.hasObservers());
disposable.dispose();
assertFalse(subject.hasObservers());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testSingleSubscriptionOnFirst() throws Exception {
final AtomicInteger inc = new AtomicInteger(0);
ObservableSource<Integer> onSubscribe = new ObservableSource<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observer) {
observer.onSubscribe(Disposables.empty());
final int emit = inc.incrementAndGet();
observer.onNext(emit);
observer.onComplete();
}
};
int first = Observable.unsafeCreate(onSubscribe)
.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> apply(Observable<? extends Throwable> attempt) {
return attempt.zipWith(Observable.just(1), new BiFunction<Throwable, Integer, Void>() {
@Override
public Void apply(Throwable o, Integer integer) {
return null;
}
});
}
})
.blockingFirst();
assertEquals("Observer did not receive the expected output", 1, first);
assertEquals("Subscribe was not called once", 1, inc.get());
}
代码示例来源:origin: ReactiveX/RxJava
producer.retryWhen(new Function<Observable<? extends Throwable>, Observable<Object>>() {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void noCancelPreviousRepeatWhen2() {
final AtomicInteger counter = new AtomicInteger();
final AtomicInteger times = new AtomicInteger();
Observable<Integer> source = Observable.<Integer>error(new TestException()).doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
});
source.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> e) throws Exception {
return e.takeWhile(new Predicate<Object>() {
@Override
public boolean test(Object v) throws Exception {
return times.getAndIncrement() < 4;
}
});
}
})
.test()
.assertResult();
assertEquals(0, counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(numRetries));
TestObserver<String> to = new TestObserver<String>(observer);
origin.retryWhen(new Function<Observable<? extends Throwable>, Observable<Object>>() {
@Override
public Observable<Object> apply(Observable<? extends Throwable> t1) {
代码示例来源:origin: ReactiveX/RxJava
source.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> e) throws Exception {
代码示例来源:origin: Polidea/RxAndroidBle
writeOperationAckStrategy, byteBuffer, emitterWrapper
))
.retryWhen(errorIsRetryableAndAccordingTo(writeOperationRetryStrategy, byteBuffer, batchSize))
.subscribe(new Observer<ByteAssociation<UUID>>() {
@Override
代码示例来源:origin: ReactiveX/RxJava
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Observable<Throwable> v)
代码示例来源:origin: Polidea/RxAndroidBle
.retryWhen(errorNotificationHandler -> errorNotificationHandler),
sharedNotifyButtonClicks.compose(onSubscribeSetText(notifyButton, R.string.setup_notification)),
sharedNotifyButtonClicks.compose(onSubscribeSetText(notifyButton, R.string.setting_notification)),
代码示例来源:origin: JessYanCoding/ArmsComponent
/**
* 重试
* @param <T>
* @return 重试次数
*/
public static <T> ObservableTransformer<T, T> retry2() {
return upstream -> upstream.retryWhen(new RetryWithDelay(2, 2));
}
代码示例来源:origin: JessYanCoding/ArmsComponent
.retryWhen(new RetryWithDelay(3, 2))//遇到错误时重试,第一个参数为重试几次,第二个参数为重试的间隔
.doOnSubscribe(disposable -> {
if (pullToRefresh)
代码示例来源:origin: commonsguy/cw-androidarch
Observable<String> getPassphrase(Uri source, final int count) {
return(getWordsFromSource(source)
.map(strings -> (randomSubset(strings, count)))
.map(pieces -> TextUtils.join(" ", pieces))
.flatMap(checker::validate)
.retryWhen(errors -> errors.retry(3)));
}
代码示例来源:origin: AriesHoo/FastLib
/**
* 检查版本--是否传递本地App 版本相关信息根据具体接口而定(demo这里是可以不需要传的,所有判断逻辑放在app端--不推荐)
*
* @return
*/
public Observable<UpdateEntity> updateApp() {
Map<String, Object> params = new HashMap<>(2);
params.put("versionCode", FastUtil.getVersionCode(App.getContext()));
params.put("versionName", FastUtil.getVersionName(App.getContext()));
return FastTransformer.switchSchedulers(getApiService().updateApp(params).retryWhen(new FastRetryWhen()));
}
}
内容来源于网络,如有侵权,请联系作者删除!