rx.Observable.merge()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(5.9k)|赞(0)|评价(0)|浏览(155)

本文整理了Java中rx.Observable.merge()方法的一些代码示例,展示了Observable.merge()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.merge()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:merge

Observable.merge介绍

[英]Flattens an Iterable of Observables into one Observable, without any transformation.

You can combine the items emitted by multiple Observables so that they appear as a single Observable, by using the merge method. Scheduler: merge does not operate by default on a particular Scheduler.
[中]将一个可观测的可观测值展平为一个可观测值,无需任何转换。
通过使用merge方法,可以将多个可观察对象发出的项组合起来,使它们显示为单个可观察对象。计划程序:默认情况下,合并不会在特定计划程序上运行。

代码示例

代码示例来源:origin: apache/usergrid

@Override
public Observable<EntityIdScope> getEntities( final Observable<ApplicationScope> appScopes ) {
  return appScopes.flatMap( applicationScope -> {
    final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
    final Id applicationId = applicationScope.getApplication();
    //load all nodes that are targets of our application node.  I.E.
    // entities that have been saved
    final Observable<Id> entityNodes = targetIdObservable.getTargetNodes( gm, applicationId );
    //create our application node to emit since it's an entity as well
    final Observable<Id> applicationNode = Observable.just( applicationId );
    //merge both the specified application node and the entity node
    // so they all get used
    return Observable.merge( applicationNode, entityNodes ).
      map( id -> new EntityIdScope( applicationScope, id ) );
  } );
}

代码示例来源:origin: apache/usergrid

merge = current;
else {
  merge = Observable.merge(merge,current);

代码示例来源:origin: apache/usergrid

return Observable.merge( targetEdges, sourceEdges )
      return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
               .flatMap(integer -> Observable.from( markedEdges ));
    } );

代码示例来源:origin: PipelineAI/pipeline

Observable.merge(commands).subscribe(new Subscriber<Integer>() {
  @Override
  public void onCompleted() {

代码示例来源:origin: apache/usergrid

final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);

代码示例来源:origin: apache/usergrid

});
return Observable.merge( systemIds, appIds );

代码示例来源:origin: apache/usergrid

public Observable<Integer> receive( final ApplicationScope scope, final MarkedEdge edge,
                    final UUID eventTimestamp ) {

    final long maxTimestamp = edge.getTimestamp();



    return edgeDeleteRepair.repair( scope, edge, eventTimestamp )
                .flatMap( markedEdge -> {

                  Observable<Integer> sourceDelete = edgeMetaRepair
                      .repairSources( scope, edge.getSourceNode(), edge.getType(), maxTimestamp );

                  Observable<Integer> targetDelete = edgeMetaRepair
                      .repairTargets( scope, edge.getTargetNode(), edge.getType(), maxTimestamp );

                  return MathObservable.sumInteger( Observable.merge( sourceDelete, targetDelete ) );
                } );
  }
}

代码示例来源:origin: apache/usergrid

return MathObservable.sumInteger( Observable.merge( checks ) )
           .doOnNext( count -> {

代码示例来源:origin: PipelineAI/pipeline

Observable<String> overall = Observable.merge(observables);

代码示例来源:origin: PipelineAI/pipeline

Observable.merge(cmdResults).subscribe(new Subscriber<Boolean>() {
  @Override
  public void onCompleted() {

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testReactive() throws Exception {
  final Observable<User> u1 = userService.getUserByIdReactive("1");
  final Observable<User> u2 = userService.getUserByIdReactive("2");
  final Observable<User> u3 = userService.getUserByIdReactive("3");
  final Observable<User> u4 = userService.getUserByIdReactive("4");
  final Observable<User> u5 = userService.getUserByIdReactive("5");
  final Iterable<User> users = Observable.merge(u1, u2, u3, u4, u5).toBlocking().toIterable();
  Set<String> expectedIds = Sets.newHashSet("1", "2", "3", "4", "5");
  for (User cUser : users) {
    assertEquals(expectedIds.remove(cUser.getId()), true);
  }
  assertEquals(expectedIds.isEmpty(), true);
  assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
  HystrixInvokableInfo<?> command = HystrixRequestLog.getCurrentRequest()
      .getAllExecutedCommands().iterator().next();
  // assert the command is the one we're expecting
  assertEquals("getUserByIds", command.getCommandKey().name());
  // confirm that it was a COLLAPSED command execution
  assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
  // and that it was successful
  assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

代码示例来源:origin: PipelineAI/pipeline

final List<Boolean> blockingList = Observable.merge(results).toList().toBlocking().single();

代码示例来源:origin: apache/usergrid

int returned = Observable.merge( input1, input2 ).buffer( 1000 )
             .flatMap( new Func1<List<Integer>, Observable<Integer>>() {
                   @Override

代码示例来源:origin: PipelineAI/pipeline

Observable.merge(results)
    .subscribeOn(Schedulers.computation())
    .subscribe(new Subscriber<Boolean>() {

代码示例来源:origin: PipelineAI/pipeline

final LatchedSemaphoreCommand command2 = new LatchedSemaphoreCommand(circuitBreaker, semaphore, startLatch, sharedLatch);
Observable<Boolean> merged = Observable.merge(command1.toObservable(), command2.toObservable())
    .subscribeOn(Schedulers.computation());

代码示例来源:origin: apache/usergrid

.merge( copyConnections, createNodeGraph, deleteAppFromIndex )
.doOnCompleted( () -> {
  try {

代码示例来源:origin: jhusain/learnrxjava

private static void mergingSyncMadeAsync() {
  // if you have something synchronous and want to make it async, you can schedule it like this
  // so here we see both executed concurrently
  Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
}

代码示例来源:origin: PipelineAI/pipeline

Observable.merge(result1, result2).toList().toBlocking().single(); //await the 2 latent commands

代码示例来源:origin: jhusain/learnrxjava

private static void mergingSync() {
  // here you'll see the delay as each is executed synchronously
  Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
}

代码示例来源:origin: jhusain/learnrxjava

private static void mergingAsync() {
  Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
}

相关文章

Observable类方法