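This article collects code examples for the Java method reactor.core.publisher.Mono.compose() and shows how Mono.compose() is used in practice. The examples were extracted from selected open-source projects found on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of Mono.compose() are as follows:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: compose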
Defers the given transformation of this Mono in order to generate a target Mono type. The transformation function is applied once for each Subscriber, at subscription time, rather than once when the pipeline is assembled. For instance:
mono.compose(original -> original.log());
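The per-Subscriber behaviour described above is easier to see next to an eager alternative. The following is a minimal sketch in plain Java with no Reactor dependency: LazyOne, its methods, and ComposeModelDemo are hypothetical names that only model the semantics and are not Reactor's implementation. As a side note, Reactor 3.3 deprecated compose() in favor of transformDeferred(), and compose() is no longer present in 3.4.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

final class ComposeModelDemo {

    /** Hypothetical model of a lazy single-value source (not Reactor's Mono). */
    static final class LazyOne<T> {
        private final Supplier<T> onSubscribe;

        LazyOne(Supplier<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> LazyOne<T> just(T value) {
            return new LazyOne<>(() -> value);
        }

        <R> LazyOne<R> map(Function<? super T, ? extends R> mapper) {
            return new LazyOne<>(() -> mapper.apply(onSubscribe.get()));
        }

        /** Eager: the transformer runs once, when the pipeline is assembled. */
        <R> LazyOne<R> transform(Function<? super LazyOne<T>, LazyOne<R>> transformer) {
            return transformer.apply(this);
        }

        /** Deferred: the transformer runs again on every subscribe() call. */
        <R> LazyOne<R> compose(Function<? super LazyOne<T>, LazyOne<R>> transformer) {
            return new LazyOne<>(() -> transformer.apply(this).subscribe());
        }

        /** Stands in for subscribing: evaluates the pipeline and returns its value. */
        T subscribe() {
            return onSubscribe.get();
        }
    }

    /** A transformer that holds state, as in the perTrackable test. */
    static Function<LazyOne<Integer>, LazyOne<Integer>> counting() {
        return source -> {
            AtomicInteger counter = new AtomicInteger();
            return source.map(v -> v + counter.incrementAndGet());
        };
    }

    public static void main(String[] args) {
        LazyOne<Integer> deferred = LazyOne.just(10).compose(counting());
        LazyOne<Integer> eager = LazyOne.just(10).transform(counting());

        System.out.println(deferred.subscribe()); // 11
        System.out.println(deferred.subscribe()); // 11 (fresh counter per subscription)
        System.out.println(eager.subscribe());    // 11
        System.out.println(eager.subscribe());    // 12 (one counter shared by all subscriptions)
    }
}
```

In this model the deferred variant gives every subscription its own counter, which is why the value stays at 11, while the eager variant leaks state from one subscription into the next.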
Code example origin: reactor/reactor-core
@Test
public void perTrackable() {
    Mono<Integer> source = Mono.just(10).compose(f -> {
        // A fresh counter is created for every Subscriber
        AtomicInteger value = new AtomicInteger();
        return f.map(v -> v + value.incrementAndGet());
    });
    for (int i = 0; i < 10; i++) {
        AssertSubscriber<Integer> ts = AssertSubscriber.create();
        source.subscribe(ts);
        ts.assertValues(11)
          .assertComplete()
          .assertNoError();
    }
}
Code example origin: reactor/reactor-core
@Test
public void composerThrows() {
    Mono<Integer> source = Mono.just(10).compose(f -> {
        throw new RuntimeException("Forced failure");
    });
    for (int i = 0; i < 10; i++) {
        AssertSubscriber<Integer> ts = AssertSubscriber.create();
        source.subscribe(ts);
        // The exception surfaces as an error signal for each Subscriber
        ts.assertNoValues()
          .assertNotComplete()
          .assertError(RuntimeException.class);
    }
}
Code example origin: reactor/reactor-core
@Test
public void composerReturnsNull() {
    Mono<Integer> source = Mono.just(10).compose(f -> {
        return null;
    });
    for (int i = 0; i < 10; i++) {
        AssertSubscriber<Integer> ts = AssertSubscriber.create();
        source.subscribe(ts);
        // A null result is reported as a NullPointerException error signal
        ts.assertNoValues()
          .assertNotComplete()
          .assertError(NullPointerException.class);
    }
}
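The two tests above show that a transformer which throws or returns null does not fail at the compose() call site; the failure arrives at each Subscriber as an error. A minimal plain-Java sketch of that routing follows. ComposeErrorModel, Recorder, and subscribeComposed are hypothetical names introduced for illustration and model the observable behaviour only, not Reactor's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

final class ComposeErrorModel {

    /** Hypothetical stand-in for a Subscriber: records the signals it receives. */
    static final class Recorder<T> {
        final List<T> values = new ArrayList<>();
        Throwable error;
        boolean completed;
    }

    /**
     * Models deferred composition: the transformer runs inside the subscribe
     * step, so a throw or a null result becomes an error signal.
     */
    static <T, R> void subscribeComposed(Supplier<T> source,
                                         Function<Supplier<T>, Supplier<R>> transformer,
                                         Recorder<R> subscriber) {
        Supplier<R> target;
        try {
            target = Objects.requireNonNull(transformer.apply(source),
                    "The transformer returned null");
        } catch (RuntimeException e) {
            subscriber.error = e; // error signal: no value, no completion
            return;
        }
        subscriber.values.add(target.get());
        subscriber.completed = true;
    }

    public static void main(String[] args) {
        Supplier<Integer> source = () -> 10;

        Recorder<Integer> throwing = new Recorder<>();
        subscribeComposed(source, s -> { throw new RuntimeException("Forced failure"); }, throwing);
        System.out.println(throwing.error); // java.lang.RuntimeException: Forced failure

        Recorder<Integer> returningNull = new Recorder<>();
        subscribeComposed(source, s -> null, returningNull);
        System.out.println(returningNull.error.getClass().getSimpleName()); // NullPointerException
    }
}
```

In both cases the recorder ends up with no values and no completion, which mirrors the assertNoValues() and assertNotComplete() checks in the tests.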
Code example origin: org.springframework.boot/spring-boot-actuator
@Override
public Mono<Health> health() {
    return Flux.fromIterable(this.registry.getAll().entrySet())
            .flatMap((entry) -> Mono.zip(Mono.just(entry.getKey()),
                    entry.getValue().health().compose(this.timeoutCompose)))
            .collectMap(Tuple2::getT1, Tuple2::getT2)
            .map(this.healthAggregator::aggregate);
}
Code example origin: mulesoft/mule
@Override
public Publisher<Void> execute(CoreEvent event, Map<String, Object> parameters, SourceCallbackContext context) {
    return from(first.execute(event, parameters, context)).compose(v -> then.execute(event, parameters, context));
}
Code example origin: org.springframework.boot/spring-boot-actuator
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    if (this.autoTimeRequests) {
        return chain.filter(exchange).compose((call) -> filter(exchange, call));
    }
    return chain.filter(exchange);
}
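Code example origin: com.aol.cyclops/cyclops-reactor
/**
 * Delegates to compose() on the wrapped Mono.
 *
 * @param transformer function applied to the Mono for each Subscriber
 * @return the Mono built from the Publisher the transformer returns
 * @see reactor.core.publisher.Mono#compose(java.util.function.Function)
 */
public final <V> Mono<V> compose(Function<? super Mono<T>, ? extends Publisher<V>> transformer) {
    return boxed.compose(transformer);
}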
Code example origin: org.cloudfoundry/cloudfoundry-client-spring
protected final <T> Mono<T> doGet(Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
    return doGet(uriTransformer, requestTransformer)
        .compose(deserializedResponse(responseType));
}
Code example origin: org.mule.runtime/mule-module-extensions-support
@Override
public Publisher<Void> execute(CoreEvent event, Map<String, Object> parameters, SourceCallbackContext context) {
    return from(first.execute(event, parameters, context)).compose(v -> then.execute(event, parameters, context));
}
Code example origin: org.cloudfoundry/cloudfoundry-client-spring
protected final <T> Mono<T> doDelete(Object request, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer,
                                     Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
    return this.root
        .map(root -> buildUri(root, uriTransformer))
        .then(uri -> this.httpClient.delete(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
                .map(requestTransformer)
                .then(o -> o.send(serializedRequest(o, request))))
            .doOnSubscribe(s -> this.requestLogger.debug("DELETE {}", uri))
            .compose(logResponse(uri)))
        .compose(deserializedResponse(responseType));
}
Code example origin: org.cloudfoundry/cloudfoundry-client-spring
protected final <T> Mono<T> doPost(Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, Mono<Void>> requestTransformer) {
    return this.root
        .map(root -> buildUri(root, uriTransformer))
        .then(uri -> this.httpClient.post(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
                .then(requestTransformer))
            .doOnSubscribe(s -> this.requestLogger.debug("POST {}", uri))
            .compose(logResponse(uri)))
        .compose(deserializedResponse(responseType));
}
Code example origin: org.cloudfoundry/cloudfoundry-client-spring
protected final <T> Mono<T> doPut(Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, Mono<Void>> requestTransformer) {
    return this.root
        .map(root -> buildUri(root, uriTransformer))
        .then(uri -> this.httpClient.put(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
                .then(requestTransformer))
            .doOnSubscribe(s -> this.requestLogger.debug("PUT {}", uri))
            .compose(logResponse(uri)))
        .compose(deserializedResponse(responseType));
}
Code example origin: org.cloudfoundry/cloudfoundry-client-spring
protected final <T> Mono<T> doPatch(Object request, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer,
                                    Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
    return this.root
        .map(root -> buildUri(root, uriTransformer))
        .then(uri -> this.httpClient.patch(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
                .map(requestTransformer)
                .then(o -> o.send(serializedRequest(o, request))))
            .doOnSubscribe(s -> this.requestLogger.debug("PATCH {}", uri))
            .compose(logResponse(uri)))
        .compose(deserializedResponse(responseType));
}
Code example origin: org.cloudfoundry/cloudfoundry-client-spring
protected final <T> Mono<T> doPut(Object request, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer,
                                  Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
    return this.root
        .map(root -> buildUri(root, uriTransformer))
        .then(uri -> this.httpClient.put(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
                .map(requestTransformer)
                .then(o -> o.send(serializedRequest(o, request))))
            .doOnSubscribe(s -> this.requestLogger.debug("PUT {}", uri))
            .compose(logResponse(uri)))
        .compose(deserializedResponse(responseType));
}
Code example origin: org.cloudfoundry/cloudfoundry-client-spring
protected final Mono<HttpClientResponse> doGet(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
    return this.root
        .map(root -> buildUri(root, uriTransformer))
        .then(uri -> this.httpClient.get(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
                .map(requestTransformer)
                .then(HttpClientRequest::sendHeaders))
            .doOnSubscribe(s -> this.requestLogger.debug("GET {}", uri))
            .compose(logResponse(uri)));
}
Code example origin: org.cloudfoundry/cloudfoundry-client-spring
protected final Mono<HttpClientResponse> doWs(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
    return this.root
        .map(root -> buildUri(root, uriTransformer))
        .then(uri -> this.httpClient.get(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
                .map(requestTransformer)
                .then(HttpClientRequest::upgradeToTextWebsocket))
            .doOnSubscribe(s -> this.requestLogger.debug("WS {}", uri))
            .compose(logResponse(uri)));
}
Code example origin: org.springframework.metrics/spring-metrics
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    return chain.filter(exchange).compose(f -> {
        // Captured inside compose(), so the timer starts at subscription time
        long start = System.nanoTime();
        return f.doOnSuccess(done ->
                registry.timer(metricName, tagConfigurer.httpRequestTags(exchange, null))
                    .record(System.nanoTime() - start, TimeUnit.NANOSECONDS)
            )
            .doOnError(t ->
                registry.timer(metricName, tagConfigurer.httpRequestTags(exchange, t))
                    .record(System.nanoTime() - start, TimeUnit.NANOSECONDS)
            );
    });
}
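The metrics filter above reads the start time inside the compose() lambda, so each subscription is timed from the moment it begins. A minimal plain-Java sketch of why that placement matters follows. DeferredTimingModel, Task, and both wrapper methods are hypothetical names, and the fake clock is an assumption made so that the output is deterministic; none of this is Spring or Reactor code.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

final class DeferredTimingModel {

    /** Hypothetical stand-in for a lazy unit of work, such as the rest of a filter chain. */
    interface Task {
        void subscribe();
    }

    /** Eager wrapping: the start time is read once, when the pipeline is assembled. */
    static Task timedAtAssembly(Task work, LongSupplier clock, LongConsumer recorder) {
        long start = clock.getAsLong();
        return () -> {
            work.subscribe();
            recorder.accept(clock.getAsLong() - start);
        };
    }

    /** Deferred wrapping: the start time is read on every subscribe() call. */
    static Task timedPerSubscription(Task work, LongSupplier clock, LongConsumer recorder) {
        return () -> {
            long start = clock.getAsLong();
            work.subscribe();
            recorder.accept(clock.getAsLong() - start);
        };
    }

    public static void main(String[] args) {
        AtomicLong fakeNanos = new AtomicLong();  // deterministic clock for the demo
        Task work = () -> fakeNanos.addAndGet(5); // the work "takes" 5 ns

        Task eager = timedAtAssembly(work, fakeNanos::get,
                d -> System.out.println("eager: " + d));
        Task deferred = timedPerSubscription(work, fakeNanos::get,
                d -> System.out.println("deferred: " + d));

        fakeNanos.addAndGet(100); // 100 ns pass before anyone subscribes
        eager.subscribe();        // eager: 105
        deferred.subscribe();     // deferred: 5
    }
}
```

The eager wrapper also counts the idle time between assembly and subscription, while the deferred wrapper measures only the work itself.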