[英]Returns an Observable that emits a single item, a list composed of all the items emitted by the source Observable.

Normally, an Observable that returns multiple items will do so by invoking its Observer's onNext method for each such item. You can change this behavior, instructing the Observable to compose a list of all of these items and then to invoke the Observer's onNext function once, passing it the entire list, by calling the Observable's toList method prior to calling its subscribe method.

Be careful not to use this operator on Observables that emit infinite or very large numbers of items, because every item is buffered in memory and you do not have the option to unsubscribe. Backpressure Support: This operator does not support backpressure because, by design, it requests and buffers everything. Scheduler: toList does not operate by default on a particular Scheduler.


代码示例来源:origin: PipelineAI/pipeline

  public Observable<List<CachedValuesHistogram>> call(Observable<CachedValuesHistogram> windowOf2) {
    return windowOf2.toList();

代码示例来源:origin: grandcentrix/tray

private List<String> getNiceString(final Collection<TrayItem> items) {
  return Observable.from(items)
      .map(new Func1<TrayItem, String>() {
        public String call(final TrayItem trayItem) {
          return "key: '" + trayItem.key() + "' value '" + trayItem.value() + "'";

代码示例来源:origin: spring-projects/spring-framework

public Observable<Void> createWithObservable(@RequestBody Observable<Person> observable) {
  return observable.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());

代码示例来源:origin: jooby-project/jooby

static Route.Mapper mapper() {
 return Route.Mapper.create("mongo-rx", v -> {
  if (v instanceof FindObservable) {
   return ((FindObservable) v).toObservable().toList();
  } else if (v instanceof ListCollectionsObservable) {
   return ((ListCollectionsObservable) v).toObservable().toList();
  } else if (v instanceof ListDatabasesObservable) {
   return ((ListDatabasesObservable) v).toObservable().toList();
  } else if (v instanceof AggregateObservable) {
   return ((AggregateObservable) v).toObservable().toList();
  } else if (v instanceof DistinctObservable) {
   return ((DistinctObservable) v).toObservable().toList();
  } else if (v instanceof MapReduceObservable) {
   return ((MapReduceObservable) v).toObservable().toList();
  } else if (v instanceof MongoObservable) {
   return ((MongoObservable) v).toObservable();
  return v;

代码示例来源:origin: Rukey7/MvpApp

public void getData(boolean isRefresh) {
      .filter(new Func1<VideoInfo, Boolean>() {
        public Boolean call(VideoInfo info) {
          // 判断是否存于下载中
          return (info.getDownloadStatus() != DownloadStatus.NORMAL &&
              info.getDownloadStatus() != DownloadStatus.COMPLETE);
      .subscribe(new Action1<List<VideoInfo>>() {
        public void call(List<VideoInfo> videoList) {
          if (ListUtils.isEmpty(videoList)) {
          } else {

代码示例来源:origin: jooby-project/jooby

public <T> Observable<AsyncViewQueryResult<T>> query(final ViewQuery query) {
 return bucket.query(query)
   .map(result -> {
    Observable<List<T>> rows = result.rows()
      .flatMap(r -> r.document()
        .map(doc -> {
         EntityDocument<T> entity = converter.toEntity(doc, null);
         return entity.content();
    return new AsyncViewQueryResult(result.totalRows(), rows);

代码示例来源:origin: apache/usergrid

public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
  final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
  final EntityCollectionManager entityCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( applicationScope );
  //it's more efficient to make 1 network hop to get everything, then drop our results if required
  final Observable<FilterResult<Entity>> entityObservable =
    filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
      if (logger.isTraceEnabled()) {
        logger.trace("Attempting to batch load ids {}", bufferedIds);
      final Observable<EntitySet> entitySetObservable =
        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
             .flatMap( ids -> entityCollectionManager.load( ids ) );
      //now we have a collection, validate our candidate set is correct.
      GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
      return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
        entitySet, bufferedIds, readRepairFig ) )
                   .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
          entityCollector -> Observable.from( entityCollector.getResults() ) );
    } );
  return entityObservable;

代码示例来源:origin: spring-projects/spring-framework

public void publisherToRxObservable() {
  List<Integer> sequence = Arrays.asList(1, 2, 3);
  Publisher<Integer> source = Flowable.fromIterable(sequence);
  Object target = getAdapter(rx.Observable.class).fromPublisher(source);
  assertTrue(target instanceof rx.Observable);
  assertEquals(sequence, ((rx.Observable<?>) target).toList().toBlocking().first());

代码示例来源:origin: jooby-project/jooby

public <T> Observable<List<T>> query(final N1qlQuery query) {
 return bucket.query(query)
   .flatMap(aqr -> Observable.zip(aqr.rows().toList(),
     (rows, errors, finalSuccess) -> {
      if (!finalSuccess) {
       throw new QueryExecutionException(
         "execution of query resulted in exception: ",
         Try.apply(() -> errors.get(0)).orElse(null));
      List<T> value = new ArrayList<>();
      for (AsyncN1qlQueryRow row : rows) {
       try {
        T v = converter.fromBytes(row.byteValue());
       } catch (IOException ex) {
        throw new QueryExecutionException(
          "execution of query resulted in exception", null, ex);
      return value;

代码示例来源:origin: spring-projects/spring-framework

public void observableTestBean() throws Exception {
  String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
  ResolvableType type = forClassWithGenerics(Observable.class, TestBean.class);
  MethodParameter param = this.testMethod.arg(type);
  Observable<?> observable = resolveValue(param, body);
  assertEquals(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")),

代码示例来源:origin: PipelineAI/pipeline

 * Run the command via {@link com.netflix.hystrix.HystrixCommand#observe()}, immediately block and then assert
 * @param command command to run
 * @param assertion assertions to check
 * @param isSuccess should the command succeed?
protected void assertBlockingObserve(C command, Action1<C> assertion, boolean isSuccess) {
  System.out.println("Running command.observe(), immediately blocking and then running assertions...");
  if (isSuccess) {
    try {
    } catch (Exception ex) {
      throw new RuntimeException(ex);
  } else {
    try {
      fail("Expected a command failure!");
    } catch (Exception ex) {
      System.out.println("Received expected ex : " + ex);

代码示例来源:origin: PipelineAI/pipeline

} catch (Exception ex) {
  throw new RuntimeException(ex);
  fail("Expected a command failure!");
} catch (Exception ex) {

代码示例来源:origin: apache/usergrid

public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
   * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
   * objects
  final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
  final EntityCollectionManager entityCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( applicationScope );
  final EntityIndex applicationIndex =
  final Observable<FilterResult<Id>> searchIdSetObservable =
    filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> {
        //flatten to a list of ids to load
        final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
          candidate -> candidate.getValue().getCandidateResult().getId() ).toList();
        //load the ids
        final Observable<VersionSet> versionSetObservable =
          candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
        //now we have a collection, validate our candidate set is correct.
        return versionSetObservable.map(
          entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
            candidateResults, indexProducer ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
          entityCollector -> Observable.from( entityCollector.collectResults() ) );
      } );
  return searchIdSetObservable;

代码示例来源:origin: apache/usergrid

private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
                           final int edgeCount ) {
  final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
    //create our connection edge.
    final Id connectingId = createId( "connecting" );
    final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
    return graphManager.writeEdge( connectionEdge ).subscribeOn( Schedulers.io() );
  }, 20).toList().toBlocking().last();
  assertEquals( "All edges saved", edgeCount, connectionSearchEdges.size() );
  return connectionSearchEdges;

代码示例来源:origin: apache/usergrid

public void testSequence3(){
  ArrayList listReturn =  Observable.range(0, 2)
    .collect(()->new ArrayList(),(list,i) ->{
  Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());

代码示例来源:origin: PipelineAI/pipeline

List<String> results = overall.toList().toBlocking().first(); //wait for all commands to complete

代码示例来源:origin: PipelineAI/pipeline

final List<Boolean> blockingList = Observable.merge(results).toList().toBlocking().single();

代码示例来源:origin: apache/usergrid

public void testSingleConnection() {
  final ApplicationScope applicationScope = new ApplicationScopeImpl( new SimpleId( "application" ) );
  final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
  //now write a single connection
  final Id source = new SimpleId( "source" );
  //add to a collection
  final String collectionName = "testCollection";
  final Edge collectionEdge = CpNamingUtils.createCollectionEdge( applicationScope.getApplication(), collectionName, source );
  final Edge writtenCollection = gm.writeEdge( collectionEdge ).toBlocking().last();
  assertNotNull("Collection edge written", writtenCollection);
  final Id target = new SimpleId( "target" );
  final String connectionType = "testConnection";
  final Edge connectionEdge = CpNamingUtils.createConnectionEdge( source, connectionType, target );
  final Edge writtenConnection = gm.writeEdge( connectionEdge ).toBlocking().last();
  //now run the cleanup
  final int count =
    connectionService.deDupeConnections( Observable.just( applicationScope ) ).count().toBlocking().last();
  assertEquals( "No edges deleted", 0, count );
  //now ensure we can read the edge.
  final SearchByEdge simpleSearchByEdge =
    new SimpleSearchByEdge( source, connectionEdge.getType(), target, Long.MAX_VALUE,
      SearchByEdgeType.Order.DESCENDING, Optional.absent() );
  final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
  assertEquals( 1, edges.size() );
  assertEquals( writtenConnection, edges.get( 0 ) );

代码示例来源:origin: PipelineAI/pipeline

 * Test support of multiple onNext events.
public void testExecutionSuccessWithMultipleEvents() {
  try {
    TestCommandWithMultipleValues command = new TestCommandWithMultipleValues();
    assertEquals(Arrays.asList(true, false, true), command.observe().toList().toBlocking().single());
    assertEquals(null, command.getFailedExecutionException());
    assertTrue(command.getExecutionTimeInMilliseconds() > -1);
    assertCommandExecutionEvents(command, HystrixEventType.EMIT, HystrixEventType.EMIT, HystrixEventType.EMIT, HystrixEventType.SUCCESS);
    assertEquals(0, command.metrics.getCurrentConcurrentExecutionCount());
    // semaphore isolated
  } catch (Exception e) {
    fail("We received an exception.");

代码示例来源:origin: PipelineAI/pipeline

Observable.merge(result1, result2).toList().toBlocking().single(); //await the 2 latent commands

