本文整理了Java中rx.Observable.lastOrDefault()
方法的一些代码示例,展示了Observable.lastOrDefault()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.lastOrDefault()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:lastOrDefault
[英]Returns an Observable that emits only the last item emitted by the source Observable, or a default item if the source Observable completes without emitting any items.
Scheduler: lastOrDefault does not operate by default on a particular Scheduler.
[中]返回仅发射源可观测项发射的最后一项的可观测项,或者如果源可观测项完成而不发射任何项,则返回默认项。
调度程序:默认情况下,lastOrDefault不会在特定调度程序上运行。
代码示例来源:origin: apache/usergrid
private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge){
//now read all older versions of an edge, and remove them. Finally calling delete
final SearchByEdge searchByEdge =
new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.absent() );
//load our versions, only retain the most recent one
gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
if (logger.isDebugEnabled()) {
logger.debug("Duplicate edge. Marking edge {} for deletion", edgeToDelete);
}
return gm.markEdge(edgeToDelete );
}).lastOrDefault(null).doOnNext(lastEdge -> {
//no op if we hit our default
if (lastEdge == null) {
return;
}
//don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
}).toBlocking().lastOrDefault(null);//this should throw an exception
}
代码示例来源:origin: apache/usergrid
@Override
public Entity restoreApplication(UUID applicationId) throws Exception {
// get the deleted_application_info for the deleted app
return (Entity) migrateAppInfo( applicationId, CpNamingUtils.DELETED_APPLICATION_INFO,
CpNamingUtils.APPLICATION_INFOS , CpNamingUtils.APPLICATION_INFO ).lastOrDefault( null )
.map( appInfo -> {
//start the index rebuild
final ReIndexRequestBuilder builder = reIndexService.getBuilder().withApplicationId( applicationId );
reIndexService.rebuildIndex( builder );
//load the entity
final EntityManager managementEm = getEntityManager( getManagementAppId() );
try {
return managementEm.get( new SimpleEntityRef( CpNamingUtils.APPLICATION_INFO, applicationId ) );
}
catch ( Exception e ) {
logger.error( "Failed to get entity", e );
throw new RuntimeException( e );
}
} )
.toBlocking().lastOrDefault(null);
}
代码示例来源:origin: apache/usergrid
return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
.flatMap(integer -> Observable.from( markedEdges ));
} );
代码示例来源:origin: apache/usergrid
return MathObservable.sumInteger( deleteCounts ).lastOrDefault( 0 ).doOnNext( subTypeUsedCount -> {
代码示例来源:origin: apache/usergrid
} ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
代码示例来源:origin: leeowenowen/rxjava-examples
@Override
public void run() {
Observable.range(1, 10).lastOrDefault(10).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer);
}
});
Observable.range(1, 10).lastOrDefault(10, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer);
}
});
}
});
代码示例来源:origin: com.netflix.rxjava/rxjava-core
/**
* Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or a default
* value if it emits no such items.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.p.png" alt="">
*
* @param defaultValue
* a default value to return if this {@code BlockingObservable} emits no matching items
* @param predicate
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
* @return the last item emitted by this {@code BlockingObservable} that matches the predicate, or the
* default value if it emits no matching items
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: lastOrDefault()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.lastordefault.aspx">MSDN: Observable.LastOrDefault</a>
*/
public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
return blockForSingle(o.filter(predicate).map(Functions.<T>identity()).lastOrDefault(defaultValue));
}
代码示例来源:origin: com.netflix.rxjava/rxjava-core
/**
* Returns the last item emitted by this {@code BlockingObservable}, or a default value if it emits no
* items.
* <p>
* <img width="640" height="310" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.png" alt="">
*
* @param defaultValue
* a default value to return if this {@code BlockingObservable} emits no items
* @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no
* items
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: lastOrDefault()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.lastordefault.aspx">MSDN: Observable.LastOrDefault</a>
*/
public T lastOrDefault(T defaultValue) {
return blockForSingle(o.map(Functions.<T>identity()).lastOrDefault(defaultValue));
}
代码示例来源:origin: com.couchbase.client/core-io
@Override
public Observable<LifecycleState> disconnect() {
LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");
return Observable
.from(endpoints)
.flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
@Override
public Observable<LifecycleState> call(Endpoint endpoint) {
LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
+ "Initializing disconnect on Endpoint.");
return endpoint.disconnect();
}
})
.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
public LifecycleState call(final LifecycleState state) {
endpointStates.terminate();
return state();
}
});
}
代码示例来源:origin: couchbase/couchbase-jvm-core
@Override
public Observable<LifecycleState> disconnect() {
LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");
return Observable
.from(endpoints)
.flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
@Override
public Observable<LifecycleState> call(Endpoint endpoint) {
LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
+ "Initializing disconnect on Endpoint.");
return endpoint.disconnect();
}
})
.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
public LifecycleState call(final LifecycleState state) {
endpointStates.terminate();
return state();
}
});
}
代码示例来源:origin: couchbase/couchbase-jvm-core
@Override
public Observable<LifecycleState> disconnect() {
disconnect = true;
LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");
return Observable
.from(onDemandEndpoints)
.flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
@Override
public Observable<LifecycleState> call(Endpoint endpoint) {
LOGGER.debug(logIdent(hostname, AbstractOnDemandService.this)
+ "Initializing disconnect on Endpoint.");
return endpoint.disconnect();
}
})
.lastOrDefault(LifecycleState.IDLE)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
public LifecycleState call(final LifecycleState state) {
endpointStates().terminate();
return state();
}
});
}
}
代码示例来源:origin: com.couchbase.client/core-io
@Override
public Observable<LifecycleState> disconnect() {
disconnect = true;
LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");
return Observable
.from(onDemandEndpoints)
.flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
@Override
public Observable<LifecycleState> call(Endpoint endpoint) {
LOGGER.debug(logIdent(hostname, AbstractOnDemandService.this)
+ "Initializing disconnect on Endpoint.");
return endpoint.disconnect();
}
})
.lastOrDefault(LifecycleState.IDLE)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
public LifecycleState call(final LifecycleState state) {
endpointStates().terminate();
return state();
}
});
}
}
代码示例来源:origin: com.couchbase.client/core-io
.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
代码示例来源:origin: couchbase/couchbase-jvm-core
.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
代码示例来源:origin: alex-shpak/rx-jersey
@Override
public void dispatch(AsyncContext asyncContext, ResourceMethodDispatcher dispatcher, Object resource, ContainerRequest request) throws ProcessingException {
final ContainerRequestContext requestContext = containerRequestContext.get();
Completable intercept = Observable.from(requestInterceptors)
.concatMap(interceptor -> interceptor.intercept(requestContext))
.lastOrDefault(null)
.toCompletable();
Single<?> dispatch = Single.defer(() -> (Single<?>) dispatcher.dispatch(resource, request).getEntity());
intercept.andThen(dispatch)
.map(response -> response == null ? Response.noContent().build() : response)
.subscribe(asyncContext::resume, asyncContext::resume);
}
代码示例来源:origin: com.couchbase.client/java-client
.lastOrDefault(null)
代码示例来源:origin: org.jboss.hal/hal-flow
/** Executes multiple tasks in order. */
static <C extends FlowContext> Single<C> series(C context, Collection<? extends Task<C>> tasks) {
return Observable.from(tasks)
.flatMapSingle(task -> task.call(context).toSingleDefault(context), false, 1)
.doOnSubscribe(() -> context.progress.reset(tasks.size()))
.doOnNext(c -> c.progress.tick())
.doOnTerminate(context.progress::finish)
.lastOrDefault(context).toSingle();
}
}
代码示例来源:origin: com.couchbase.client/core-io
@Override
public Observable<LifecycleState> connect() {
LOGGER.debug(logIdent(hostname, this) + "Got instructed to connect.");
if (state() == LifecycleState.CONNECTED || state() == LifecycleState.CONNECTING) {
LOGGER.debug(logIdent(hostname, this) + "Already connected or connecting, skipping connect.");
return Observable.just(state());
}
for (int i = 0; i < minEndpoints; i++) {
Endpoint endpoint = createEndpoint();
endpoints[i] = endpoint;
endpointStates.register(endpoint, endpoint);
}
return Observable
.from(endpoints)
.flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
@Override
public Observable<LifecycleState> call(final Endpoint endpoint) {
LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
+ "Initializing connect on Endpoint.");
return endpoint.connect();
}
})
.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
public LifecycleState call(final LifecycleState state) {
return state();
}
});
}
代码示例来源:origin: couchbase/couchbase-jvm-core
@Override
public Observable<LifecycleState> connect() {
LOGGER.debug(logIdent(hostname, this) + "Got instructed to connect.");
if (state() == LifecycleState.CONNECTED || state() == LifecycleState.CONNECTING) {
LOGGER.debug(logIdent(hostname, this) + "Already connected or connecting, skipping connect.");
return Observable.just(state());
}
for (int i = 0; i < minEndpoints; i++) {
Endpoint endpoint = createEndpoint();
endpoints[i] = endpoint;
endpointStates.register(endpoint, endpoint);
}
return Observable
.from(endpoints)
.flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
@Override
public Observable<LifecycleState> call(final Endpoint endpoint) {
LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
+ "Initializing connect on Endpoint.");
return endpoint.connect();
}
})
.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
public LifecycleState call(final LifecycleState state) {
return state();
}
});
}
代码示例来源:origin: couchbase/couchbase-jvm-core
.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
@Override
内容来源于网络,如有侵权,请联系作者删除!