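This article collects code examples for the Java method reactor.core.publisher.Mono.then() and shows how Mono.then() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Mono.then() are as follows: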
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: then
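Description: Return a Mono which only replays complete and error signals from this Mono. Any value emitted by this Mono is discarded. The overload then(Mono<V> other), used in several examples here, ignores the value from this Mono and, once it completes, plays other instead; an error from this Mono is replayed in the result.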
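The behavior described above can be tried out in a minimal sketch. It assumes reactor-core is on the classpath; the class name MonoThenSketch and its helper methods are hypothetical names chosen for illustration and do not come from the projects quoted in this article. block() is used only to make the results observable in a small program.

```java
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class (not from any quoted project); assumes reactor-core is available.
public class MonoThenSketch {

    // then(): the upstream value is discarded and only completion is relayed,
    // so block() on the resulting Mono<Void> returns null.
    static Object blockAfterThen(AtomicReference<String> seen) {
        Mono<Void> done = Mono.just("hello")
                .doOnNext(seen::set) // the side effect still runs before the value is dropped
                .then();
        return done.block();
    }

    // then(other): once the first Mono completes, the result comes from 'other'.
    static Integer valueFromOther() {
        return Mono.just("ignored")
                .then(Mono.just(42))
                .block();
    }

    // An upstream error is relayed to the result instead of the value of 'other'.
    static String errorMessage() {
        try {
            Mono.<String>error(new IllegalStateException("boom"))
                    .then(Mono.just(42))
                    .block();
            return "no error";
        } catch (IllegalStateException ex) {
            return ex.getMessage();
        }
    }

    public static void main(String[] args) {
        AtomicReference<String> seen = new AtomicReference<>();
        System.out.println(blockAfterThen(seen));
        System.out.println(seen.get());
        System.out.println(valueFromOther());
        System.out.println(errorMessage());
    }
}
```

The three helpers correspond to the three cases in the description: completion without a value, continuation with another Mono, and error propagation. In non-blocking code the result would normally be returned to the caller rather than resolved with block().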
Code example source: spring-projects/spring-framework
@SuppressWarnings("unchecked")
private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message) {
    return message instanceof ClientHttpResponse ?
            () -> consumeAndCancel(message).then(Mono.empty()) : Mono::empty;
}
Code example source: spring-projects/spring-framework
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
    WebHandler webHandler = (WebHandler) handler;
    Mono<Void> mono = webHandler.handle(exchange);
    return mono.then(Mono.empty());
}
Code example source: spring-projects/spring-framework
/**
 * Prepare the model to use for rendering.
 * <p>The default implementation creates a combined output Map that includes
 * model as well as static attributes with the former taking precedence.
 */
protected Mono<Map<String, Object>> getModelAttributes(@Nullable Map<String, ?> model,
        ServerWebExchange exchange) {
    int size = (model != null ? model.size() : 0);
    Map<String, Object> attributes = new LinkedHashMap<>(size);
    if (model != null) {
        attributes.putAll(model);
    }
    return resolveAsyncAttributes(attributes).then(Mono.just(attributes));
}
Code example source: spring-projects/spring-framework
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
    WebSocketHandler webSocketHandler = (WebSocketHandler) handler;
    return getWebSocketService().handleRequest(exchange, webSocketHandler).then(Mono.empty());
}
Code example source: spring-projects/spring-framework
@Override
public Mono<ServerResponse> build(Publisher<Void> voidPublisher) {
    Assert.notNull(voidPublisher, "Publisher must not be null");
    return build((exchange, handlerStrategies) ->
            Mono.from(voidPublisher).then(exchange.getResponse().setComplete()));
}
Code example source: spring-projects/spring-framework
@PostMapping("/mono")
public Mono<Void> createWithMono(@RequestBody Mono<Person> mono) {
    return mono.doOnNext(persons::add).then();
}
Code example source: spring-projects/spring-framework
@ModelAttribute
public Mono<Void> voidMonoMethodBean(Model model) {
    return Mono.just("Void Mono Method Bean")
            .doOnNext(name -> model.addAttribute("voidMonoMethodBean", new TestBean(name)))
            .then();
}
Code example source: spring-projects/spring-framework
/**
 * Bind query params, form data, and or multipart form data to the binder target.
 * @param exchange the current exchange.
 * @return a {@code Mono<Void>} when binding is complete
 */
public Mono<Void> bind(ServerWebExchange exchange) {
    return getValuesToBind(exchange)
            .doOnNext(values -> doBind(new MutablePropertyValues(values)))
            .then();
}
Code example source: spring-projects/spring-framework
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
    Assert.notNull(handler, "TcpConnectionHandler is required");
    if (this.stopping) {
        return handleShuttingDownConnectFailure(handler);
    }
    Mono<Void> connectMono = this.tcpClient
            .handle(new ReactorNettyHandler(handler))
            .connect()
            .doOnError(handler::afterConnectFailure)
            .then();
    return new MonoToListenableFutureAdapter<>(connectMono);
}
Code example source: spring-projects/spring-framework
private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
    MonoProcessor<Void> completionMono = MonoProcessor.create();
    return Mono.fromCallable(
            () -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting to " + url);
                }
                Object jettyHandler = createHandler(url, handler, completionMono);
                ClientUpgradeRequest request = new ClientUpgradeRequest();
                request.setSubProtocols(handler.getSubProtocols());
                UpgradeListener upgradeListener = new DefaultUpgradeListener(headers);
                return this.jettyClient.connect(jettyHandler, url, request, upgradeListener);
            })
            .then(completionMono);
}
Code example source: spring-projects/spring-framework
private Mono<Void> executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
    MonoProcessor<Void> completionMono = MonoProcessor.create();
    return Mono.fromCallable(
            () -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting to " + url);
                }
                List<String> protocols = handler.getSubProtocols();
                DefaultConfigurator configurator = new DefaultConfigurator(requestHeaders);
                Endpoint endpoint = createEndpoint(url, handler, completionMono, configurator);
                ClientEndpointConfig config = createEndpointConfig(configurator, protocols);
                return this.webSocketContainer.connectToServer(endpoint, config, url);
            })
            .subscribeOn(Schedulers.elastic()) // connectToServer is blocking
            .then(completionMono);
}
Code example source: spring-projects/spring-framework
@Test
public void onStatusWithMonoErrorAndBodyConsumed() {
    RuntimeException ex = new RuntimeException("response error");
    testOnStatus(ex, response -> response.bodyToMono(Void.class).then(Mono.error(ex)));
}
Code example source: spring-projects/spring-framework
/**
 * Apply {@link Flux#reduce(Object, BiFunction) reduce} on the body, count
 * the number of bytes produced, release data buffers without writing, and
 * set the {@literal Content-Length} header.
 */
@Override
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
    return Flux.from(body)
            .reduce(0, (current, buffer) -> {
                int next = current + buffer.readableByteCount();
                DataBufferUtils.release(buffer);
                return next;
            })
            .doOnNext(count -> getHeaders().setContentLength(count))
            .then();
}
Code example source: spring-projects/spring-framework
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
    HandlerMethod handlerMethod = (HandlerMethod) handler;
    Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized");
    InitBinderBindingContext bindingContext = new InitBinderBindingContext(
            getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod));
    InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
    Function<Throwable, Mono<HandlerResult>> exceptionHandler =
            ex -> handleException(ex, handlerMethod, bindingContext, exchange);
    return this.modelInitializer
            .initModel(handlerMethod, bindingContext, exchange)
            .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
            .doOnNext(result -> result.setExceptionHandler(exceptionHandler))
            .doOnNext(result -> bindingContext.saveModel())
            .onErrorResume(exceptionHandler);
}
Code example source: spring-projects/spring-framework
@Test
public void captureAndClaim() {
    ClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, "/test");
    ClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
    ClientHttpConnector connector = (method, uri, fn) -> fn.apply(request).then(Mono.just(response));
    ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1").build();
    WiretapConnector wiretapConnector = new WiretapConnector(connector);
    ExchangeFunction function = ExchangeFunctions.create(wiretapConnector);
    function.exchange(clientRequest).block(ofMillis(0));
    WiretapConnector.Info actual = wiretapConnector.claimRequest("1");
    ExchangeResult result = actual.createExchangeResult(Duration.ZERO, null);
    assertEquals(HttpMethod.GET, result.getMethod());
    assertEquals("/test", result.getUrl().toString());
}
Code example source: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    if (exchange.getRequest().getQueryParams().containsKey("expire")) {
        return exchange.getSession().doOnNext(session -> {
            // Don't do anything, leave it expired...
        }).then();
    }
    else if (exchange.getRequest().getQueryParams().containsKey("changeId")) {
        return exchange.getSession().flatMap(session ->
                session.changeSessionId().doOnSuccess(aVoid -> updateSessionAttribute(session)));
    }
    else if (exchange.getRequest().getQueryParams().containsKey("invalidate")) {
        return exchange.getSession().doOnNext(WebSession::invalidate).then();
    }
    else {
        return exchange.getSession().doOnSuccess(this::updateSessionAttribute).then();
    }
}
Code example source: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    if (this.forwardedHeaderTransformer != null) {
        request = this.forwardedHeaderTransformer.apply(request);
    }
    ServerWebExchange exchange = createExchange(request, response);
    LogFormatUtils.traceDebug(logger, traceOn ->
            exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
                    (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
    return getDelegate().handle(exchange)
            .doOnSuccess(aVoid -> logResponse(exchange))
            .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
            .then(Mono.defer(response::setComplete));
}
Code example source: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    Mono<Integer> requestSizeMono = request.getBody().
            reduce(0, (integer, dataBuffer) -> integer +
                    dataBuffer.readableByteCount()).
            doOnSuccessOrError((size, throwable) -> {
                assertNull(throwable);
                assertEquals(REQUEST_SIZE, (long) size);
            });
    response.getHeaders().setContentLength(RESPONSE_SIZE);
    return requestSizeMono.then(response.writeWith(multipleChunks()));
}
Code example source: spring-projects/spring-framework
@Test // SPR-16231
public void responseCommitted() {
    Throwable ex = new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Oops");
    this.exchange.getResponse().setStatusCode(HttpStatus.CREATED);
    Mono<Void> mono = this.exchange.getResponse().setComplete()
            .then(Mono.defer(() -> this.handler.handle(this.exchange, ex)));
    StepVerifier.create(mono).consumeErrorWith(actual -> assertSame(ex, actual)).verify();
}
Code example source: spring-projects/spring-framework
private Mono<Void> assertGetFormParts(ServerWebExchange exchange) {
    return exchange
            .getMultipartData()
            .doOnNext(parts -> {
                assertEquals(2, parts.size());
                assertTrue(parts.containsKey("fooPart"));
                assertFooPart(parts.getFirst("fooPart"));
                assertTrue(parts.containsKey("barPart"));
                assertBarPart(parts.getFirst("barPart"));
            })
            .then();
}