rx.Observable.lastOrDefault()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.6k)|赞(0)|评价(0)|浏览(139)

本文整理了Java中rx.Observable.lastOrDefault()方法的一些代码示例,展示了Observable.lastOrDefault()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.lastOrDefault()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:lastOrDefault

Observable.lastOrDefault介绍

[英]Returns an Observable that emits only the last item emitted by the source Observable, or a default item if the source Observable completes without emitting any items.

Scheduler: lastOrDefault does not operate by default on a particular Scheduler.
[中]返回仅发射源可观测项发射的最后一项的可观测项,或者如果源可观测项完成而不发射任何项,则返回默认项。
调度程序:默认情况下,lastOrDefault不会在特定调度程序上运行。

代码示例

代码示例来源:origin: apache/usergrid

private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge){
  //now read all older versions of an edge, and remove them.  Finally calling delete
  final SearchByEdge searchByEdge =
    new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
      SearchByEdgeType.Order.DESCENDING, Optional.absent() );
  //load our versions, only retain the most recent one
  gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
    if (logger.isDebugEnabled()) {
      logger.debug("Duplicate edge. Marking edge {} for deletion", edgeToDelete);
    }
    return gm.markEdge(edgeToDelete );
  }).lastOrDefault(null).doOnNext(lastEdge -> {
    //no op if we hit our default
    if (lastEdge == null) {
      return;
    }
    //don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
    gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
  }).toBlocking().lastOrDefault(null);//this should throw an exception
}

代码示例来源:origin: apache/usergrid

@Override
public Entity restoreApplication(UUID applicationId) throws Exception {
  // get the deleted_application_info for the deleted app
  return (Entity) migrateAppInfo( applicationId, CpNamingUtils.DELETED_APPLICATION_INFO,
    CpNamingUtils.APPLICATION_INFOS , CpNamingUtils.APPLICATION_INFO ).lastOrDefault( null )
     .map( appInfo -> {
       //start the index rebuild
       final ReIndexRequestBuilder builder = reIndexService.getBuilder().withApplicationId( applicationId );
       reIndexService.rebuildIndex( builder );
       //load the entity
       final EntityManager managementEm = getEntityManager( getManagementAppId() );
       try {
         return managementEm.get( new SimpleEntityRef( CpNamingUtils.APPLICATION_INFO, applicationId ) );
       }
       catch ( Exception e ) {
         logger.error( "Failed to get entity", e );
         throw new RuntimeException( e );
       }
     } )
    .toBlocking().lastOrDefault(null);
}

代码示例来源:origin: apache/usergrid

return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
           .flatMap(integer -> Observable.from( markedEdges ));
} );

代码示例来源:origin: apache/usergrid

return MathObservable.sumInteger( deleteCounts ).lastOrDefault( 0 ).doOnNext( subTypeUsedCount -> {

代码示例来源:origin: apache/usergrid

} ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );

代码示例来源:origin: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable.range(1, 10).lastOrDefault(10).subscribe(new Action1<Integer>() {
   @Override
   public void call(Integer integer) {
    log(integer);
   }
  });
  Observable.range(1, 10).lastOrDefault(10, new Func1<Integer, Boolean>() {
   @Override
   public Boolean call(Integer integer) {
    return integer % 2 == 0;
   }
  }).subscribe(new Action1<Integer>() {
   @Override
   public void call(Integer integer) {
    log(integer);
   }
  });
 }
});

代码示例来源:origin: com.netflix.rxjava/rxjava-core

/**
 * Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or a default
 * value if it emits no such items.
 * <p>
 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.p.png" alt="">
 *
 * @param defaultValue
 *            a default value to return if this {@code BlockingObservable} emits no matching items
 * @param predicate
 *            a predicate function to evaluate items emitted by this {@code BlockingObservable}
 * @return the last item emitted by this {@code BlockingObservable} that matches the predicate, or the
 *         default value if it emits no matching items
 * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: lastOrDefault()</a>
 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.lastordefault.aspx">MSDN: Observable.LastOrDefault</a>
 */
public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
  return blockForSingle(o.filter(predicate).map(Functions.<T>identity()).lastOrDefault(defaultValue));
}

代码示例来源:origin: com.netflix.rxjava/rxjava-core

/**
 * Returns the last item emitted by this {@code BlockingObservable}, or a default value if it emits no
 * items.
 * <p>
 * <img width="640" height="310" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.png" alt="">
 *
 * @param defaultValue
 *            a default value to return if this {@code BlockingObservable} emits no items
 * @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no
 *         items
 * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: lastOrDefault()</a>
 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.lastordefault.aspx">MSDN: Observable.LastOrDefault</a>
 */
public T lastOrDefault(T defaultValue) {
  return blockForSingle(o.map(Functions.<T>identity()).lastOrDefault(defaultValue));
}

代码示例来源:origin: com.couchbase.client/core-io

@Override
public Observable<LifecycleState> disconnect() {
  LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");
  return Observable
      .from(endpoints)
      .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
        @Override
        public Observable<LifecycleState> call(Endpoint endpoint) {
          LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
              + "Initializing disconnect on Endpoint.");
          return endpoint.disconnect();
        }
      })
      .lastOrDefault(initialState)
      .map(new Func1<LifecycleState, LifecycleState>() {
        @Override
        public LifecycleState call(final LifecycleState state) {
          endpointStates.terminate();
          return state();
        }
      });
}

代码示例来源:origin: couchbase/couchbase-jvm-core

@Override
public Observable<LifecycleState> disconnect() {
  LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");
  return Observable
      .from(endpoints)
      .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
        @Override
        public Observable<LifecycleState> call(Endpoint endpoint) {
          LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
              + "Initializing disconnect on Endpoint.");
          return endpoint.disconnect();
        }
      })
      .lastOrDefault(initialState)
      .map(new Func1<LifecycleState, LifecycleState>() {
        @Override
        public LifecycleState call(final LifecycleState state) {
          endpointStates.terminate();
          return state();
        }
      });
}

代码示例来源:origin: couchbase/couchbase-jvm-core

@Override
  public Observable<LifecycleState> disconnect() {
    disconnect = true;
    LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");

    return Observable
      .from(onDemandEndpoints)
      .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
        @Override
        public Observable<LifecycleState> call(Endpoint endpoint) {
          LOGGER.debug(logIdent(hostname, AbstractOnDemandService.this)
            + "Initializing disconnect on Endpoint.");
          return endpoint.disconnect();
        }
      })
      .lastOrDefault(LifecycleState.IDLE)
      .map(new Func1<LifecycleState, LifecycleState>() {
        @Override
        public LifecycleState call(final LifecycleState state) {
          endpointStates().terminate();
          return state();
        }
      });
  }
}

代码示例来源:origin: com.couchbase.client/core-io

@Override
  public Observable<LifecycleState> disconnect() {
    disconnect = true;
    LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");

    return Observable
      .from(onDemandEndpoints)
      .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
        @Override
        public Observable<LifecycleState> call(Endpoint endpoint) {
          LOGGER.debug(logIdent(hostname, AbstractOnDemandService.this)
            + "Initializing disconnect on Endpoint.");
          return endpoint.disconnect();
        }
      })
      .lastOrDefault(LifecycleState.IDLE)
      .map(new Func1<LifecycleState, LifecycleState>() {
        @Override
        public LifecycleState call(final LifecycleState state) {
          endpointStates().terminate();
          return state();
        }
      });
  }
}

代码示例来源:origin: com.couchbase.client/core-io

.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
  @Override

代码示例来源:origin: couchbase/couchbase-jvm-core

.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
  @Override

代码示例来源:origin: alex-shpak/rx-jersey

@Override
public void dispatch(AsyncContext asyncContext, ResourceMethodDispatcher dispatcher, Object resource, ContainerRequest request) throws ProcessingException {
  final ContainerRequestContext requestContext = containerRequestContext.get();
  Completable intercept = Observable.from(requestInterceptors)
      .concatMap(interceptor -> interceptor.intercept(requestContext))
      .lastOrDefault(null)
      .toCompletable();
  Single<?> dispatch = Single.defer(() -> (Single<?>) dispatcher.dispatch(resource, request).getEntity());
  intercept.andThen(dispatch)
      .map(response -> response == null ? Response.noContent().build() : response)
      .subscribe(asyncContext::resume, asyncContext::resume);
}

代码示例来源:origin: com.couchbase.client/java-client

.lastOrDefault(null)

代码示例来源:origin: org.jboss.hal/hal-flow

/** Executes multiple tasks in order. */
  static <C extends FlowContext> Single<C> series(C context, Collection<? extends Task<C>> tasks) {
    return Observable.from(tasks)
        .flatMapSingle(task -> task.call(context).toSingleDefault(context), false, 1)
        .doOnSubscribe(() -> context.progress.reset(tasks.size()))
        .doOnNext(c -> c.progress.tick())
        .doOnTerminate(context.progress::finish)
        .lastOrDefault(context).toSingle();
  }
}

代码示例来源:origin: com.couchbase.client/core-io

@Override
public Observable<LifecycleState> connect() {
  LOGGER.debug(logIdent(hostname, this) + "Got instructed to connect.");
  if (state() == LifecycleState.CONNECTED || state() == LifecycleState.CONNECTING) {
    LOGGER.debug(logIdent(hostname, this) + "Already connected or connecting, skipping connect.");
    return Observable.just(state());
  }
  for (int i = 0; i < minEndpoints; i++) {
    Endpoint endpoint = createEndpoint();
    endpoints[i] = endpoint;
    endpointStates.register(endpoint, endpoint);
  }
  return Observable
    .from(endpoints)
    .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
      @Override
      public Observable<LifecycleState> call(final Endpoint endpoint) {
        LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
            + "Initializing connect on Endpoint.");
        return endpoint.connect();
      }
    })
    .lastOrDefault(initialState)
    .map(new Func1<LifecycleState, LifecycleState>() {
      @Override
      public LifecycleState call(final LifecycleState state) {
        return state();
      }
    });
}

代码示例来源:origin: couchbase/couchbase-jvm-core

@Override
public Observable<LifecycleState> connect() {
  LOGGER.debug(logIdent(hostname, this) + "Got instructed to connect.");
  if (state() == LifecycleState.CONNECTED || state() == LifecycleState.CONNECTING) {
    LOGGER.debug(logIdent(hostname, this) + "Already connected or connecting, skipping connect.");
    return Observable.just(state());
  }
  for (int i = 0; i < minEndpoints; i++) {
    Endpoint endpoint = createEndpoint();
    endpoints[i] = endpoint;
    endpointStates.register(endpoint, endpoint);
  }
  return Observable
    .from(endpoints)
    .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
      @Override
      public Observable<LifecycleState> call(final Endpoint endpoint) {
        LOGGER.debug(logIdent(hostname, AbstractDynamicService.this)
            + "Initializing connect on Endpoint.");
        return endpoint.connect();
      }
    })
    .lastOrDefault(initialState)
    .map(new Func1<LifecycleState, LifecycleState>() {
      @Override
      public LifecycleState call(final LifecycleState state) {
        return state();
      }
    });
}

代码示例来源:origin: couchbase/couchbase-jvm-core

.lastOrDefault(initialState)
.map(new Func1<LifecycleState, LifecycleState>() {
  @Override

相关文章

Observable类方法