rx.Observable.concatDelayError()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(4.4k)|赞(0)|评价(0)|浏览(127)

本文整理了Java中rx.Observable.concatDelayError()方法的一些代码示例,展示了Observable.concatDelayError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatDelayError()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:concatDelayError

Observable.concatDelayError介绍

暂无

代码示例

代码示例来源:origin: com.microsoft.azure/azure-cosmosdb-gateway

  1. static Single<DatabaseAccount> getDatabaseAccountFromAnyLocationsAsync(
  2. URL defaultEndpoint, List<String> locations, Func1<URL, Single<DatabaseAccount>> getDatabaseAccountFn) {
  3. return getDatabaseAccountFn.call(defaultEndpoint).onErrorResumeNext(
  4. e -> {
  5. logger.error("Fail to reach global gateway [{}], [{}]", defaultEndpoint, e.getMessage());
  6. if (locations.isEmpty()) {
  7. return Single.error(e);
  8. }
  9. rx.Observable<rx.Observable<DatabaseAccount>> obs = rx.Observable.range(0, locations.size())
  10. .map(index -> getDatabaseAccountFn.call(LocationHelper.getLocationEndpoint(defaultEndpoint, locations.get(index))).toObservable());
  11. // iterate and get the database account from the first non failure, otherwise get the last error.
  12. rx.Observable<DatabaseAccount> res = rx.Observable.concatDelayError(obs).first().single();
  13. return res.toSingle().doOnError(
  14. innerE -> {
  15. logger.error("Fail to reach location any of locations", String.join(",", locations), innerE.getMessage());
  16. });
  17. });
  18. }

代码示例来源:origin: Azure/azure-libraries-for-java

  1. /**
  2. * Handles a faulted task.
  3. *
  4. * @param faultedEntry the entry holding faulted task
  5. * @param throwable the reason for fault
  6. * @param context the context object shared across all the task entries in this group during execution
  7. *
  8. * @return an observable represents asynchronous operation in the next stage
  9. */
  10. private Observable<Indexable> processFaultedTaskAsync(final TaskGroupEntry<TaskItem> faultedEntry,
  11. final Throwable throwable,
  12. final InvocationContext context) {
  13. markGroupAsCancelledIfTerminationStrategyIsIPTC();
  14. reportError(faultedEntry, throwable);
  15. if (isRootEntry(faultedEntry)) {
  16. if (shouldPropagateException(throwable)) {
  17. return toErrorObservable(throwable);
  18. }
  19. return Observable.empty();
  20. } else if (shouldPropagateException(throwable)) {
  21. return Observable.concatDelayError(invokeReadyTasksAsync(context), toErrorObservable(throwable));
  22. } else {
  23. return invokeReadyTasksAsync(context);
  24. }
  25. }

代码示例来源:origin: com.microsoft.azure/azure-arm-client-runtime

  1. /**
  2. * Handles a faulted task.
  3. *
  4. * @param faultedEntry the entry holding faulted task
  5. * @param throwable the reason for fault
  6. * @param context the context object shared across all the task entries in this group during execution
  7. *
  8. * @return an observable represents asynchronous operation in the next stage
  9. */
  10. private Observable<Indexable> processFaultedTaskAsync(final TaskGroupEntry<TaskItem> faultedEntry,
  11. final Throwable throwable,
  12. final InvocationContext context) {
  13. markGroupAsCancelledIfTerminationStrategyIsIPTC();
  14. reportError(faultedEntry, throwable);
  15. if (isRootEntry(faultedEntry)) {
  16. if (shouldPropagateException(throwable)) {
  17. return toErrorObservable(throwable);
  18. }
  19. return Observable.empty();
  20. } else if (shouldPropagateException(throwable)) {
  21. return Observable.concatDelayError(invokeReadyTasksAsync(context), toErrorObservable(throwable));
  22. } else {
  23. return invokeReadyTasksAsync(context);
  24. }
  25. }

代码示例来源:origin: com.microsoft.azure/azure-mgmt-resources

  1. /**
  2. * Handles a faulted task.
  3. *
  4. * @param faultedEntry the entry holding faulted task
  5. * @param throwable the reason for fault
  6. * @param context the context object shared across all the task entries in this group during execution
  7. *
  8. * @return an observable represents asynchronous operation in the next stage
  9. */
  10. private Observable<Indexable> processFaultedTaskAsync(final TaskGroupEntry<TaskItem> faultedEntry,
  11. final Throwable throwable,
  12. final InvocationContext context) {
  13. markGroupAsCancelledIfTerminationStrategyIsIPTC();
  14. reportError(faultedEntry, throwable);
  15. if (isRootEntry(faultedEntry)) {
  16. if (shouldPropagateException(throwable)) {
  17. return toErrorObservable(throwable);
  18. }
  19. return Observable.empty();
  20. } else if (shouldPropagateException(throwable)) {
  21. return Observable.concatDelayError(invokeReadyTasksAsync(context), toErrorObservable(throwable));
  22. } else {
  23. return invokeReadyTasksAsync(context);
  24. }
  25. }

代码示例来源:origin: akarnokd/akarnokd-misc

  1. @Test
  2. public void test() {
  3. Observable<Integer> obs1 = Observable.create(emitter -> {
  4. emitter.onNext(1);
  5. emitter.onCompleted();
  6. }, Emitter.BackpressureMode.BUFFER);
  7. Observable<Integer> obs2 = Observable.create(emitter -> {
  8. emitter.onNext(2);
  9. emitter.onCompleted();
  10. }, Emitter.BackpressureMode.BUFFER);
  11. Observable<Integer> both = Observable
  12. .concatDelayError(obs1, obs2)
  13. .replay().autoConnect();
  14. both.test().assertResult(1, 2);
  15. both.test().assertResult(1, 2);
  16. }
  17. }

相关文章

Observable类方法