Usage and code examples for the rx.Observable.mergeWith() method

Reposted by x33g5p2x on 2022-01-25 in the category Other

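This article collects code examples for the Java method rx.Observable.mergeWith() and shows how Observable.mergeWith() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful reference material. Details of Observable.mergeWith() are as follows:
Fully qualified name: rx.Observable
Class name: Observable
Method name: mergeWith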

Introduction to Observable.mergeWith

Flattens this and another Observable into a single Observable, without any transformation.

You can combine items emitted by multiple Observables so that they appear as a single Observable, by using the mergeWith method. Scheduler: mergeWith does not operate by default on a particular Scheduler.
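
The behaviour described above can be illustrated with a minimal sketch, assuming RxJava 1.x (the `rx` package) is on the classpath. The class name `MergeWithSketch`, the method names, and the interval periods are illustrative choices and do not come from the projects quoted below. The first method merges two synchronous sources; because no Scheduler is involved, the first source is drained before the second is subscribed. The second method merges two timed sources, whose items may interleave, so no particular order should be relied on there.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;

public class MergeWithSketch {

    // Two synchronous sources: everything runs on the subscribing thread,
    // so the items of the first source arrive before those of the second.
    static List<Integer> mergeSynchronous() {
        return Observable.just(1, 2, 3)
                .mergeWith(Observable.just(4, 5, 6))
                .toList()
                .toBlocking()
                .single();
    }

    // Two timed sources: interval() emits on a background Scheduler,
    // so items from both sources are forwarded as they arrive.
    // The periods (100 ms and 170 ms) are arbitrary illustrative values.
    static List<String> mergeTimed() {
        Observable<String> a = Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .map(i -> "a" + i);
        Observable<String> b = Observable.interval(170, TimeUnit.MILLISECONDS)
                .take(2)
                .map(i -> "b" + i);
        // The merged Observable completes only after both sources complete.
        return a.mergeWith(b)
                .toList()
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        System.out.println(mergeSynchronous()); // prints [1, 2, 3, 4, 5, 6]
        System.out.println(mergeTimed());       // five items, order depends on timing
    }
}
```

When strict ordering between the two sources matters, `concatWith` is the usual alternative, since it subscribes to the second source only after the first has completed.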

Code examples

Code example source: vert-x3/vertx-examples

        @Override
        public void start() throws Exception {
            HttpClient client = vertx.createHttpClient();
            HttpClientRequest req = client.request(HttpMethod.GET, 8080, "localhost", "/");
            req.toObservable().
                // Status code check and -> Observable<Buffer>
                flatMap(resp -> {
                    if (resp.statusCode() != 200) {
                        throw new RuntimeException("Wrong status code " + resp.statusCode());
                    }
                    return Observable.just(Buffer.buffer()).mergeWith(resp.toObservable());
                }).
                // Reduce all buffers in a single buffer
                reduce(Buffer::appendBuffer).
                // Turn into a string
                map(buffer -> buffer.toString("UTF-8")).
                // Get a single buffer
                subscribe(data -> System.out.println("Server content " + data));
            // End request
            req.end();
        }
    }

Code example source: apache/usergrid

        .mergeWith(graphManager.markNode((Id) id, createGraphOperationTimestamp())).toBlocking().last();
        return id;
    })

Code example source: apache/usergrid

    .mergeWith(gm.markNode(entityId, CpNamingUtils.createGraphOperationTimestamp()))
    .toBlocking().last();

Code example source: apache/usergrid

    /**
     * Marks entity for deletion in entity collection manager and graph.
     * Convert this method to return a list of observables that we can crunch through on return.
     * Returns merged observable that will mark the edges in the ecm and the graph manager.
     * @param entityRef
     * @return
     */
    private Observable markEntity(EntityRef entityRef){
        if(applicationScope == null || entityRef == null){
            return Observable.empty();
        }
        GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
        EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope );
        Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
        //Step 1 & 2 of delete
        String region = this.lookupAuthoritativeRegionForType( entityRef.getType() );
        return ecm.mark( entityId, region ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) );
    }

Code example source: leeowenowen/rxjava-examples

        @Override
        public void run() {
            Observable.just(1, 2, 3)
                      .mergeWith(Observable.just(4, 5, 6))
                      .subscribe(new Action1<Integer>() {
                          @Override
                          public void call(Integer integer) {
                              log(integer);
                          }
                      });
        }
    });

Code example source: com.netflix.eureka/eureka2-server

        @Override
        public Observable<InstanceInfo> resolve() {
            return cachingSubject.asObservable().mergeWith(control);
        }
    }

Code example source: org.deephacks.rxlmdb/rxlmdb

    /**
     * Delete all records
     */
    public void delete(RxTx tx) {
        // non-blocking needed?
        scan(tx).toBlocking().forEach(keyValues -> {
            Observable<byte[]> keys = keyValues.stream()
                    .map(kv -> Observable.just(kv.key()))
                    .reduce(Observable.empty(), (o1, o2) -> o1.mergeWith(o2));
            delete(tx, keys);
        });
    }

Code example source: org.hawkular.metrics/hawkular-metrics-core-service

    @Override
    public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
        return getMetricTags(metric.getMetricId())
                .map(loadedTags -> {
                    loadedTags.keySet().retainAll(tags);
                    return loadedTags;
                })
                .flatMap(tagsToDelete -> {
                    return dataAccess.deleteTags(metric, tagsToDelete.keySet()).mergeWith(
                            dataAccess.deleteFromMetricsTagsIndex(metric.getMetricId(), tagsToDelete)).toList()
                            .map(r -> null);
                });
    }

Code example source: Azure/azure-libraries-for-java

    @Override
    public Observable<String> streamApplicationLogsAsync() {
        return functionService.ping()
                .mergeWith(functionService.getHostStatus())
                .last()
                .flatMap(new Func1<Void, Observable<String>>() {
                    @Override
                    public Observable<String> call(Void aVoid) {
                        return FunctionAppImpl.super.streamApplicationLogsAsync();
                    }
                });
    }

Code example source: Azure/azure-libraries-for-java

        @Override
        public Observable<Indexable> call(Context context) {
            return submitAppSettings().mergeWith(submitConnectionStrings())
                    .last().flatMap(new Func1<Indexable, Observable<Indexable>>() {
                        @Override
                        public Observable<Indexable> call(Indexable indexable) {
                            return submitStickiness();
                        }
                    });
        }
    });

Code example source: Azure/azure-libraries-for-java

    @Override
    public Observable<String> streamTraceLogsAsync() {
        return functionService.ping()
                .mergeWith(functionService.getHostStatus())
                .last()
                .flatMap(new Func1<Void, Observable<String>>() {
                    @Override
                    public Observable<String> call(Void aVoid) {
                        return FunctionAppImpl.super.streamTraceLogsAsync();
                    }
                });
    }

Code example source: Azure/azure-libraries-for-java

    @Override
    public Observable<String> streamDeploymentLogsAsync() {
        return functionService.ping()
                .mergeWith(functionService.getHostStatus())
                .last()
                .flatMap(new Func1<Void, Observable<String>>() {
                    @Override
                    public Observable<String> call(Void aVoid) {
                        return FunctionAppImpl.super.streamDeploymentLogsAsync();
                    }
                });
    }

Code example source: Azure/azure-libraries-for-java

    @Override
    public Observable<String> streamHttpLogsAsync() {
        return functionService.ping()
                .mergeWith(functionService.getHostStatus())
                .last()
                .flatMap(new Func1<Void, Observable<String>>() {
                    @Override
                    public Observable<String> call(Void aVoid) {
                        return FunctionAppImpl.super.streamHttpLogsAsync();
                    }
                });
    }

Code example source: Azure/azure-libraries-for-java

    @Override
    public Observable<String> streamAllLogsAsync() {
        return functionService.ping()
                .mergeWith(functionService.getHostStatus())
                .last()
                .flatMap(new Func1<Void, Observable<String>>() {
                    @Override
                    public Observable<String> call(Void aVoid) {
                        return FunctionAppImpl.super.streamAllLogsAsync();
                    }
                });
    }

Code example source: hawkular/hawkular-metrics

    @Override
    public Observable<Row> findAllMetricIdentifiersInData() {
        return getPrepForAllTempTables(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
                .flatMap(b -> rxSession.executeAndFetch(b.bind()))
                .mergeWith(rxSession.executeAndFetch(findAllMetricsInData.bind()))
                .mergeWith(rxSession.executeAndFetch(findAllMetricsInDataCompressed.bind()));
    }

Code example source: techery/janet

        @Override
        public Observable<ActionState<TestAction>> call(Observable<ActionState<TestAction>> observable) {
            return observable.mergeWith(actionPipe.observe());
        }
    })

Code example source: AvanzaBank/astrix

    @Override
    public Observable<List<AstrixServiceInvocationResponse>> submitRoutedRequests(
            Collection<RoutedServiceInvocationRequest> requests) {
        Observable<AstrixServiceInvocationResponse> result = Observable.empty();
        for (RoutedServiceInvocationRequest request : requests) {
            result = result.mergeWith(submitRoutedRequest(request.getRequest(), request.getRoutingkey()));
        }
        return result.toList();
    }

Code example source: AvanzaBank/astrix

    private Observable<List<AstrixServiceInvocationResponse>> observeRoutedReqeuests(Collection<RoutedServiceInvocationRequest> requests) {
        Observable<AstrixServiceInvocationResponse> result = Observable.empty();
        for (RoutedServiceInvocationRequest request : requests) {
            result = result.mergeWith(spaceTaskDispatcher.observe(new AstrixServiceInvocationTask(request.getRequest()), request.getRoutingkey()));
        }
        return result.toList();
    }
