io.reactivex.Observable.onErrorReturn()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.0k)|赞(0)|评价(0)|浏览(174)

本文整理了Java中io.reactivex.Observable.onErrorReturn()方法的一些代码示例,展示了Observable.onErrorReturn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.onErrorReturn()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:onErrorReturn

Observable.onErrorReturn介绍

[英]Instructs an ObservableSource to emit an item (returned by a specified function) rather than invoking Observer#onError if it encounters an error.

By default, when an ObservableSource encounters an error that prevents it from emitting the expected item to its Observer, the ObservableSource invokes its Observer's onError method, and then quits without invoking any more of its Observer's methods. The onErrorReturn method changes this behavior. If you pass a function ( resumeFunction) to an ObservableSource's onErrorReturnmethod, if the original ObservableSource encounters an error, instead of invoking its Observer's onError method, it will instead emit the return value of resumeFunction.

You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. Scheduler: onErrorReturn does not operate by default on a particular Scheduler.
[中]指示ObservateSource发出一个项(由指定函数返回),而不是在遇到错误时调用Observator#onError。
默认情况下,当ObservateSource遇到一个错误,阻止它向其观察者发送预期的项时,ObservateSource调用其观察者的OneError方法,然后退出,不再调用任何观察者的方法。OneRorReturn方法会更改此行为。如果将函数(resumeFunction)传递给ObservateSource的onErrorReturnmethod,如果原始ObservateSource遇到错误,它将不会调用其Observater的onError方法,而是发出resumeFunction的返回值。
您可以使用它来防止错误传播,或者在遇到错误时提供回退数据。Scheduler:onErrorReturn默认情况下不会在特定的计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void onErrorReturnFunctionNull() {
  3. just1.onErrorReturn(null);
  4. }

代码示例来源:origin: Polidea/RxAndroidBle

  1. /**
  2. * A convenience function creating a transformer that will wrap the emissions in either {@link ResultEvent} or {@link ErrorEvent}
  3. * with a given {@link Type}
  4. *
  5. * @param type the type to wrap with
  6. * @return transformer that will emit an observable that will be emitting ResultEvent or ErrorEvent with a given type
  7. */
  8. @NonNull
  9. private static ObservableTransformer<byte[], PresenterEvent> transformToPresenterEvent(Type type) {
  10. return observable -> observable.map(writtenBytes -> ((PresenterEvent) new ResultEvent(writtenBytes, type)))
  11. .onErrorReturn(throwable -> new ErrorEvent(throwable, type));
  12. }

代码示例来源:origin: Polidea/RxAndroidBle

  1. /**
  2. * A convenience function creating a transformer that will wrap the emissions in either {@link ResultEvent} or {@link ErrorEvent}
  3. * with a given {@link Type} for notification type {@link Observable} (Observable<Observable<byte[]>>)
  4. *
  5. * @param type the type to wrap with
  6. * @return the transformer
  7. */
  8. @NonNull
  9. private static ObservableTransformer<Observable<byte[]>, PresenterEvent> transformToNotificationPresenterEvent(Type type) {
  10. return observableObservable -> observableObservable
  11. .flatMap(observable -> observable
  12. .map(bytes -> ((PresenterEvent) new ResultEvent(bytes, type)))
  13. )
  14. .onErrorReturn(throwable -> new ErrorEvent(throwable, type));
  15. }

代码示例来源:origin: trello/RxLifecycle

  1. private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
  2. final Function<R, R> correspondingEvents) {
  3. return Observable.combineLatest(
  4. lifecycle.take(1).map(correspondingEvents),
  5. lifecycle.skip(1),
  6. new BiFunction<R, R, Boolean>() {
  7. @Override
  8. public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
  9. return lifecycleEvent.equals(bindUntilEvent);
  10. }
  11. })
  12. .onErrorReturn(Functions.RESUME_FUNCTION)
  13. .filter(Functions.SHOULD_COMPLETE);
  14. }
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. public final Observable<T> onErrorReturnItem(final T item) {
  2. ObjectHelper.requireNonNull(item, "item is null");
  3. return onErrorReturn(Functions.justFunction(item));

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testResumeNext() {
  3. TestObservable f = new TestObservable("one");
  4. Observable<String> w = Observable.unsafeCreate(f);
  5. final AtomicReference<Throwable> capturedException = new AtomicReference<Throwable>();
  6. Observable<String> observable = w.onErrorReturn(new Function<Throwable, String>() {
  7. @Override
  8. public String apply(Throwable e) {
  9. capturedException.set(e);
  10. return "failure";
  11. }
  12. });
  13. Observer<String> observer = TestHelper.mockObserver();
  14. observable.subscribe(observer);
  15. try {
  16. f.t.join();
  17. } catch (InterruptedException e) {
  18. fail(e.getMessage());
  19. }
  20. verify(observer, Mockito.never()).onError(any(Throwable.class));
  21. verify(observer, times(1)).onComplete();
  22. verify(observer, times(1)).onNext("one");
  23. verify(observer, times(1)).onNext("failure");
  24. assertNotNull(capturedException.get());
  25. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2. * Test that when a function throws an exception this is propagated through onError.
  3. */
  4. @Test
  5. public void testFunctionThrowsError() {
  6. TestObservable f = new TestObservable("one");
  7. Observable<String> w = Observable.unsafeCreate(f);
  8. final AtomicReference<Throwable> capturedException = new AtomicReference<Throwable>();
  9. Observable<String> observable = w.onErrorReturn(new Function<Throwable, String>() {
  10. @Override
  11. public String apply(Throwable e) {
  12. capturedException.set(e);
  13. throw new RuntimeException("exception from function");
  14. }
  15. });
  16. Observer<String> observer = TestHelper.mockObserver();
  17. observable.subscribe(observer);
  18. try {
  19. f.t.join();
  20. } catch (InterruptedException e) {
  21. fail(e.getMessage());
  22. }
  23. // we should get the "one" value before the error
  24. verify(observer, times(1)).onNext("one");
  25. // we should have received an onError call on the Observer since the resume function threw an exception
  26. verify(observer, times(1)).onError(any(Throwable.class));
  27. verify(observer, times(0)).onComplete();
  28. assertNotNull(capturedException.get());
  29. }

代码示例来源:origin: ReactiveX/RxJava

  1. Observable<String> observable = w.onErrorReturn(new Function<Throwable, String>() {

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testBackpressure() {
  3. TestObserver<Integer> to = new TestObserver<Integer>();
  4. Observable.range(0, 100000)
  5. .onErrorReturn(new Function<Throwable, Integer>() {
  6. @Override
  7. public Integer apply(Throwable t1) {
  8. return 1;
  9. }
  10. })
  11. .observeOn(Schedulers.computation())
  12. .map(new Function<Integer, Integer>() {
  13. int c;
  14. @Override
  15. public Integer apply(Integer t1) {
  16. if (c++ <= 1) {
  17. // slow
  18. try {
  19. Thread.sleep(500);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. return t1;
  25. }
  26. })
  27. .subscribe(to);
  28. to.awaitTerminalEvent();
  29. to.assertNoErrors();
  30. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void onErrorReturnFunctionReturnsNull() {
  3. Observable.error(new TestException()).onErrorReturn(new Function<Throwable, Object>() {
  4. @Override
  5. public Object apply(Throwable e) {
  6. return null;
  7. }
  8. }).blockingSubscribe();
  9. }

代码示例来源:origin: redisson/redisson

  1. public final Observable<T> onErrorReturnItem(final T item) {
  2. ObjectHelper.requireNonNull(item, "item is null");
  3. return onErrorReturn(Functions.justFunction(item));

代码示例来源:origin: Polidea/RxAndroidBle

  1. .onErrorReturn(throwable -> new InfoEvent("Connection error: " + throwable))

代码示例来源:origin: Marchuck/BlueDuff

  1. private void openInputStream(BluetoothSocket bluetoothSocket) {
  2. getInputStream(bluetoothSocket).map(new Function<InputStream, Boolean>() {
  3. @Override
  4. public Boolean apply(InputStream inputStream) throws Exception {
  5. is = inputStream;
  6. receiveMessages();
  7. return true;
  8. }
  9. }).onErrorReturn(new Function<Throwable, Boolean>() {
  10. @Override
  11. public Boolean apply(Throwable throwable) throws Exception {
  12. return false;
  13. }
  14. }).subscribe(new Consumer<Boolean>() {
  15. @Override
  16. public void accept(Boolean success) throws Exception {
  17. if (!success) {
  18. connectionCallbacks.onDisconnected();
  19. }
  20. }
  21. });
  22. }

代码示例来源:origin: Marchuck/BlueDuff

  1. void openOutputStream(final BluetoothSocket bluetoothSocket) {
  2. Observable.fromCallable(new Callable<OutputStream>() {
  3. @Override
  4. public OutputStream call() throws Exception {
  5. return bluetoothSocket.getOutputStream();
  6. }
  7. }).map(new Function<OutputStream, Boolean>() {
  8. @Override
  9. public Boolean apply(OutputStream outputStream) throws Exception {
  10. os = outputStream;
  11. return true;
  12. }
  13. }).onErrorReturn(new Function<Throwable, Boolean>() {
  14. @Override
  15. public Boolean apply(Throwable throwable) throws Exception {
  16. return false;
  17. }
  18. }).subscribe(new Consumer<Boolean>() {
  19. @Override
  20. public void accept(Boolean canWrite) throws Exception {
  21. if (!canWrite) connectionCallbacks.onError("Cannot send data");
  22. }
  23. });
  24. }

代码示例来源:origin: WallaceXiao/StockChart-MPAndroidChart

  1. @Override
  2. public ObservableSource<CacheResult<T>> apply(final @NonNull T t) throws Exception {
  3. return rxCache.save(key, t).map(new Function<Boolean, CacheResult<T>>() {
  4. @Override
  5. public CacheResult<T> apply(@NonNull Boolean aBoolean) throws Exception {
  6. HttpLog.i("save status => " + aBoolean);
  7. return new CacheResult<T>(false, t);
  8. }
  9. }).onErrorReturn(new Function<Throwable, CacheResult<T>>() {
  10. @Override
  11. public CacheResult<T> apply(@NonNull Throwable throwable) throws Exception {
  12. HttpLog.i("save status => " + throwable);
  13. return new CacheResult<T>(false, t);
  14. }
  15. });
  16. }
  17. });

代码示例来源:origin: groupon/grox

  1. @Override
  2. public Observable<? extends Action<State>> actions() {
  3. final Observable<Action<State>> refresh = just(new RefreshAction());
  4. //don't forget to convert errors in actions
  5. return refresh.concatWith(refreshColor()).onErrorReturn(ErrorAction::new);
  6. }

代码示例来源:origin: VictorAlbertos/RxCacheSamples

  1. public Observable<String> logoutUser() {
  2. return cacheProviders.getCurrentUser(ProviderHelper.<User>withoutLoader(), new EvictProvider(true))
  3. .map(new Function<Reply<User>, String>() {
  4. @Override public String apply(Reply<User> user) throws Exception {
  5. return "Logout";
  6. }
  7. })
  8. .onErrorReturn(new Function<Throwable, String>() {
  9. @Override public String apply(Throwable throwable) {
  10. return "Logout";
  11. }
  12. });
  13. }

代码示例来源:origin: com.trello.rxlifecycle3/rxlifecycle

  1. private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
  2. final Function<R, R> correspondingEvents) {
  3. return Observable.combineLatest(
  4. lifecycle.take(1).map(correspondingEvents),
  5. lifecycle.skip(1),
  6. new BiFunction<R, R, Boolean>() {
  7. @Override
  8. public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
  9. return lifecycleEvent.equals(bindUntilEvent);
  10. }
  11. })
  12. .onErrorReturn(Functions.RESUME_FUNCTION)
  13. .filter(Functions.SHOULD_COMPLETE);
  14. }
  15. }

代码示例来源:origin: com.trello.rxlifecycle2/rxlifecycle

  1. private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
  2. final Function<R, R> correspondingEvents) {
  3. return Observable.combineLatest(
  4. lifecycle.take(1).map(correspondingEvents),
  5. lifecycle.skip(1),
  6. new BiFunction<R, R, Boolean>() {
  7. @Override
  8. public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
  9. return lifecycleEvent.equals(bindUntilEvent);
  10. }
  11. })
  12. .onErrorReturn(Functions.RESUME_FUNCTION)
  13. .filter(Functions.SHOULD_COMPLETE);
  14. }
  15. }

代码示例来源:origin: cn.leancloud/storage-core

  1. result = wrapObservable(QueryResultCache.getInstance().getCacheResult(className, query, maxAgeInMilliseconds, false));
  2. if (null != result) {
  3. result = result.onErrorReturn(new Function<Throwable, List<AVObject>>() {
  4. public List<AVObject> apply(Throwable o) throws Exception {
  5. LOGGER.d("failed to query local cache, cause: " + o.getMessage() + ", try to query networking");
  6. return o.getResults();
  7. }).onErrorReturn(new Function<Throwable, List<AVObject>>() {
  8. public List<AVObject> apply(Throwable o) throws Exception {
  9. LOGGER.d("failed to query networking, cause: " + o.getMessage() + ", try to query local cache.");

相关文章

Observable类方法