Usage and code examples of the rx.Observable.lastOrDefault() method

x33g5p2x, reposted on 2022-01-25 under Other

This article collects code examples for the Java method rx.Observable.lastOrDefault() to show how it is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven, and are intended as reference material. Details of the method:
Package path: rx.Observable
Class name: Observable
Method name: lastOrDefault

About Observable.lastOrDefault

Returns an Observable that emits only the last item emitted by the source Observable, or a default item if the source Observable completes without emitting any items.

Scheduler: lastOrDefault does not operate by default on a particular Scheduler.
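
To make the contract described above concrete, here is a minimal sketch in plain Java that models it over an Iterable instead of an Observable. It is not RxJava code: the class LastOrDefaultSketch and its static lastOrDefault helper are hypothetical names introduced for illustration, and the model deliberately ignores asynchrony, error signals, and schedulers.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-JDK model of the lastOrDefault contract. NOT RxJava code; all names are hypothetical.
final class LastOrDefaultSketch {

    // Consumes the whole source and returns its final element,
    // or defaultValue if the source produced no elements at all.
    static <T> T lastOrDefault(Iterable<T> source, T defaultValue) {
        T result = defaultValue;
        for (T item : source) {
            result = item; // each new item replaces the previous candidate
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        List<Integer> empty = Collections.emptyList();

        System.out.println(lastOrDefault(numbers, -1)); // prints 3
        System.out.println(lastOrDefault(empty, -1));   // prints -1
    }
}
```

As in the operator itself, the result is only known once the source has finished, which is why the model walks the entire Iterable before returning.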

Code examples

Example source: apache/usergrid

    private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge){

        //now read all older versions of an edge, and remove them. Finally calling delete
        final SearchByEdge searchByEdge =
            new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
                SearchByEdgeType.Order.DESCENDING, Optional.absent() );

        //load our versions, only retain the most recent one
        gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
            if (logger.isDebugEnabled()) {
                logger.debug("Duplicate edge. Marking edge {} for deletion", edgeToDelete);
            }
            return gm.markEdge(edgeToDelete );
        }).lastOrDefault(null).doOnNext(lastEdge -> {
            //no op if we hit our default
            if (lastEdge == null) {
                return;
            }
            //don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
            gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
        }).toBlocking().lastOrDefault(null);//this should throw an exception
    }

Example source: apache/usergrid

    @Override
    public Entity restoreApplication(UUID applicationId) throws Exception {

        // get the deleted_application_info for the deleted app
        return (Entity) migrateAppInfo( applicationId, CpNamingUtils.DELETED_APPLICATION_INFO,
            CpNamingUtils.APPLICATION_INFOS , CpNamingUtils.APPLICATION_INFO ).lastOrDefault( null )
            .map( appInfo -> {

                //start the index rebuild
                final ReIndexRequestBuilder builder = reIndexService.getBuilder().withApplicationId( applicationId );
                reIndexService.rebuildIndex( builder );

                //load the entity
                final EntityManager managementEm = getEntityManager( getManagementAppId() );
                try {
                    return managementEm.get( new SimpleEntityRef( CpNamingUtils.APPLICATION_INFO, applicationId ) );
                }
                catch ( Exception e ) {
                    logger.error( "Failed to get entity", e );
                    throw new RuntimeException( e );
                }
            } )
            .toBlocking().lastOrDefault(null);
    }

Example source: apache/usergrid

    return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
        .flatMap(integer -> Observable.from( markedEdges ));
    } );

Example source: apache/usergrid

    return MathObservable.sumInteger( deleteCounts ).lastOrDefault( 0 ).doOnNext( subTypeUsedCount -> {

Example source: apache/usergrid

    } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );

Example source: leeowenowen/rxjava-examples

    @Override
    public void run() {
        Observable.range(1, 10).lastOrDefault(10).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer);
            }
        });
        Observable.range(1, 10).lastOrDefault(10, new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer % 2 == 0;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer);
            }
        });
    }
    });
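
The two-argument overload used in the example above only considers items that match the predicate. The rxjava-core source quoted in this article implements the blocking variant as filter(predicate) followed by lastOrDefault(defaultValue). The sketch below models that combination with plain JDK types; it is not RxJava code, and the class LastMatchingSketch and its helper are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-JDK model of lastOrDefault(defaultValue, predicate). NOT RxJava code; all names are hypothetical.
final class LastMatchingSketch {

    // Returns the last element that satisfies the predicate,
    // or defaultValue if no element of the source matches.
    static <T> T lastOrDefault(Iterable<T> source, T defaultValue, Predicate<? super T> predicate) {
        T result = defaultValue;
        for (T item : source) {
            if (predicate.test(item)) {
                result = item; // non-matching items are skipped, like a filter step
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> odds = Arrays.asList(1, 3, 5);

        System.out.println(lastOrDefault(numbers, -1, n -> n % 2 == 0)); // prints 4
        System.out.println(lastOrDefault(odds, -1, n -> n % 2 == 0));    // prints -1
    }
}
```

Note that a non-empty source still yields the default when none of its items pass the predicate, as the second call shows.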

Example source: com.netflix.rxjava/rxjava-core

    /**
     * Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or a default
     * value if it emits no such items.
     * <p>
     * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.p.png" alt="">
     *
     * @param defaultValue
     *            a default value to return if this {@code BlockingObservable} emits no matching items
     * @param predicate
     *            a predicate function to evaluate items emitted by this {@code BlockingObservable}
     * @return the last item emitted by this {@code BlockingObservable} that matches the predicate, or the
     *         default value if it emits no matching items
     * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: lastOrDefault()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.lastordefault.aspx">MSDN: Observable.LastOrDefault</a>
     */
    public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
        return blockForSingle(o.filter(predicate).map(Functions.<T>identity()).lastOrDefault(defaultValue));
    }

Example source: com.netflix.rxjava/rxjava-core

    /**
     * Returns the last item emitted by this {@code BlockingObservable}, or a default value if it emits no
     * items.
     * <p>
     * <img width="640" height="310" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.png" alt="">
     *
     * @param defaultValue
     *            a default value to return if this {@code BlockingObservable} emits no items
     * @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no
     *         items
     * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: lastOrDefault()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.lastordefault.aspx">MSDN: Observable.LastOrDefault</a>
     */
    public T lastOrDefault(T defaultValue) {
        return blockForSingle(o.map(Functions.<T>identity()).lastOrDefault(defaultValue));
    }

Example source: com.couchbase.client/core-io (identical code in couchbase/couchbase-jvm-core)

    @Override
    public Observable<LifecycleState> disconnect() {
        LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");
        return Observable
            .from(endpoints)
            .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
                @Override
                public Observable<LifecycleState> call(Endpoint endpoint) {
                    LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
                        + "Initializing disconnect on Endpoint.");
                    return endpoint.disconnect();
                }
            })
            .lastOrDefault(initialState)
            .map(new Func1<LifecycleState, LifecycleState>() {
                @Override
                public LifecycleState call(final LifecycleState state) {
                    endpointStates.terminate();
                    return state();
                }
            });
    }

Example source: couchbase/couchbase-jvm-core (identical code in com.couchbase.client/core-io)

    @Override
    public Observable<LifecycleState> disconnect() {
        disconnect = true;
        LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");
        return Observable
            .from(onDemandEndpoints)
            .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
                @Override
                public Observable<LifecycleState> call(Endpoint endpoint) {
                    LOGGER.debug(logIdent(hostname, AbstractOnDemandService.this)
                        + "Initializing disconnect on Endpoint.");
                    return endpoint.disconnect();
                }
            })
            .lastOrDefault(LifecycleState.IDLE)
            .map(new Func1<LifecycleState, LifecycleState>() {
                @Override
                public LifecycleState call(final LifecycleState state) {
                    endpointStates().terminate();
                    return state();
                }
            });
        }
    }

Example source: com.couchbase.client/core-io and couchbase/couchbase-jvm-core

    .lastOrDefault(initialState)
    .map(new Func1<LifecycleState, LifecycleState>() {
        @Override

Example source: alex-shpak/rx-jersey

    @Override
    public void dispatch(AsyncContext asyncContext, ResourceMethodDispatcher dispatcher, Object resource, ContainerRequest request) throws ProcessingException {
        final ContainerRequestContext requestContext = containerRequestContext.get();

        Completable intercept = Observable.from(requestInterceptors)
            .concatMap(interceptor -> interceptor.intercept(requestContext))
            .lastOrDefault(null)
            .toCompletable();

        Single<?> dispatch = Single.defer(() -> (Single<?>) dispatcher.dispatch(resource, request).getEntity());

        intercept.andThen(dispatch)
            .map(response -> response == null ? Response.noContent().build() : response)
            .subscribe(asyncContext::resume, asyncContext::resume);
    }

Example source: com.couchbase.client/java-client

    .lastOrDefault(null)

Example source: org.jboss.hal/hal-flow

    /** Executes multiple tasks in order. */
    static <C extends FlowContext> Single<C> series(C context, Collection<? extends Task<C>> tasks) {
        return Observable.from(tasks)
            .flatMapSingle(task -> task.call(context).toSingleDefault(context), false, 1)
            .doOnSubscribe(() -> context.progress.reset(tasks.size()))
            .doOnNext(c -> c.progress.tick())
            .doOnTerminate(context.progress::finish)
            .lastOrDefault(context).toSingle();
        }
    }

Example source: com.couchbase.client/core-io (identical code in couchbase/couchbase-jvm-core)

    @Override
    public Observable<LifecycleState> connect() {
        LOGGER.debug(logIdent(hostname, this) + "Got instructed to connect.");
        if (state() == LifecycleState.CONNECTED || state() == LifecycleState.CONNECTING) {
            LOGGER.debug(logIdent(hostname, this) + "Already connected or connecting, skipping connect.");
            return Observable.just(state());
        }

        for (int i = 0; i < minEndpoints; i++) {
            Endpoint endpoint = createEndpoint();
            endpoints[i] = endpoint;
            endpointStates.register(endpoint, endpoint);
        }

        return Observable
            .from(endpoints)
            .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
                @Override
                public Observable<LifecycleState> call(final Endpoint endpoint) {
                    LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
                        + "Initializing connect on Endpoint.");
                    return endpoint.connect();
                }
            })
            .lastOrDefault(initialState)
            .map(new Func1<LifecycleState, LifecycleState>() {
                @Override
                public LifecycleState call(final LifecycleState state) {
                    return state();
                }
            });
    }
