rx.Observable.merge()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(5.9k)|赞(0)|评价(0)|浏览(182)

本文整理了Java中rx.Observable.merge()方法的一些代码示例,展示了Observable.merge()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.merge()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:merge

Observable.merge介绍

[英]Flattens an Iterable of Observables into one Observable, without any transformation.

You can combine the items emitted by multiple Observables so that they appear as a single Observable, by using the merge method. Scheduler: merge does not operate by default on a particular Scheduler.
[中]将一个可观测的可观测值展平为一个可观测值,无需任何转换。
通过使用merge方法,可以将多个可观察对象发出的项组合起来,使它们显示为单个可观察对象。计划程序:默认情况下,合并不会在特定计划程序上运行。

代码示例

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<EntityIdScope> getEntities( final Observable<ApplicationScope> appScopes ) {
  3. return appScopes.flatMap( applicationScope -> {
  4. final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
  5. final Id applicationId = applicationScope.getApplication();
  6. //load all nodes that are targets of our application node. I.E.
  7. // entities that have been saved
  8. final Observable<Id> entityNodes = targetIdObservable.getTargetNodes( gm, applicationId );
  9. //create our application node to emit since it's an entity as well
  10. final Observable<Id> applicationNode = Observable.just( applicationId );
  11. //merge both the specified application node and the entity node
  12. // so they all get used
  13. return Observable.merge( applicationNode, entityNodes ).
  14. map( id -> new EntityIdScope( applicationScope, id ) );
  15. } );
  16. }

代码示例来源:origin: apache/usergrid

  1. merge = current;
  2. else {
  3. merge = Observable.merge(merge,current);

代码示例来源:origin: apache/usergrid

  1. return Observable.merge( targetEdges, sourceEdges )
  2. return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
  3. .flatMap(integer -> Observable.from( markedEdges ));
  4. } );

代码示例来源:origin: PipelineAI/pipeline

  1. Observable.merge(commands).subscribe(new Subscriber<Integer>() {
  2. @Override
  3. public void onCompleted() {

代码示例来源:origin: apache/usergrid

  1. final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);

代码示例来源:origin: apache/usergrid

  1. });
  2. return Observable.merge( systemIds, appIds );

代码示例来源:origin: apache/usergrid

  1. public Observable<Integer> receive( final ApplicationScope scope, final MarkedEdge edge,
  2. final UUID eventTimestamp ) {
  3. final long maxTimestamp = edge.getTimestamp();
  4. return edgeDeleteRepair.repair( scope, edge, eventTimestamp )
  5. .flatMap( markedEdge -> {
  6. Observable<Integer> sourceDelete = edgeMetaRepair
  7. .repairSources( scope, edge.getSourceNode(), edge.getType(), maxTimestamp );
  8. Observable<Integer> targetDelete = edgeMetaRepair
  9. .repairTargets( scope, edge.getTargetNode(), edge.getType(), maxTimestamp );
  10. return MathObservable.sumInteger( Observable.merge( sourceDelete, targetDelete ) );
  11. } );
  12. }
  13. }

代码示例来源:origin: apache/usergrid

  1. return MathObservable.sumInteger( Observable.merge( checks ) )
  2. .doOnNext( count -> {

代码示例来源:origin: PipelineAI/pipeline

  1. Observable<String> overall = Observable.merge(observables);

代码示例来源:origin: PipelineAI/pipeline

  1. Observable.merge(cmdResults).subscribe(new Subscriber<Boolean>() {
  2. @Override
  3. public void onCompleted() {

代码示例来源:origin: PipelineAI/pipeline

  1. @Test
  2. public void testReactive() throws Exception {
  3. final Observable<User> u1 = userService.getUserByIdReactive("1");
  4. final Observable<User> u2 = userService.getUserByIdReactive("2");
  5. final Observable<User> u3 = userService.getUserByIdReactive("3");
  6. final Observable<User> u4 = userService.getUserByIdReactive("4");
  7. final Observable<User> u5 = userService.getUserByIdReactive("5");
  8. final Iterable<User> users = Observable.merge(u1, u2, u3, u4, u5).toBlocking().toIterable();
  9. Set<String> expectedIds = Sets.newHashSet("1", "2", "3", "4", "5");
  10. for (User cUser : users) {
  11. assertEquals(expectedIds.remove(cUser.getId()), true);
  12. }
  13. assertEquals(expectedIds.isEmpty(), true);
  14. assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
  15. HystrixInvokableInfo<?> command = HystrixRequestLog.getCurrentRequest()
  16. .getAllExecutedCommands().iterator().next();
  17. // assert the command is the one we're expecting
  18. assertEquals("getUserByIds", command.getCommandKey().name());
  19. // confirm that it was a COLLAPSED command execution
  20. assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
  21. // and that it was successful
  22. assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
  23. }

代码示例来源:origin: PipelineAI/pipeline

  1. final List<Boolean> blockingList = Observable.merge(results).toList().toBlocking().single();

代码示例来源:origin: apache/usergrid

  1. int returned = Observable.merge( input1, input2 ).buffer( 1000 )
  2. .flatMap( new Func1<List<Integer>, Observable<Integer>>() {
  3. @Override

代码示例来源:origin: PipelineAI/pipeline

  1. Observable.merge(results)
  2. .subscribeOn(Schedulers.computation())
  3. .subscribe(new Subscriber<Boolean>() {

代码示例来源:origin: PipelineAI/pipeline

  1. final LatchedSemaphoreCommand command2 = new LatchedSemaphoreCommand(circuitBreaker, semaphore, startLatch, sharedLatch);
  2. Observable<Boolean> merged = Observable.merge(command1.toObservable(), command2.toObservable())
  3. .subscribeOn(Schedulers.computation());

代码示例来源:origin: apache/usergrid

  1. .merge( copyConnections, createNodeGraph, deleteAppFromIndex )
  2. .doOnCompleted( () -> {
  3. try {

代码示例来源:origin: jhusain/learnrxjava

  1. private static void mergingSyncMadeAsync() {
  2. // if you have something synchronous and want to make it async, you can schedule it like this
  3. // so here we see both executed concurrently
  4. Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
  5. }

代码示例来源:origin: PipelineAI/pipeline

  1. Observable.merge(result1, result2).toList().toBlocking().single(); //await the 2 latent commands

代码示例来源:origin: jhusain/learnrxjava

  1. private static void mergingSync() {
  2. // here you'll see the delay as each is executed synchronously
  3. Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
  4. }

代码示例来源:origin: jhusain/learnrxjava

  1. private static void mergingAsync() {
  2. Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
  3. }

相关文章

Observable类方法