rx.Observable.collect()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.7k)|赞(0)|评价(0)|浏览(131)

本文整理了Java中rx.Observable.collect()方法的一些代码示例,展示了Observable.collect()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.collect()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:collect

Observable.collect介绍

[英]Collects items emitted by the source Observable into a single mutable data structure and returns an Observable that emits this structure.

This is a simplified version of reduce that does not need to return the state on each pass. Backpressure Support: This operator does not support backpressure because by intent it will receive all values and reduce them to a single onNext. Scheduler: collect does not operate by default on a particular Scheduler.
[中]将源可观测项发出的项收集到单个可变数据结构中,并返回发出此结构的可观测项。
这是reduce的简化版本,不需要在每次传递时返回状态。背压支持:此运算符不支持背压,因为它会接收所有值,并将它们减少到单个onNext。调度程序:默认情况下,collect不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxNetty

  1. @Override
  2. public Observable<ByteBuf> call(Observable<ByteBuf> upstream) {
  3. return upstream
  4. .collect(
  5. new Func0<CompositeByteBuf>() {
  6. @Override
  7. public CompositeByteBuf call() {
  8. return Unpooled.compositeBuffer();
  9. }
  10. },
  11. new Action2<CompositeByteBuf, ByteBuf>() {
  12. @Override
  13. public void call(CompositeByteBuf collector, ByteBuf buf) {
  14. long newLength = collector.readableBytes() + buf.readableBytes();
  15. if (newLength <= maxBytes) {
  16. collector.addComponent(true, buf);
  17. } else {
  18. collector.release();
  19. buf.release();
  20. throw new TooMuchDataException("More than " + maxBytes + "B received");
  21. }
  22. }
  23. }
  24. )
  25. .cast(ByteBuf.class);
  26. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Set<String> getConnectionsAsSource( final EntityRef entityRef ) {
  3. Preconditions.checkNotNull(entityRef, "entityRef cannot be null");
  4. final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
  5. final SearchEdgeType searchByEdgeType = createConnectionTypeSearch( entityRef.asId() );
  6. return graphManager.getEdgeTypesFromSource(
  7. searchByEdgeType ).map( edgeName -> getConnectionNameFromEdgeName( edgeName ) )
  8. .collect( () -> new HashSet<String>(), ( r, s ) -> r.add( s ) ).toBlocking().last();
  9. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Set<String> getConnectionsAsTarget( final EntityRef entityRef ) {
  3. Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
  4. final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
  5. final SearchEdgeType searchByEdgeType = createConnectionTypeSearch( entityRef.asId() );
  6. return graphManager.getEdgeTypesToTarget(searchByEdgeType).map(
  7. edgeName -> getConnectionNameFromEdgeName( edgeName ) )
  8. .collect( () -> new HashSet<String>( ), ( r, s ) -> r.add(s) ).toBlocking().last();
  9. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {
  3. final GraphManager gm = managerCache.getGraphManager( applicationScope );
  4. Observable<String> edges =
  5. gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
  6. return edges.collect( () -> new HashSet<String>(), ( edgeSet, edge ) -> {
  7. edgeSet.add( CpNamingUtils.getNameFromEdgeType( edge ) );
  8. } ).toBlocking().last();
  9. }

代码示例来源:origin: apache/usergrid

  1. private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
  2. return observable -> observable
  3. .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
  4. .filter(msg -> !msg.isEmpty())
  5. .doOnNext(indexOperation -> {
  6. asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
  7. });
  8. }

代码示例来源:origin: apache/usergrid

  1. private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
  2. return observable -> observable
  3. .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
  4. .filter(msg -> !msg.isEmpty())
  5. .doOnNext(indexOperation -> {
  6. asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
  7. });
  8. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
  3. GraphManager gm = managerCache.getGraphManager( applicationScope );
  4. String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
  5. if (logger.isTraceEnabled()) {
  6. logger.trace("getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}",
  7. edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() );
  8. }
  9. Observable<Set<String>> types =
  10. gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) )
  11. .collect( () -> new HashSet<>(), ( set, type ) -> set.add( type ) );
  12. return types.toBlocking().last();
  13. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<ResultsPage<T>> call( final Observable<FilterResult<T>> filterResultObservable ) {
  3. final int limit = pipelineContext.getLimit();
  4. return filterResultObservable
  5. .buffer( limit )
  6. .flatMap( buffer
  7. -> Observable
  8. .from( buffer )
  9. .collect(() -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) )
  10. )
  11. .map( resultsPageCollector ->
  12. new ResultsPage(
  13. resultsPageCollector.results,
  14. new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit()
  15. )
  16. );
  17. }

代码示例来源:origin: apache/usergrid

  1. @Test()
  2. public void testSequence(){
  3. ArrayList listReturn = Observable.range(0, 1).flatMap(i -> Observable.empty())
  4. .collect(()->new ArrayList(),(list,i) ->{
  5. list.add(i);
  6. }).toBlocking().lastOrDefault(null);
  7. Assert.assertEquals(listReturn,new ArrayList<Integer>());
  8. }

代码示例来源:origin: apache/usergrid

  1. @Test()
  2. public void testSequence2(){
  3. ArrayList listReturn = Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
  4. .collect(()->new ArrayList(),(list,i) ->{
  5. list.add(i);
  6. }).toBlocking().lastOrDefault(null);
  7. Assert.assertEquals(listReturn,new ArrayList<Integer>());
  8. }

代码示例来源:origin: apache/usergrid

  1. .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
  2. if (logger.isTraceEnabled()) {
  3. logger.trace("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);

代码示例来源:origin: apache/usergrid

  1. }, 10 ).collect( () -> new EntitySetImpl( entityIds.size() ), ( ( entitySet, rows ) -> {
  2. final Iterator<Row<ScopedRowKey<Id>, Boolean>> latestEntityColumns = rows.iterator();

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Map<String, Long> getEachCollectionSize(ApplicationScope applicationScope) {
  3. final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
  4. EntityIndex entityIndex = entityIndexFactory.createEntityIndex(indexLocationStrategy);
  5. GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
  6. Map<String,Long> sumMap = ObservableTimer.time(
  7. graphManager.getEdgeTypesFromSource(new SimpleSearchEdgeType(applicationScope.getApplication(), CpNamingUtils.EDGE_COLL_PREFIX, Optional.<String>absent()))
  8. .collect(() -> new HashMap<String,Long>(), ((map, type) ->
  9. {
  10. SearchEdge edge = CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), type);
  11. final String collectionName = CpNamingUtils.getCollectionNameFromEdgeName(type);
  12. long sumType = entityIndex.getTotalEntitySizeInBytes(edge);
  13. map.put(collectionName,sumType);
  14. })
  15. )
  16. , sumTimer).toBlocking().last();
  17. return sumMap;
  18. }

代码示例来源:origin: apache/usergrid

  1. buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(),
  2. ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> {

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope,
  3. final Entity entity ) {
  4. //bootstrap the lower modules from their caches
  5. final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
  6. final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
  7. final Id entityId = entity.getId();
  8. //we always index in the target scope
  9. final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
  10. //we may have to index we're indexing from source->target here
  11. final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
  12. //do our observable for batching
  13. //try to send a whole batch if we can
  14. final Observable<IndexOperationMessage> batches = sourceEdgesToIndex
  15. .buffer(indexFig.getIndexBatchSize() )
  16. //map into batches based on our buffer size
  17. .flatMap( buffer -> Observable.from( buffer )
  18. //collect results into a single batch
  19. .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
  20. if (logger.isDebugEnabled()) {
  21. logger.debug("adding edge {} to batch for entity {}", indexEdge, entity);
  22. }
  23. final Optional<Set<String>> fieldsToIndex = getFilteredStringObjectMap( indexEdge );
  24. batch.index( indexEdge, entity ,fieldsToIndex);
  25. } )
  26. //return the future from the batch execution
  27. .map( batch -> batch.build() ) );
  28. return ObservableTimer.time( batches, indexTimer );
  29. }

代码示例来源:origin: apache/usergrid

  1. .flatMap( entitySet -> Observable.from( entitySet.getEntities() ) )
  2. .collect( () -> new HashMap<String, UUID>(), ( appMap, entity ) -> {

代码示例来源:origin: apache/usergrid

  1. @Test()
  2. public void testSequence3(){
  3. ArrayList listReturn = Observable.range(0, 2)
  4. .collect(()->new ArrayList(),(list,i) ->{
  5. list.add(i);
  6. }).toBlocking().first();
  7. Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());
  8. }

代码示例来源:origin: apache/usergrid

  1. return edges.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( entityRefSetMap, edge ) -> {
  2. if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
  3. if (logger.isDebugEnabled()) {

代码示例来源:origin: apache/usergrid

  1. .collect( () -> new DataLoadResult(), ( dataloadResult, entitySearchResult ) -> {
  2. if ( entitySearchResult.found ) {
  3. dataloadResult.success();

代码示例来源:origin: apache/usergrid

  1. Observable.from( entities ).collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> {
  2. IndexEdge edge = new IndexEdgeImpl( indexEdge.getNodeId(), indexEdge.getEdgeName(),
  3. SearchEdge.NodeType.SOURCE, edgeCounter.incrementAndGet() );

相关文章

Observable类方法