本文整理了Java中rx.Observable.mergeWith()
方法的一些代码示例,展示了Observable.mergeWith()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.mergeWith()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:mergeWith
[英]Flattens this and another Observable into a single Observable, without any transformation.
You can combine items emitted by multiple Observables so that they appear as a single Observable, by using the mergeWith method. Scheduler: mergeWith does not operate by default on a particular Scheduler.
[中]
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
HttpClient client = vertx.createHttpClient();
HttpClientRequest req = client.request(HttpMethod.GET, 8080, "localhost", "/");
req.toObservable().
// Status code check and -> Observable<Buffer>
flatMap(resp -> {
if (resp.statusCode() != 200) {
throw new RuntimeException("Wrong status code " + resp.statusCode());
}
return Observable.just(Buffer.buffer()).mergeWith(resp.toObservable());
}).
// Reduce all buffers in a single buffer
reduce(Buffer::appendBuffer).
// Turn in to a string
map(buffer -> buffer.toString("UTF-8")).
// Get a single buffer
subscribe(data -> System.out.println("Server content " + data));
// End request
req.end();
}
}
代码示例来源:origin: apache/usergrid
.mergeWith(graphManager.markNode((Id) id, createGraphOperationTimestamp())).toBlocking().last();
return id;
})
代码示例来源:origin: apache/usergrid
.mergeWith(gm.markNode(entityId, CpNamingUtils.createGraphOperationTimestamp()))
.toBlocking().last();
代码示例来源:origin: apache/usergrid
/**
* Marks entity for deletion in entity collection manager and graph.
* Convert this method to return a list of observables that we can crunch through on return.
* Returns merged obversable that will mark the edges in the ecm and the graph manager.
* @param entityRef
* @return
*/
private Observable markEntity(EntityRef entityRef){
if(applicationScope == null || entityRef == null){
return Observable.empty();
}
GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope );
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
//Step 1 & 2 of delete
String region = this.lookupAuthoritativeRegionForType( entityRef.getType() );
return ecm.mark( entityId, region ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) );
}
代码示例来源:origin: leeowenowen/rxjava-examples
@Override
public void run() {
Observable.just(1, 2, 3)
.mergeWith(Observable.just(4, 5, 6))
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer);
}
});
}
});
代码示例来源:origin: com.netflix.eureka/eureka2-server
@Override
public Observable<InstanceInfo> resolve() {
return cachingSubject.asObservable().mergeWith(control);
}
}
代码示例来源:origin: org.deephacks.rxlmdb/rxlmdb
/**
* Delete all records
*/
public void delete(RxTx tx) {
// non-blocking needed?
scan(tx).toBlocking().forEach(keyValues -> {
Observable<byte[]> keys = keyValues.stream()
.map(kv -> Observable.just(kv.key()))
.reduce(Observable.empty(), (o1, o2) -> o1.mergeWith(o2));
delete(tx, keys);
});
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
return getMetricTags(metric.getMetricId())
.map(loadedTags -> {
loadedTags.keySet().retainAll(tags);
return loadedTags;
})
.flatMap(tagsToDelete -> {
return dataAccess.deleteTags(metric, tagsToDelete.keySet()).mergeWith(
dataAccess.deleteFromMetricsTagsIndex(metric.getMetricId(), tagsToDelete)).toList()
.map(r -> null);
});
}
代码示例来源:origin: Azure/azure-libraries-for-java
@Override
public Observable<String> streamApplicationLogsAsync() {
return functionService.ping()
.mergeWith(functionService.getHostStatus())
.last()
.flatMap(new Func1<Void, Observable<String>>() {
@Override
public Observable<String> call(Void aVoid) {
return FunctionAppImpl.super.streamApplicationLogsAsync();
}
});
}
代码示例来源:origin: Azure/azure-libraries-for-java
@Override
public Observable<Indexable> call(Context context) {
return submitAppSettings().mergeWith(submitConnectionStrings())
.last().flatMap(new Func1<Indexable, Observable<Indexable>>() {
@Override
public Observable<Indexable> call(Indexable indexable) {
return submitStickiness();
}
});
}
});
代码示例来源:origin: Azure/azure-libraries-for-java
@Override
public Observable<String> streamTraceLogsAsync() {
return functionService.ping()
.mergeWith(functionService.getHostStatus())
.last()
.flatMap(new Func1<Void, Observable<String>>() {
@Override
public Observable<String> call(Void aVoid) {
return FunctionAppImpl.super.streamTraceLogsAsync();
}
});
}
代码示例来源:origin: Azure/azure-libraries-for-java
@Override
public Observable<String> streamDeploymentLogsAsync() {
return functionService.ping()
.mergeWith(functionService.getHostStatus())
.last()
.flatMap(new Func1<Void, Observable<String>>() {
@Override
public Observable<String> call(Void aVoid) {
return FunctionAppImpl.super.streamDeploymentLogsAsync();
}
});
}
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
return getMetricTags(metric.getMetricId())
.map(loadedTags -> {
loadedTags.keySet().retainAll(tags);
return loadedTags;
})
.flatMap(tagsToDelete -> {
return dataAccess.deleteTags(metric, tagsToDelete.keySet()).mergeWith(
dataAccess.deleteFromMetricsTagsIndex(metric.getMetricId(), tagsToDelete)).toList()
.map(r -> null);
});
}
代码示例来源:origin: Azure/azure-libraries-for-java
@Override
public Observable<String> streamHttpLogsAsync() {
return functionService.ping()
.mergeWith(functionService.getHostStatus())
.last()
.flatMap(new Func1<Void, Observable<String>>() {
@Override
public Observable<String> call(Void aVoid) {
return FunctionAppImpl.super.streamHttpLogsAsync();
}
});
}
代码示例来源:origin: Azure/azure-libraries-for-java
@Override
public Observable<String> streamAllLogsAsync() {
return functionService.ping()
.mergeWith(functionService.getHostStatus())
.last()
.flatMap(new Func1<Void, Observable<String>>() {
@Override
public Observable<String> call(Void aVoid) {
return FunctionAppImpl.super.streamAllLogsAsync();
}
});
}
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public Observable<Row> findAllMetricIdentifiersInData() {
return getPrepForAllTempTables(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
.flatMap(b -> rxSession.executeAndFetch(b.bind()))
.mergeWith(rxSession.executeAndFetch(findAllMetricsInData.bind()))
.mergeWith(rxSession.executeAndFetch(findAllMetricsInDataCompressed.bind()));
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public Observable<Row> findAllMetricIdentifiersInData() {
return getPrepForAllTempTables(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
.flatMap(b -> rxSession.executeAndFetch(b.bind()))
.mergeWith(rxSession.executeAndFetch(findAllMetricsInData.bind()))
.mergeWith(rxSession.executeAndFetch(findAllMetricsInDataCompressed.bind()));
}
代码示例来源:origin: techery/janet
@Override
public Observable<ActionState<TestAction>> call(Observable<ActionState<TestAction>> observable) {
return observable.mergeWith(actionPipe.observe());
}
})
代码示例来源:origin: AvanzaBank/astrix
@Override
public Observable<List<AstrixServiceInvocationResponse>> submitRoutedRequests(
Collection<RoutedServiceInvocationRequest> requests) {
Observable<AstrixServiceInvocationResponse> result = Observable.empty();
for (RoutedServiceInvocationRequest request : requests) {
result = result.mergeWith(submitRoutedRequest(request.getRequest(), request.getRoutingkey()));
}
return result.toList();
}
代码示例来源:origin: AvanzaBank/astrix
private Observable<List<AstrixServiceInvocationResponse>> observeRoutedReqeuests(Collection<RoutedServiceInvocationRequest> requests) {
Observable<AstrixServiceInvocationResponse> result = Observable.empty();
for (RoutedServiceInvocationRequest request : requests) {
result = result.mergeWith(spaceTaskDispatcher.observe(new AstrixServiceInvocationTask(request.getRequest()), request.getRoutingkey()));
}
return result.toList();
}
内容来源于网络,如有侵权,请联系作者删除!