本文整理了Java中rx.Observable.concatDelayError()
方法的一些代码示例,展示了Observable.concatDelayError()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatDelayError()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:concatDelayError
暂无
代码示例来源:origin: com.microsoft.azure/azure-cosmosdb-gateway
static Single<DatabaseAccount> getDatabaseAccountFromAnyLocationsAsync(
URL defaultEndpoint, List<String> locations, Func1<URL, Single<DatabaseAccount>> getDatabaseAccountFn) {
return getDatabaseAccountFn.call(defaultEndpoint).onErrorResumeNext(
e -> {
logger.error("Fail to reach global gateway [{}], [{}]", defaultEndpoint, e.getMessage());
if (locations.isEmpty()) {
return Single.error(e);
}
rx.Observable<rx.Observable<DatabaseAccount>> obs = rx.Observable.range(0, locations.size())
.map(index -> getDatabaseAccountFn.call(LocationHelper.getLocationEndpoint(defaultEndpoint, locations.get(index))).toObservable());
// iterate and get the database account from the first non failure, otherwise get the last error.
rx.Observable<DatabaseAccount> res = rx.Observable.concatDelayError(obs).first().single();
return res.toSingle().doOnError(
innerE -> {
logger.error("Fail to reach location any of locations", String.join(",", locations), innerE.getMessage());
});
});
}
代码示例来源:origin: Azure/azure-libraries-for-java
/**
* Handles a faulted task.
*
* @param faultedEntry the entry holding faulted task
* @param throwable the reason for fault
* @param context the context object shared across all the task entries in this group during execution
*
* @return an observable represents asynchronous operation in the next stage
*/
private Observable<Indexable> processFaultedTaskAsync(final TaskGroupEntry<TaskItem> faultedEntry,
final Throwable throwable,
final InvocationContext context) {
markGroupAsCancelledIfTerminationStrategyIsIPTC();
reportError(faultedEntry, throwable);
if (isRootEntry(faultedEntry)) {
if (shouldPropagateException(throwable)) {
return toErrorObservable(throwable);
}
return Observable.empty();
} else if (shouldPropagateException(throwable)) {
return Observable.concatDelayError(invokeReadyTasksAsync(context), toErrorObservable(throwable));
} else {
return invokeReadyTasksAsync(context);
}
}
代码示例来源:origin: com.microsoft.azure/azure-arm-client-runtime
/**
* Handles a faulted task.
*
* @param faultedEntry the entry holding faulted task
* @param throwable the reason for fault
* @param context the context object shared across all the task entries in this group during execution
*
* @return an observable represents asynchronous operation in the next stage
*/
private Observable<Indexable> processFaultedTaskAsync(final TaskGroupEntry<TaskItem> faultedEntry,
final Throwable throwable,
final InvocationContext context) {
markGroupAsCancelledIfTerminationStrategyIsIPTC();
reportError(faultedEntry, throwable);
if (isRootEntry(faultedEntry)) {
if (shouldPropagateException(throwable)) {
return toErrorObservable(throwable);
}
return Observable.empty();
} else if (shouldPropagateException(throwable)) {
return Observable.concatDelayError(invokeReadyTasksAsync(context), toErrorObservable(throwable));
} else {
return invokeReadyTasksAsync(context);
}
}
代码示例来源:origin: com.microsoft.azure/azure-mgmt-resources
/**
* Handles a faulted task.
*
* @param faultedEntry the entry holding faulted task
* @param throwable the reason for fault
* @param context the context object shared across all the task entries in this group during execution
*
* @return an observable represents asynchronous operation in the next stage
*/
private Observable<Indexable> processFaultedTaskAsync(final TaskGroupEntry<TaskItem> faultedEntry,
final Throwable throwable,
final InvocationContext context) {
markGroupAsCancelledIfTerminationStrategyIsIPTC();
reportError(faultedEntry, throwable);
if (isRootEntry(faultedEntry)) {
if (shouldPropagateException(throwable)) {
return toErrorObservable(throwable);
}
return Observable.empty();
} else if (shouldPropagateException(throwable)) {
return Observable.concatDelayError(invokeReadyTasksAsync(context), toErrorObservable(throwable));
} else {
return invokeReadyTasksAsync(context);
}
}
代码示例来源:origin: akarnokd/akarnokd-misc
@Test
public void test() {
Observable<Integer> obs1 = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onCompleted();
}, Emitter.BackpressureMode.BUFFER);
Observable<Integer> obs2 = Observable.create(emitter -> {
emitter.onNext(2);
emitter.onCompleted();
}, Emitter.BackpressureMode.BUFFER);
Observable<Integer> both = Observable
.concatDelayError(obs1, obs2)
.replay().autoConnect();
both.test().assertResult(1, 2);
both.test().assertResult(1, 2);
}
}
内容来源于网络,如有侵权,请联系作者删除!