rx.Observable.onErrorResumeNext(): usage and code examples

Reposted by x33g5p2x on 2022-01-25 in Other

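This article collects code examples of the Java method rx.Observable.onErrorResumeNext() and shows how Observable.onErrorResumeNext() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.onErrorResumeNext() method:
Package path: rx.Observable
Class name: Observable
Method name: onErrorResumeNext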

Introduction to Observable.onErrorResumeNext

Instructs an Observable to pass control to another Observable rather than invoking Observer#onError if it encounters an error.

By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer, the Observable invokes its Observer's onError method and then quits without invoking any more of its Observer's methods. The onErrorResumeNext method changes this behavior. If you pass another Observable (resumeSequence) to an Observable's onErrorResumeNext method and the original Observable encounters an error, then instead of invoking its Observer's onError method it relinquishes control to resumeSequence, which invokes the Observer's Observer#onNext method if it is able to do so. In that case, because no Observable necessarily invokes onError, the Observer may never know that an error happened.

You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. Scheduler: onErrorResumeNext does not operate by default on a particular Scheduler.
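
The hand-off described above can be modelled without RxJava itself. What follows is a minimal sketch that uses only the JDK. `Source`, `Observer`, `collect`, `failingWith`, and `resumeFor` are hypothetical stand-ins introduced for illustration; they are not RxJava's classes, and the sketch does not claim to reproduce RxJava's implementation (there is no unsubscription, backpressure, or scheduling). It shows the two uses mentioned above: supplying fallback data for one kind of error, and letting another kind of error propagate by resuming with a sequence that only signals the error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ResumeNextSketch {

    /** The three callbacks an observer receives (modelled on rx.Observer). */
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical stand-in for an Observable: pushes events to one observer. */
    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    /** On error, subscribes the downstream observer to the sequence chosen by resume. */
    static <T> Source<T> onErrorResumeNext(Source<T> source,
                                           Function<Throwable, Source<T>> resume) {
        return downstream -> source.subscribe(new Observer<T>() {
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onCompleted() { downstream.onCompleted(); }
            @Override public void onError(Throwable error) {
                // The error is not forwarded; control passes to the resume sequence.
                resume.apply(error).subscribe(downstream);
            }
        });
    }

    /** Records every event the observer receives so the flow can be inspected. */
    static List<String> collect(Source<String> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new Observer<String>() {
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    /** Emits one item, then fails with the given error. */
    static Source<String> failingWith(RuntimeException error) {
        return observer -> {
            observer.onNext("a");
            observer.onError(error);
        };
    }

    /** Falls back for most errors, but re-signals IllegalStateException unchanged. */
    static Source<String> resumeFor(Throwable error) {
        if (error instanceof IllegalStateException) {
            return observer -> observer.onError(error);
        }
        return observer -> {
            observer.onNext("fallback");
            observer.onCompleted();
        };
    }

    public static void main(String[] args) {
        Source<String> recovered = onErrorResumeNext(
                failingWith(new UnsupportedOperationException("boom")), ResumeNextSketch::resumeFor);
        System.out.println(collect(recovered)); // [next:a, next:fallback, completed]

        Source<String> rethrown = onErrorResumeNext(
                failingWith(new IllegalStateException("fatal")), ResumeNextSketch::resumeFor);
        System.out.println(collect(rethrown)); // [next:a, error:fatal]
    }
}
```

In the first run the observer never sees the error: it receives the item emitted before the failure, then the fallback item, then completion. In the second run the resume function chooses to signal the error again, so the observer's onError is still called.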

Code examples

Code example source: PipelineAI/pipeline

private Object executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
  return mapObservable(((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
      .onErrorResumeNext(new Func1<Throwable, Observable>() {
        @Override
        public Observable call(Throwable throwable) {
          if (throwable instanceof HystrixBadRequestException) {
            return Observable.error(throwable.getCause());
          } else if (throwable instanceof HystrixRuntimeException) {
            HystrixRuntimeException hystrixRuntimeException = (HystrixRuntimeException) throwable;
            return Observable.error(hystrixRuntimeExceptionToThrowable(metaHolder, hystrixRuntimeException));
          }
          return Observable.error(throwable);
        }
      }), metaHolder);
}

Code example source: ReactiveX/RxNetty

}).onErrorResumeNext(discard());

Code example source: PipelineAI/pipeline

/**
 *{@inheritDoc}.
 */
@Override
protected Observable construct() {
  Observable result;
  try {
    Observable observable = toObservable(commandActions.getCommandAction().execute(executionType));
    result = observable
        .onErrorResumeNext(new Func1<Throwable, Observable>() {
          @Override
          public Observable call(Throwable throwable) {
            if (isIgnorable(throwable)) {
              return Observable.error(new HystrixBadRequestException(throwable.getMessage(), throwable));
            }
            return Observable.error(throwable);
          }
        });
    flushCache();
  } catch (CommandActionExecutionException throwable) {
    Throwable cause = throwable.getCause();
    if (isIgnorable(cause)) {
      throw new HystrixBadRequestException(cause.getMessage(), cause);
    }
    throw throwable;
  }
  return result;
}

Code example source: hidroh/materialistic

case MODE_CACHE:
  itemObservable = mRestService.cachedItemRx(itemId)
      .onErrorResumeNext(mRestService.itemRx(itemId));
  break;

Code example source: ReactiveX/RxNetty

.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
  @Override
  public Observable<Void> call(Throwable throwable) {
   .onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
     @Override
     public Observable<Void> call(Throwable throwable) {

Code example source: PipelineAI/pipeline

.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);

Code example source: jhusain/learnrxjava

/**
 * Don't modify any values in the stream but do handle the error
 * and replace it with "default-value".
 */
public Observable<String> handleError(Observable<String> data) {
  return data.onErrorResumeNext(Observable.just("default-value"));
}

Code example source: PipelineAI/pipeline

.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);

Code example source: GeekGhost/Ghost

/**
 * ClassificationPresenter
 *
 * @return
 */
public Observable<VideoRes> queryClassification() {
  return mService.getHomePage()
      .map(new ServerResultFunc<VideoRes>())
      .onErrorResumeNext(new HttpResultFunc<VideoRes>())
      .subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread());
}

Code example source: jhusain/learnrxjava

.onErrorResumeNext(Observable.just("5) data"))
.subscribe(System.out::println, t -> System.out.println("5) Error: " + t));
.onErrorResumeNext(t -> {
  if (t instanceof IllegalStateException) {
    return Observable.error(t);
.onErrorResumeNext(t -> {
  if (t instanceof IllegalStateException) {
    return Observable.error(t);

Code example source: jhusain/learnrxjava

.onErrorResumeNext(e -> Observable.just("Fallback Data"))
    .subscribe(System.out::println);
}).onErrorResumeNext(throwable -> {
  return Observable.just("fallback value");
}).subscribe(System.out::println);
}).onErrorResumeNext(Observable.just("fallback value"))
    .subscribe(System.out::println);

Code example source: ribot/ribot-app-android

/**
 * Retrieve list of venues. Behaviour:
 * 1. Return cached venues (empty list if none is cached)
 * 2. Return API venues (if different to cached ones)
 * 3. Save new venues from API in cache
 * 4. If an error happens and cache is not empty, returns venues from cache.
 */
public Observable<List<Venue>> getVenues() {
  String auth = RibotService.Util.buildAuthorization(mPreferencesHelper.getAccessToken());
  return mRibotService.getVenues(auth)
      .doOnNext(new Action1<List<Venue>>() {
        @Override
        public void call(List<Venue> venues) {
          mPreferencesHelper.putVenues(venues);
        }
      })
      .onErrorResumeNext(new Func1<Throwable, Observable<? extends List<Venue>>>() {
        @Override
        public Observable<? extends List<Venue>> call(Throwable throwable) {
          return getVenuesRecoveryObservable(throwable);
        }
      })
      .startWith(mPreferencesHelper.getVenuesAsObservable())
      .distinct();
}

Code example source: jhusain/learnrxjava

.onErrorResumeNext(t -> {
  System.out.println("     ***** complete group: " + groupedObservable.getKey());

Code example source: alaisi/postgres-async-driver

@Override
public Observable<Row> queryRows(String sql, Object... params) {
  return PgConnection.this.queryRows(sql, params)
      .onErrorResumeNext(this::doRollback);
}
@Override

Code example source: com.github.alaisi.pgasync/postgres-async-driver

@Override
public Observable<ResultSet> querySet(String sql, Object... params) {
  return PgConnection.this.querySet(sql, params)
      .onErrorResumeNext(this::doRollback);
}
<T> Observable<T> doRollback(Throwable t) {

Code example source: yammer/tenacity

public static <R> R execute(Observable<R> primary,
                       Observable<R> secondary) {
  return primary
      .onErrorResumeNext(secondary)
      .toBlocking()
      .single();
}

Code example source: com.netflix.ribbon/ribbon

public Observable<T> toObservable() {
  Observable<T> rootObservable = null;
  for (final HystrixObservableCommand<T> command : hystrixCommands) {
    Observable<T> observable = command.toObservable();
    rootObservable = rootObservable == null ? observable : rootObservable.onErrorResumeNext(observable);
  }
  return rootObservable;
}

Code example source: com.couchbase.client/core-io

@Override
  public Observable<Tuple2<LoaderType, BucketConfig>> call(NetworkAddress seedHost) {
    Observable<Tuple2<LoaderType, BucketConfig>> node = loaderChain.get(0)
        .loadConfig(seedHost, bucket, username, password);
    for (int i = 1; i < loaderChain.size(); i++) {
      node = node.onErrorResumeNext(loaderChain.get(i)
          .loadConfig(seedHost, bucket, username, password));
    }
    return node;
  }
})

Code example source: com.netflix.eureka/eureka2-client

@Override
  public Observable<Server> resolve() {
    return primary.resolve()
        .onErrorResumeNext(fallback.resolve());
  }
}

Code example source: io.vertx/vertx-rx-java

@Override
 public Observable<T> call(Observable<T> upstream) {
  return sqlConnection.rxSetAutoCommit(false).toCompletable()
   .andThen(upstream)
   .concatWith(sqlConnection.rxCommit().toCompletable().toObservable())
   .onErrorResumeNext(throwable -> {
    return sqlConnection.rxRollback().toCompletable().onErrorComplete()
     .andThen(sqlConnection.rxSetAutoCommit(true).toCompletable().onErrorComplete())
     .andThen(Observable.error(throwable));
   }).concatWith(sqlConnection.rxSetAutoCommit(true).toCompletable().toObservable());
 }
}
