本文整理了Java中rx.Observable.collect()
方法的一些代码示例,展示了Observable.collect()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.collect()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:collect
[英]Collects items emitted by the source Observable into a single mutable data structure and returns an Observable that emits this structure.
This is a simplified version of reduce that does not need to return the state on each pass. Backpressure Support: This operator does not support backpressure because by intent it will receive all values and reduce them to a single onNext. Scheduler: collect does not operate by default on a particular Scheduler.
[中]将源可观测项发出的项收集到单个可变数据结构中,并返回发出此结构的可观测项。
这是reduce的简化版本,不需要在每次传递时返回状态。背压支持:此运算符不支持背压,因为它会接收所有值,并将它们减少到单个onNext。调度程序:默认情况下,collect不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxNetty
@Override
public Observable<ByteBuf> call(Observable<ByteBuf> upstream) {
return upstream
.collect(
new Func0<CompositeByteBuf>() {
@Override
public CompositeByteBuf call() {
return Unpooled.compositeBuffer();
}
},
new Action2<CompositeByteBuf, ByteBuf>() {
@Override
public void call(CompositeByteBuf collector, ByteBuf buf) {
long newLength = collector.readableBytes() + buf.readableBytes();
if (newLength <= maxBytes) {
collector.addComponent(true, buf);
} else {
collector.release();
buf.release();
throw new TooMuchDataException("More than " + maxBytes + "B received");
}
}
}
)
.cast(ByteBuf.class);
}
代码示例来源:origin: apache/usergrid
@Override
public Set<String> getConnectionsAsSource( final EntityRef entityRef ) {
Preconditions.checkNotNull(entityRef, "entityRef cannot be null");
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
final SearchEdgeType searchByEdgeType = createConnectionTypeSearch( entityRef.asId() );
return graphManager.getEdgeTypesFromSource(
searchByEdgeType ).map( edgeName -> getConnectionNameFromEdgeName( edgeName ) )
.collect( () -> new HashSet<String>(), ( r, s ) -> r.add( s ) ).toBlocking().last();
}
代码示例来源:origin: apache/usergrid
@Override
public Set<String> getConnectionsAsTarget( final EntityRef entityRef ) {
Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
final SearchEdgeType searchByEdgeType = createConnectionTypeSearch( entityRef.asId() );
return graphManager.getEdgeTypesToTarget(searchByEdgeType).map(
edgeName -> getConnectionNameFromEdgeName( edgeName ) )
.collect( () -> new HashSet<String>( ), ( r, s ) -> r.add(s) ).toBlocking().last();
}
代码示例来源:origin: apache/usergrid
@Override
public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {
final GraphManager gm = managerCache.getGraphManager( applicationScope );
Observable<String> edges =
gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
return edges.collect( () -> new HashSet<String>(), ( edgeSet, edge ) -> {
edgeSet.add( CpNamingUtils.getNameFromEdgeType( edge ) );
} ).toBlocking().last();
}
代码示例来源:origin: apache/usergrid
private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
return observable -> observable
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
});
}
代码示例来源:origin: apache/usergrid
private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
return observable -> observable
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
});
}
代码示例来源:origin: apache/usergrid
@Override
public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
GraphManager gm = managerCache.getGraphManager( applicationScope );
String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
if (logger.isTraceEnabled()) {
logger.trace("getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}",
edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() );
}
Observable<Set<String>> types =
gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) )
.collect( () -> new HashSet<>(), ( set, type ) -> set.add( type ) );
return types.toBlocking().last();
}
代码示例来源:origin: apache/usergrid
@Override
public Observable<ResultsPage<T>> call( final Observable<FilterResult<T>> filterResultObservable ) {
final int limit = pipelineContext.getLimit();
return filterResultObservable
.buffer( limit )
.flatMap( buffer
-> Observable
.from( buffer )
.collect(() -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) )
)
.map( resultsPageCollector ->
new ResultsPage(
resultsPageCollector.results,
new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit()
)
);
}
代码示例来源:origin: apache/usergrid
@Test()
public void testSequence(){
ArrayList listReturn = Observable.range(0, 1).flatMap(i -> Observable.empty())
.collect(()->new ArrayList(),(list,i) ->{
list.add(i);
}).toBlocking().lastOrDefault(null);
Assert.assertEquals(listReturn,new ArrayList<Integer>());
}
代码示例来源:origin: apache/usergrid
@Test()
public void testSequence2(){
ArrayList listReturn = Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
.collect(()->new ArrayList(),(list,i) ->{
list.add(i);
}).toBlocking().lastOrDefault(null);
Assert.assertEquals(listReturn,new ArrayList<Integer>());
}
代码示例来源:origin: apache/usergrid
.collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
if (logger.isTraceEnabled()) {
logger.trace("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
代码示例来源:origin: apache/usergrid
}, 10 ).collect( () -> new EntitySetImpl( entityIds.size() ), ( ( entitySet, rows ) -> {
final Iterator<Row<ScopedRowKey<Id>, Boolean>> latestEntityColumns = rows.iterator();
代码示例来源:origin: apache/usergrid
@Override
public Map<String, Long> getEachCollectionSize(ApplicationScope applicationScope) {
final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
EntityIndex entityIndex = entityIndexFactory.createEntityIndex(indexLocationStrategy);
GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
Map<String,Long> sumMap = ObservableTimer.time(
graphManager.getEdgeTypesFromSource(new SimpleSearchEdgeType(applicationScope.getApplication(), CpNamingUtils.EDGE_COLL_PREFIX, Optional.<String>absent()))
.collect(() -> new HashMap<String,Long>(), ((map, type) ->
{
SearchEdge edge = CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), type);
final String collectionName = CpNamingUtils.getCollectionNameFromEdgeName(type);
long sumType = entityIndex.getTotalEntitySizeInBytes(edge);
map.put(collectionName,sumType);
})
)
, sumTimer).toBlocking().last();
return sumMap;
}
代码示例来源:origin: apache/usergrid
buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(),
( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> {
代码示例来源:origin: apache/usergrid
@Override
public Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope,
final Entity entity ) {
//bootstrap the lower modules from their caches
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
final Id entityId = entity.getId();
//we always index in the target scope
final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
//we may have to index we're indexing from source->target here
final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
//do our observable for batching
//try to send a whole batch if we can
final Observable<IndexOperationMessage> batches = sourceEdgesToIndex
.buffer(indexFig.getIndexBatchSize() )
//map into batches based on our buffer size
.flatMap( buffer -> Observable.from( buffer )
//collect results into a single batch
.collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
if (logger.isDebugEnabled()) {
logger.debug("adding edge {} to batch for entity {}", indexEdge, entity);
}
final Optional<Set<String>> fieldsToIndex = getFilteredStringObjectMap( indexEdge );
batch.index( indexEdge, entity ,fieldsToIndex);
} )
//return the future from the batch execution
.map( batch -> batch.build() ) );
return ObservableTimer.time( batches, indexTimer );
}
代码示例来源:origin: apache/usergrid
.flatMap( entitySet -> Observable.from( entitySet.getEntities() ) )
.collect( () -> new HashMap<String, UUID>(), ( appMap, entity ) -> {
代码示例来源:origin: apache/usergrid
@Test()
public void testSequence3(){
ArrayList listReturn = Observable.range(0, 2)
.collect(()->new ArrayList(),(list,i) ->{
list.add(i);
}).toBlocking().first();
Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());
}
代码示例来源:origin: apache/usergrid
return edges.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( entityRefSetMap, edge ) -> {
if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
if (logger.isDebugEnabled()) {
代码示例来源:origin: apache/usergrid
.collect( () -> new DataLoadResult(), ( dataloadResult, entitySearchResult ) -> {
if ( entitySearchResult.found ) {
dataloadResult.success();
代码示例来源:origin: apache/usergrid
Observable.from( entities ).collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> {
IndexEdge edge = new IndexEdgeImpl( indexEdge.getNodeId(), indexEdge.getEdgeName(),
SearchEdge.NodeType.SOURCE, edgeCounter.incrementAndGet() );
内容来源于网络,如有侵权,请联系作者删除!