rx.Observable.mergeWith()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.9k)|赞(0)|评价(0)|浏览(131)

本文整理了Java中rx.Observable.mergeWith()方法的一些代码示例,展示了Observable.mergeWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.mergeWith()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:mergeWith

Observable.mergeWith介绍

[英]Flattens this and another Observable into a single Observable, without any transformation.

You can combine items emitted by multiple Observables so that they appear as a single Observable, by using the mergeWith method. Scheduler: mergeWith does not operate by default on a particular Scheduler.
[中]

代码示例

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {
  HttpClient client = vertx.createHttpClient();
  HttpClientRequest req = client.request(HttpMethod.GET, 8080, "localhost", "/");
  req.toObservable().

    // Status code check and -> Observable<Buffer>
    flatMap(resp -> {
     if (resp.statusCode() != 200) {
      throw new RuntimeException("Wrong status code " + resp.statusCode());
     }
     return Observable.just(Buffer.buffer()).mergeWith(resp.toObservable());
    }).

    // Reduce all buffers in a single buffer
    reduce(Buffer::appendBuffer).

    // Turn in to a string
    map(buffer -> buffer.toString("UTF-8")).

    // Get a single buffer
    subscribe(data -> System.out.println("Server content " + data));

  // End request
  req.end();
 }
}

代码示例来源:origin: apache/usergrid

.mergeWith(graphManager.markNode((Id) id, createGraphOperationTimestamp())).toBlocking().last();
  return id;
})

代码示例来源:origin: apache/usergrid

.mergeWith(gm.markNode(entityId, CpNamingUtils.createGraphOperationTimestamp()))
.toBlocking().last();

代码示例来源:origin: apache/usergrid

/**
 * Marks entity for deletion in entity collection manager and graph.
 * Convert this method to return a list of observables that we can crunch through on return.
 * Returns merged obversable that will mark the edges in the ecm and the graph manager.
 * @param entityRef
 * @return
 */
private Observable markEntity(EntityRef entityRef){
  if(applicationScope == null || entityRef == null){
    return Observable.empty();
  }
  GraphManager gm =  graphManagerFactory.createEdgeManager( applicationScope );
  EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope );
  Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
  //Step 1 & 2 of delete
  String region = this.lookupAuthoritativeRegionForType( entityRef.getType() );
  return ecm.mark( entityId, region ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) );
}

代码示例来源:origin: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable.just(1, 2, 3)
       .mergeWith(Observable.just(4, 5, 6))
       .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
         log(integer);
        }
       });
 }
});

代码示例来源:origin: com.netflix.eureka/eureka2-server

@Override
  public Observable<InstanceInfo> resolve() {
    return cachingSubject.asObservable().mergeWith(control);
  }
}

代码示例来源:origin: org.deephacks.rxlmdb/rxlmdb

/**
 * Delete all records
 */
public void delete(RxTx tx) {
 // non-blocking needed?
 scan(tx).toBlocking().forEach(keyValues -> {
  Observable<byte[]> keys = keyValues.stream()
   .map(kv -> Observable.just(kv.key()))
   .reduce(Observable.empty(), (o1, o2) -> o1.mergeWith(o2));
  delete(tx, keys);
 });
}

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

@Override
public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
  return getMetricTags(metric.getMetricId())
      .map(loadedTags -> {
        loadedTags.keySet().retainAll(tags);
        return loadedTags;
      })
      .flatMap(tagsToDelete -> {
        return dataAccess.deleteTags(metric, tagsToDelete.keySet()).mergeWith(
            dataAccess.deleteFromMetricsTagsIndex(metric.getMetricId(), tagsToDelete)).toList()
            .map(r -> null);
      });
}

代码示例来源:origin: Azure/azure-libraries-for-java

@Override
public Observable<String> streamApplicationLogsAsync() {
  return functionService.ping()
      .mergeWith(functionService.getHostStatus())
      .last()
      .flatMap(new Func1<Void, Observable<String>>() {
        @Override
        public Observable<String> call(Void aVoid) {
          return FunctionAppImpl.super.streamApplicationLogsAsync();
        }
      });
}

代码示例来源:origin: Azure/azure-libraries-for-java

@Override
  public Observable<Indexable> call(Context context) {
    return submitAppSettings().mergeWith(submitConnectionStrings())
        .last().flatMap(new Func1<Indexable, Observable<Indexable>>() {
          @Override
          public Observable<Indexable> call(Indexable indexable) {
            return submitStickiness();
          }
        });
  }
});

代码示例来源:origin: Azure/azure-libraries-for-java

@Override
public Observable<String> streamTraceLogsAsync() {
  return functionService.ping()
      .mergeWith(functionService.getHostStatus())
      .last()
      .flatMap(new Func1<Void, Observable<String>>() {
        @Override
        public Observable<String> call(Void aVoid) {
          return FunctionAppImpl.super.streamTraceLogsAsync();
        }
      });
}

代码示例来源:origin: Azure/azure-libraries-for-java

@Override
public Observable<String> streamDeploymentLogsAsync() {
  return functionService.ping()
      .mergeWith(functionService.getHostStatus())
      .last()
      .flatMap(new Func1<Void, Observable<String>>() {
        @Override
        public Observable<String> call(Void aVoid) {
          return FunctionAppImpl.super.streamDeploymentLogsAsync();
        }
      });
}

代码示例来源:origin: hawkular/hawkular-metrics

@Override
public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
  return getMetricTags(metric.getMetricId())
      .map(loadedTags -> {
        loadedTags.keySet().retainAll(tags);
        return loadedTags;
      })
      .flatMap(tagsToDelete -> {
        return dataAccess.deleteTags(metric, tagsToDelete.keySet()).mergeWith(
            dataAccess.deleteFromMetricsTagsIndex(metric.getMetricId(), tagsToDelete)).toList()
            .map(r -> null);
      });
}

代码示例来源:origin: Azure/azure-libraries-for-java

@Override
public Observable<String> streamHttpLogsAsync() {
  return functionService.ping()
      .mergeWith(functionService.getHostStatus())
      .last()
      .flatMap(new Func1<Void, Observable<String>>() {
        @Override
        public Observable<String> call(Void aVoid) {
          return FunctionAppImpl.super.streamHttpLogsAsync();
        }
      });
}

代码示例来源:origin: Azure/azure-libraries-for-java

@Override
public Observable<String> streamAllLogsAsync() {
  return functionService.ping()
      .mergeWith(functionService.getHostStatus())
      .last()
      .flatMap(new Func1<Void, Observable<String>>() {
        @Override
        public Observable<String> call(Void aVoid) {
          return FunctionAppImpl.super.streamAllLogsAsync();
        }
      });
}

代码示例来源:origin: hawkular/hawkular-metrics

@Override
public Observable<Row> findAllMetricIdentifiersInData() {
  return getPrepForAllTempTables(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
      .flatMap(b -> rxSession.executeAndFetch(b.bind()))
      .mergeWith(rxSession.executeAndFetch(findAllMetricsInData.bind()))
      .mergeWith(rxSession.executeAndFetch(findAllMetricsInDataCompressed.bind()));
}

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

@Override
public Observable<Row> findAllMetricIdentifiersInData() {
  return getPrepForAllTempTables(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
      .flatMap(b -> rxSession.executeAndFetch(b.bind()))
      .mergeWith(rxSession.executeAndFetch(findAllMetricsInData.bind()))
      .mergeWith(rxSession.executeAndFetch(findAllMetricsInDataCompressed.bind()));
}

代码示例来源:origin: techery/janet

@Override
  public Observable<ActionState<TestAction>> call(Observable<ActionState<TestAction>> observable) {
    return observable.mergeWith(actionPipe.observe());
  }
})

代码示例来源:origin: AvanzaBank/astrix

@Override
public Observable<List<AstrixServiceInvocationResponse>> submitRoutedRequests(
    Collection<RoutedServiceInvocationRequest> requests) {
  Observable<AstrixServiceInvocationResponse> result = Observable.empty();
  for (RoutedServiceInvocationRequest request : requests) {
    result = result.mergeWith(submitRoutedRequest(request.getRequest(), request.getRoutingkey()));
  }
  return result.toList();
}

代码示例来源:origin: AvanzaBank/astrix

private Observable<List<AstrixServiceInvocationResponse>> observeRoutedReqeuests(Collection<RoutedServiceInvocationRequest> requests) {
  Observable<AstrixServiceInvocationResponse> result = Observable.empty();
  for (RoutedServiceInvocationRequest request : requests) {
    result = result.mergeWith(spaceTaskDispatcher.observe(new AstrixServiceInvocationTask(request.getRequest()), request.getRoutingkey()));
  }
  return result.toList();
}

相关文章

Observable类方法