Using the rx.Observable.buffer() method, with code examples

Reposted by x33g5p2x on 2022-01-25, in Other

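This article collects code examples for the Java method rx.Observable.buffer() and shows how Observable.buffer() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.buffer() method:
Package path: rx.Observable
Class name: Observable
Method name: buffer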

About Observable.buffer

Returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits connected, non-overlapping buffers, each containing count items. When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.

Scheduler: This version of buffer does not operate by default on a particular Scheduler.

This description applies to the buffer(int count) overload, where count is the buffer size. Several examples in this article use other overloads, such as the time-based ones.
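The count-based behavior described above can be illustrated without RxJava at all. The following is a minimal plain-Java sketch that models the semantics on an in-memory list; it is not RxJava's implementation, and the class name BufferCountSketch and the helper bufferByCount are hypothetical names chosen for illustration. It shows the two points from the description: buffers do not overlap, and the last (possibly partial) buffer is emitted once the source is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BufferCountSketch {

    // Plain-Java model of count-based buffering (hypothetical helper, not an RxJava API).
    static <T> List<List<T>> bufferByCount(List<T> source, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> buffers = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            current.add(item);
            // A full buffer is emitted and a fresh one started, so buffers never overlap.
            if (current.size() == count) {
                buffers.add(current);
                current = new ArrayList<>();
            }
        }
        // The source has "completed": emit whatever is left in the current buffer.
        if (!current.isEmpty()) {
            buffers.add(current);
        }
        return buffers;
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(bufferByCount(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
    }
}
```

With RxJava 1.x on the classpath, the corresponding call would be along the lines of Observable.range(1, 7).buffer(3), which emits each list to the subscriber as soon as it fills instead of returning them all at once.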

Code examples

Code example source: apache/usergrid

@Override
public Observable<MarkedEdge> loadEdgesToTargetByType( final SearchByIdType search ) {
  final Observable<MarkedEdge> edges =
    Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) {
      @Override
      protected Iterator<MarkedEdge> getIterator() {
        return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
      }
    } ).buffer( graphFig.getScanPageSize() )
         .compose( new EdgeBufferFilter(  search.filterMarked() ) );
  return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer );
}

Code example source: apache/usergrid

@Override
public Observable<MarkedEdge> loadEdgesToTarget( final SearchByEdgeType search ) {
  final Observable<MarkedEdge> edges =
    Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) {
      @Override
      protected Iterator<MarkedEdge> getIterator() {
        return storageEdgeSerialization.getEdgesToTarget( scope, search );
      }
    } ).buffer( graphFig.getScanPageSize() )
         .compose( new EdgeBufferFilter( search.filterMarked() ) );
  return ObservableTimer.time( edges, loadEdgesToTargetTimer );
}

Code example source: apache/usergrid

@Override
public Observable<MarkedEdge> loadEdgesFromSourceByType( final SearchByIdType search ) {
  final Observable<MarkedEdge> edges =
    Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) {
      @Override
      protected Iterator<MarkedEdge> getIterator() {
        return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
      }
    } ).buffer( graphFig.getScanPageSize() )
         .compose( new EdgeBufferFilter( search.filterMarked() ) );
  return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer );
}

Code example source: apache/usergrid

@Override
public Observable<MarkedEdge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
  final Observable<MarkedEdge> edges =
    Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
      @Override
      protected Iterator<MarkedEdge> getIterator() {
        return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
      }
    } ).buffer( graphFig.getScanPageSize() )
         .compose( new EdgeBufferFilter( searchByEdge.filterMarked() ) );
  return ObservableTimer.time( edges, loadEdgesVersionsTimer );
}

Code example source: apache/usergrid

@Override
public Observable<MarkedEdge> loadEdgesFromSource( final SearchByEdgeType search ) {
  final Observable<MarkedEdge> edges =
    Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) {
      @Override
      protected Iterator<MarkedEdge> getIterator() {
        return storageEdgeSerialization.getEdgesFromSource( scope, search );
      }
    } ).buffer( graphFig.getScanPageSize() )
         .compose( new EdgeBufferFilter( search.filterMarked() ) );
  return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
}

Code example source: apache/usergrid

@Override
public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
  final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
  final EntityCollectionManager entityCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( applicationScope );
  //it's more efficient to make 1 network hop to get everything, then drop our results if required
  final Observable<FilterResult<Entity>> entityObservable =
    filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
      if (logger.isTraceEnabled()) {
        logger.trace("Attempting to batch load ids {}", bufferedIds);
      }
      final Observable<EntitySet> entitySetObservable =
        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
             .flatMap( ids -> entityCollectionManager.load( ids ) );
      //now we have a collection, validate our candidate set is correct.
      GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
      return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
        entitySet, bufferedIds, readRepairFig ) )
                   .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
          entityCollector -> Observable.from( entityCollector.getResults() ) );
    } );
  return entityObservable;
}

Code example source: apache/usergrid

@Override
public Observable<ResultsPage<T>> call( final Observable<FilterResult<T>> filterResultObservable ) {
  final int limit = pipelineContext.getLimit();
  return filterResultObservable
    .buffer( limit )
    .flatMap( buffer
      -> Observable
        .from( buffer )
        .collect(() -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) )
    )
    .map( resultsPageCollector ->
      new ResultsPage(
        resultsPageCollector.results,
        new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit()
      )
    );
}

Code example source: apache/usergrid

.buffer( indexProcessorFig.getCollectionDeleteBufferSize())
.doOnNext( edgeScopes -> {
  logger.info("Sending batch of {} to be deleted.", edgeScopes.size());

Code example source: apache/usergrid

return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode, true).buffer( 1000 )
                .doOnNext( edges -> {
                    final MutationBatch batch = keyspace.prepareMutationBatch();

Code example source: apache/usergrid

.buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize())
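The call above uses the buffer(timespan, unit, count) overload, which closes a buffer when either the time span elapses or the buffer reaches count items, whichever happens first. The following plain-Java sketch models that rule on a list of timestamped items. It is a simplified model and not RxJava's implementation: the names BufferTimeOrCountSketch, Timed, and bufferByTimeOrCount are hypothetical, the arrival times are made up, time windows are assumed to be fixed and back-to-back (not restarted when a buffer fills), and windows with no items are dropped for brevity, which the real operator may not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BufferTimeOrCountSketch {

    // An item paired with the (made-up) millisecond at which it arrives.
    static final class Timed {
        final long atMillis;
        final String value;

        Timed(long atMillis, String value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    // Hypothetical helper: events must be sorted by arrival time.
    static List<List<String>> bufferByTimeOrCount(List<Timed> events, long timespanMillis, int count) {
        List<List<String>> buffers = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long windowEnd = timespanMillis;
        for (Timed event : events) {
            // Close every time window that ended before this item arrived.
            while (event.atMillis >= windowEnd) {
                if (!current.isEmpty()) {
                    buffers.add(current);
                    current = new ArrayList<>();
                }
                windowEnd += timespanMillis;
            }
            current.add(event.value);
            // The size limit can close a buffer before its time window ends.
            if (current.size() == count) {
                buffers.add(current);
                current = new ArrayList<>();
            }
        }
        // End of input: flush the remaining items.
        if (!current.isEmpty()) {
            buffers.add(current);
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Timed> events = Arrays.asList(
            new Timed(10, "a"), new Timed(20, "b"), new Timed(30, "c"),
            new Timed(300, "d"), new Timed(600, "e"));
        // prints [[a, b], [c], [d], [e]]
        System.out.println(bufferByTimeOrCount(events, 250, 2));
    }
}
```

In the sample run, the first buffer closes on size (two items within the first 250 ms), while the following ones close because a time window ended.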

Code example source: apache/usergrid

@Test()
public void testSequence2(){
  ArrayList listReturn =  Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
    .collect(()->new ArrayList(),(list,i) ->{
      list.add(i);
    }).toBlocking().lastOrDefault(null);
  Assert.assertEquals(listReturn,new ArrayList<Integer>());
}

Code example source: apache/usergrid

@Override
public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
  /**
   * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
   * objects
   */
  final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
  final EntityCollectionManager entityCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( applicationScope );
  final EntityIndex applicationIndex =
    entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
  final Observable<FilterResult<Id>> searchIdSetObservable =
    filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> {
        //flatten to a list of ids to load
        final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
          candidate -> candidate.getValue().getCandidateResult().getId() ).toList();
        //load the ids
        final Observable<VersionSet> versionSetObservable =
          candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
        //now we have a collection, validate our candidate set is correct.
        return versionSetObservable.map(
          entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
            candidateResults, indexProducer ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
          entityCollector -> Observable.from( entityCollector.collectResults() ) );
      } );
  return searchIdSetObservable;
}

Code example source: apache/usergrid

collectionIoEventObservable.buffer( serializationFig.getBufferSize() ).flatMap(
  buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(),
    ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> {

Code example source: apache/usergrid

@Override
public Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope,
                           final Entity entity ) {
  //bootstrap the lower modules from their caches
  final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
  final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
  final Id entityId = entity.getId();
  //we always index in the target scope
  final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
  //we may have to index; we're indexing from source->target here
  final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
  //do our observable for batching
  //try to send a whole batch if we can
  final Observable<IndexOperationMessage>  batches =  sourceEdgesToIndex
    .buffer(indexFig.getIndexBatchSize() )
    //map into batches based on our buffer size
    .flatMap( buffer -> Observable.from( buffer )
      //collect results into a single batch
      .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
        if (logger.isDebugEnabled()) {
          logger.debug("adding edge {} to batch for entity {}", indexEdge, entity);
        }
        final Optional<Set<String>> fieldsToIndex = getFilteredStringObjectMap( indexEdge );
        batch.index( indexEdge, entity ,fieldsToIndex);
      } )
        //return the future from the batch execution
      .map( batch -> batch.build() ) );
  return ObservableTimer.time( batches, indexTimer );
}

Code example source: apache/usergrid

} ).buffer( 100 ).flatMap( entityIds -> {
  return ecm.load( entityIds );
} )

Code example source: apache/usergrid

.buffer( serializationFig.getBufferSize() )

Code example source: PipelineAI/pipeline

@Test
public void testSemaphoreIsolatedResponseFromCache() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<List<HystrixCommandCompletion>> commandListSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().buffer(500, TimeUnit.MILLISECONDS).take(1)
      .doOnNext(new Action1<List<HystrixCommandCompletion>>() {
        @Override
        public void call(List<HystrixCommandCompletion> hystrixCommandCompletions) {
          System.out.println("LIST : " + hystrixCommandCompletions);
          assertEquals(3, hystrixCommandCompletions.size());
        }
      })
      .subscribe(commandListSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS);
  ExecutionResult cache1 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
  ExecutionResult cache2 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  writeToStream.executionDone(cache1, commandKey, threadPoolKey);
  writeToStream.executionDone(cache2, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

Code example source: apache/usergrid

int returned = Observable.merge( input1, input2 ).buffer( 1000 )
             .flatMap( new Func1<List<Integer>, Observable<Integer>>() {
                   @Override

Code example source: PipelineAI/pipeline

@Test
public void testThreadIsolatedResponseFromCache() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<List<HystrixCommandCompletion>> commandListSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().buffer(500, TimeUnit.MILLISECONDS).take(1)
      .doOnNext(new Action1<List<HystrixCommandCompletion>>() {
        @Override
        public void call(List<HystrixCommandCompletion> hystrixCommandCompletions) {
          System.out.println("LIST : " + hystrixCommandCompletions);
          assertEquals(3, hystrixCommandCompletions.size());
        }
      })
      .subscribe(commandListSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
  ExecutionResult cache1 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
  ExecutionResult cache2 = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  writeToStream.executionDone(cache1, commandKey, threadPoolKey);
  writeToStream.executionDone(cache2, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

Code example source: apache/usergrid

generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<MarkedEdge>>() {
  @Override
  public void onCompleted() {
