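This article collects code examples for the Java method rx.Observable.merge() and shows how Observable.merge() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they can serve as a practical reference. Details of the Observable.merge() method are as follows: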
Package path: rx.Observable
Class name: Observable
Method name: merge
Flattens an Iterable of Observables into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they appear as a single Observable by using the merge method. Scheduler: merge does not operate by default on a particular Scheduler.
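To make the description above concrete, here is a minimal sketch, assuming RxJava 1.x is on the classpath. The class name MergeBasics and its two helper methods are illustrative only and do not come from any of the projects quoted in this article. It shows the two-argument overload and the Iterable overload, each collected into a list with toList().toBlocking().single().

```java
import java.util.Arrays;
import java.util.List;

import rx.Observable;

public class MergeBasics {

    // Merge two Observables passed as separate arguments.
    static List<Integer> mergeTwo() {
        Observable<Integer> odds = Observable.just(1, 3, 5);
        Observable<Integer> evens = Observable.just(2, 4, 6);
        // The merged Observable emits every item from both sources, untransformed.
        return Observable.merge(odds, evens).toList().toBlocking().single();
    }

    // Merge an Iterable of Observables (the overload described above).
    static List<Integer> mergeIterable() {
        List<Observable<Integer>> sources = Arrays.asList(
                Observable.just(1, 3, 5),
                Observable.just(2, 4, 6),
                Observable.just(7));
        return Observable.merge(sources).toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(mergeTwo());
        System.out.println(mergeIterable());
    }
}
```

Because merge may interleave items when the sources emit on different threads, code that consumes the result should not depend on a particular ordering across sources.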
Code example source: origin: apache/usergrid
@Override
public Observable<EntityIdScope> getEntities( final Observable<ApplicationScope> appScopes ) {
    return appScopes.flatMap( applicationScope -> {
        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
        final Id applicationId = applicationScope.getApplication();
        // load all nodes that are targets of our application node, i.e.
        // entities that have been saved
        final Observable<Id> entityNodes = targetIdObservable.getTargetNodes( gm, applicationId );
        // create our application node to emit since it's an entity as well
        final Observable<Id> applicationNode = Observable.just( applicationId );
        // merge both the specified application node and the entity nodes
        // so they all get used
        return Observable.merge( applicationNode, entityNodes )
            .map( id -> new EntityIdScope( applicationScope, id ) );
    } );
}
Code example source: origin: apache/usergrid
merge = current;
else {
merge = Observable.merge(merge,current);
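The fragment above accumulates sources by merging each new Observable into a running result. A self-contained sketch of that pattern follows, assuming RxJava 1.x; the class MergeAccumulate, the method mergePairwise, and the sample data are hypothetical and are not taken from the usergrid code, whose surrounding loop is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import rx.Observable;

public class MergeAccumulate {

    // Fold a list of sources into one Observable by merging them pairwise.
    static <T> Observable<T> mergePairwise(List<Observable<T>> sources) {
        Observable<T> merged = null;
        for (Observable<T> current : sources) {
            if (merged == null) {
                // The first source becomes the initial accumulator.
                merged = current;
            } else {
                merged = Observable.merge(merged, current);
            }
        }
        // With no sources at all, fall back to an empty Observable.
        return merged == null ? Observable.<T>empty() : merged;
    }

    // Helper for the demo: collect and sort so the result is order-independent.
    static List<String> collectSorted(Observable<String> source) {
        List<String> items = new ArrayList<>(source.toList().toBlocking().single());
        Collections.sort(items);
        return items;
    }

    public static void main(String[] args) {
        List<Observable<String>> sources = Arrays.asList(
                Observable.just("a", "b"),
                Observable.just("c"),
                Observable.just("d", "e"));
        System.out.println(collectSorted(mergePairwise(sources)));
        // When all sources are already in a collection, the Iterable overload
        // expresses the same thing without the loop.
        System.out.println(collectSorted(Observable.merge(sources)));
    }
}
```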
Code example source: origin: apache/usergrid
return Observable.merge( targetEdges, sourceEdges )
return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
.flatMap(integer -> Observable.from( markedEdges ));
} );
Code example source: origin: PipelineAI/pipeline
Observable.merge(commands).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Code example source: origin: apache/usergrid
final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
Code example source: origin: apache/usergrid
});
return Observable.merge( systemIds, appIds );
Code example source: origin: apache/usergrid
public Observable<Integer> receive( final ApplicationScope scope, final MarkedEdge edge,
                                    final UUID eventTimestamp ) {
    final long maxTimestamp = edge.getTimestamp();
    return edgeDeleteRepair.repair( scope, edge, eventTimestamp )
        .flatMap( markedEdge -> {
            Observable<Integer> sourceDelete = edgeMetaRepair
                .repairSources( scope, edge.getSourceNode(), edge.getType(), maxTimestamp );
            Observable<Integer> targetDelete = edgeMetaRepair
                .repairTargets( scope, edge.getTargetNode(), edge.getType(), maxTimestamp );
            return MathObservable.sumInteger( Observable.merge( sourceDelete, targetDelete ) );
        } );
}
}
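The example above sums the counts from two merged Observables with MathObservable.sumInteger, which lives in a separate add-on library. As a rough sketch of the same idea using only core RxJava 1.x operators, the merged stream can be reduced with a seed of 0. The class MergeSum and the method sumMerged are illustrative names, and this is a swapped-in technique, not the usergrid implementation.

```java
import rx.Observable;

public class MergeSum {

    // Sum every Integer emitted by either source after merging them.
    static int sumMerged(Observable<Integer> a, Observable<Integer> b) {
        return Observable.merge(a, b)
                // reduce with a seed emits the seed (0) if both sources are empty
                .reduce(0, (acc, n) -> acc + n)
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        // Hypothetical counts standing in for the two repair results.
        Observable<Integer> sourceCounts = Observable.just(1, 2);
        Observable<Integer> targetCounts = Observable.just(3, 4);
        System.out.println(sumMerged(sourceCounts, targetCounts));
    }
}
```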
Code example source: origin: apache/usergrid
return MathObservable.sumInteger( Observable.merge( checks ) )
.doOnNext( count -> {
Code example source: origin: PipelineAI/pipeline
Observable<String> overall = Observable.merge(observables);
Code example source: origin: PipelineAI/pipeline
Observable.merge(cmdResults).subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Code example source: origin: PipelineAI/pipeline
@Test
public void testReactive() throws Exception {
    final Observable<User> u1 = userService.getUserByIdReactive("1");
    final Observable<User> u2 = userService.getUserByIdReactive("2");
    final Observable<User> u3 = userService.getUserByIdReactive("3");
    final Observable<User> u4 = userService.getUserByIdReactive("4");
    final Observable<User> u5 = userService.getUserByIdReactive("5");
    final Iterable<User> users = Observable.merge(u1, u2, u3, u4, u5).toBlocking().toIterable();
    Set<String> expectedIds = Sets.newHashSet("1", "2", "3", "4", "5");
    for (User cUser : users) {
        // each returned id must be one of the expected ids, seen exactly once
        assertTrue(expectedIds.remove(cUser.getId()));
    }
    assertTrue(expectedIds.isEmpty());
    assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
    HystrixInvokableInfo<?> command = HystrixRequestLog.getCurrentRequest()
        .getAllExecutedCommands().iterator().next();
    // assert the command is the one we're expecting
    assertEquals("getUserByIds", command.getCommandKey().name());
    // confirm that it was a COLLAPSED command execution
    assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
    // and that it was successful
    assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}
Code example source: origin: PipelineAI/pipeline
final List<Boolean> blockingList = Observable.merge(results).toList().toBlocking().single();
Code example source: origin: apache/usergrid
int returned = Observable.merge( input1, input2 ).buffer( 1000 )
.flatMap( new Func1<List<Integer>, Observable<Integer>>() {
@Override
Code example source: origin: PipelineAI/pipeline
Observable.merge(results)
.subscribeOn(Schedulers.computation())
.subscribe(new Subscriber<Boolean>() {
Code example source: origin: PipelineAI/pipeline
final LatchedSemaphoreCommand command2 = new LatchedSemaphoreCommand(circuitBreaker, semaphore, startLatch, sharedLatch);
Observable<Boolean> merged = Observable.merge(command1.toObservable(), command2.toObservable())
.subscribeOn(Schedulers.computation());
Code example source: origin: apache/usergrid
.merge( copyConnections, createNodeGraph, deleteAppFromIndex )
.doOnCompleted( () -> {
try {
Code example source: origin: jhusain/learnrxjava
private static void mergingSyncMadeAsync() {
    // if you have something synchronous and want to make it async, you can schedule it like this
    // so here we see both executed concurrently
    Observable.merge(
            getDataSync(1).subscribeOn(Schedulers.io()),
            getDataSync(2).subscribeOn(Schedulers.io()))
        .toBlocking().forEach(System.out::println);
}
Code example source: origin: PipelineAI/pipeline
Observable.merge(result1, result2).toList().toBlocking().single(); //await the 2 latent commands
Code example source: origin: jhusain/learnrxjava
private static void mergingSync() {
    // here you'll see the delay as each is executed synchronously
    Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
}
Code example source: origin: jhusain/learnrxjava
private static void mergingAsync() {
    Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
}
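The three learnrxjava snippets above call getDataSync and getDataAsync, which are not shown in this article. The following sketch, assuming RxJava 1.x, substitutes a hypothetical getDataSync that sleeps for 200 ms on the subscribing thread, so the difference between merging blocking sources directly and merging them after subscribeOn(Schedulers.io()) can be observed. The class name, the delay, and the emitted strings are all assumptions made for illustration.

```java
import java.util.List;

import rx.Observable;
import rx.schedulers.Schedulers;

public class MergeSyncAsync {

    // Hypothetical stand-in for getDataSync: blocks the subscribing thread, then emits one item.
    static Observable<String> getDataSync(int id) {
        return Observable.defer(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Observable.just("data-" + id);
        });
    }

    // Both sources are subscribed on the calling thread, one after the other.
    static List<String> mergedSequential() {
        return Observable.merge(getDataSync(1), getDataSync(2))
                .toList().toBlocking().single();
    }

    // Each source is subscribed on an io() thread, so the two sleeps can overlap.
    static List<String> mergedConcurrent() {
        return Observable.merge(
                        getDataSync(1).subscribeOn(Schedulers.io()),
                        getDataSync(2).subscribeOn(Schedulers.io()))
                .toList().toBlocking().single();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println(mergedSequential() + " took " + (System.currentTimeMillis() - start) + " ms");
        start = System.currentTimeMillis();
        System.out.println(mergedConcurrent() + " took " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

On a typical run the sequential variant should take roughly the sum of the two delays and the concurrent variant roughly the longer one, although exact timings depend on the machine and scheduler. The order of items in the concurrent result is not guaranteed.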