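This article collects code examples for the Java method reactor.core.publisher.Mono.doOnNext() and shows how Mono.doOnNext() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.doOnNext() method are as follows: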
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: doOnNext
Description: Adds behavior (a side effect) that is triggered when the Mono successfully emits a data item.
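The contract described above can be modeled without Reactor on the classpath. The following is a minimal sketch, not Reactor's implementation: `TinyMono` and `DoOnNextDemo` are hypothetical names introduced only for illustration, and the model assumes a source that resolves to zero or one value. It tries to show three properties of `doOnNext`: the callback runs only when a value is emitted, the value is passed downstream unchanged, and nothing runs until the chain is actually consumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a lazy source of zero or one value.
// It only models the doOnNext contract; it is NOT Reactor's Mono.
final class TinyMono<T> {
    private final Supplier<Optional<T>> source;

    private TinyMono(Supplier<Optional<T>> source) {
        this.source = source;
    }

    static <T> TinyMono<T> just(T value) {
        return new TinyMono<T>(() -> Optional.of(value));
    }

    static <T> TinyMono<T> empty() {
        return new TinyMono<T>(() -> Optional.empty());
    }

    // Wrap the source: when it is consumed, run the side effect only if
    // a value is present, then hand the same value on unchanged.
    TinyMono<T> doOnNext(Consumer<? super T> onNext) {
        return new TinyMono<T>(() -> {
            Optional<T> result = source.get();
            result.ifPresent(onNext);
            return result;
        });
    }

    // Consume the chain (loosely comparable to subscribing and waiting).
    Optional<T> block() {
        return source.get();
    }
}

final class DoOnNextDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TinyMono<String> withValue =
                TinyMono.just("hello").doOnNext(v -> log.add("saw " + v));
        log.add("assembled"); // the callback has not run yet
        Optional<String> out = withValue.block();
        log.add("result " + out.orElse("<empty>"));
        // An empty source never triggers the callback.
        TinyMono.<String>empty().doOnNext(v -> log.add("never")).block();
        return log;
    }
}
```

Calling `DoOnNextDemo.run()` records "assembled" before "saw hello", which mirrors the fact that a Reactor pipeline does no work until it is subscribed to. In the real examples that follow, the same shape appears as a source `Mono` followed by `.doOnNext(...)` for a cache write, an attribute update, or an assertion.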
Code example source: spring-projects/spring-framework
@Override
protected Mono<String> resolveUrlPathInternal(String resourceUrlPath,
        List<? extends Resource> locations, ResourceResolverChain chain) {
    String key = RESOLVED_URL_PATH_CACHE_KEY_PREFIX + resourceUrlPath;
    String cachedUrlPath = this.cache.get(key, String.class);
    if (cachedUrlPath != null) {
        logger.trace("Path resolved from cache");
        return Mono.just(cachedUrlPath);
    }
    // Cache miss: resolve through the chain and cache the path once it is emitted
    return chain.resolveUrlPath(resourceUrlPath, locations)
            .doOnNext(resolvedPath -> this.cache.put(key, resolvedPath));
}
Code example source: spring-projects/spring-framework
@Override
public Mono<Resource> transform(ServerWebExchange exchange, Resource resource,
        ResourceTransformerChain transformerChain) {
    Resource cachedResource = this.cache.get(resource, Resource.class);
    if (cachedResource != null) {
        logger.trace(exchange.getLogPrefix() + "Resource resolved from cache");
        return Mono.just(cachedResource);
    }
    // Cache miss: transform and cache the transformed resource once it is emitted
    return transformerChain.transform(exchange, resource)
            .doOnNext(transformed -> this.cache.put(resource, transformed));
}
Code example source: spring-projects/spring-framework
@Override
protected Mono<Resource> resolveResourceInternal(@Nullable ServerWebExchange exchange,
        String requestPath, List<? extends Resource> locations, ResourceResolverChain chain) {
    String key = computeKey(exchange, requestPath);
    Resource cachedResource = this.cache.get(key, Resource.class);
    if (cachedResource != null) {
        String logPrefix = exchange != null ? exchange.getLogPrefix() : "";
        logger.trace(logPrefix + "Resource resolved from cache");
        return Mono.just(cachedResource);
    }
    // Cache miss: resolve through the chain and cache the resource once it is emitted
    return chain.resolveResource(exchange, requestPath, locations)
            .doOnNext(resource -> this.cache.put(key, resource));
}
Code example source: spring-projects/spring-framework
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
    if (this.routerFunction != null) {
        ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
        // When a handler is matched, record it in the exchange attributes
        return this.routerFunction.route(request)
                .doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
    }
    else {
        return Mono.empty();
    }
}
Code example source: spring-projects/spring-framework
@Override
public Mono<HandlerFunction<T>> route(ServerRequest serverRequest) {
    return this.predicate.nest(serverRequest)
            .map(nestedRequest -> {
                if (logger.isTraceEnabled()) {
                    String logPrefix = serverRequest.exchange().getLogPrefix();
                    logger.trace(logPrefix + String.format("Matched nested %s", this.predicate));
                }
                return this.routerFunction.route(nestedRequest)
                        .doOnNext(match -> {
                            // On a match, copy the nested request's attributes back to the outer request
                            if (nestedRequest != serverRequest) {
                                serverRequest.attributes().clear();
                                serverRequest.attributes()
                                        .putAll(nestedRequest.attributes());
                            }
                        });
            }
            ).orElseGet(Mono::empty);
}
Code example source: spring-projects/spring-framework
@PostMapping("/mono")
public Mono<Void> createWithMono(@RequestBody Mono<Person> mono) {
    // Store the decoded Person as a side effect, then complete without a value
    return mono.doOnNext(persons::add).then();
}
Code example source: spring-projects/spring-framework
@ModelAttribute
public Mono<Void> voidMonoMethodBean(Model model) {
    return Mono.just("Void Mono Method Bean")
            .doOnNext(name -> model.addAttribute("voidMonoMethodBean", new TestBean(name)))
            .then();
}
Code example source: spring-projects/spring-framework
/**
 * Bind query params, form data, and/or multipart form data to the binder target.
 * @param exchange the current exchange.
 * @return a {@code Mono<Void>} when binding is complete
 */
public Mono<Void> bind(ServerWebExchange exchange) {
    return getValuesToBind(exchange)
            .doOnNext(values -> doBind(new MutablePropertyValues(values)))
            .then();
}
Code example source: spring-projects/spring-framework
@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType,
        ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
    Map<String, Object> allHints = Hints.merge(hints, Hints.SUPPRESS_LOGGING_HINT, true);
    return this.partReader.read(elementType, inputMessage, allHints)
            .collectMultimap(Part::name)
            // Log the parsed parts without altering the emitted map
            .doOnNext(map -> {
                LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
                        (isEnableLoggingRequestDetails() ?
                        LogFormatUtils.formatValue(map, !traceOn) :
                        "parts " + map.keySet() + " (content masked)"));
            })
            .map(this::toMultiValueMap);
}
Code example source: spring-projects/spring-framework
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
    HandlerMethod handlerMethod = (HandlerMethod) handler;
    Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized");
    InitBinderBindingContext bindingContext = new InitBinderBindingContext(
            getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod));
    InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
    Function<Throwable, Mono<HandlerResult>> exceptionHandler =
            ex -> handleException(ex, handlerMethod, bindingContext, exchange);
    return this.modelInitializer
            .initModel(handlerMethod, bindingContext, exchange)
            .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
            // Two chained side effects on the same emitted HandlerResult
            .doOnNext(result -> result.setExceptionHandler(exceptionHandler))
            .doOnNext(result -> bindingContext.saveModel())
            .onErrorResume(exceptionHandler);
}
Code example source: spring-projects/spring-framework
@Override
public Mono<WebSession> getSession(ServerWebExchange exchange) {
    return Mono.defer(() -> retrieveSession(exchange)
            .switchIfEmpty(this.sessionStore.createWebSession())
            // Register a pre-commit action that saves the session before the response is committed
            .doOnNext(session -> exchange.getResponse().beforeCommit(() -> save(exchange, session))));
}
Code example source: spring-projects/spring-framework
/**
 * Apply {@link Flux#reduce(Object, BiFunction) reduce} on the body, count
 * the number of bytes produced, release data buffers without writing, and
 * set the {@literal Content-Length} header.
 */
@Override
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
    return Flux.from(body)
            .reduce(0, (current, buffer) -> {
                int next = current + buffer.readableByteCount();
                DataBufferUtils.release(buffer);
                return next;
            })
            // Once the total byte count is emitted, set the Content-Length header
            .doOnNext(count -> getHeaders().setContentLength(count))
            .then();
}
Code example source: spring-projects/spring-framework
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
    Assert.notNull(handler, "TcpConnectionHandler is required");
    Assert.notNull(strategy, "ReconnectStrategy is required");
    if (this.stopping) {
        return handleShuttingDownConnectFailure(handler);
    }
    // Report first connect to the ListenableFuture
    MonoProcessor<Void> connectMono = MonoProcessor.create();
    this.tcpClient
            .handle(new ReactorNettyHandler(handler))
            .connect()
            .doOnNext(updateConnectMono(connectMono))
            .doOnError(updateConnectMono(connectMono))
            .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
            .flatMap(Connection::onDispose)             // post-connect issues
            .retryWhen(reconnectFunction(strategy))
            .repeatWhen(reconnectFunction(strategy))
            .subscribe();
    return new MonoToListenableFutureAdapter<>(connectMono);
}
Code example source: spring-projects/spring-framework
@Test
public void value() {
    Mono<Msg> result = this.webClient.get()
            .uri("/message")
            .exchange()
            // Check the response headers before the body is decoded
            .doOnNext(response -> {
                Assert.assertFalse(response.headers().contentType().get().getParameters().containsKey("delimited"));
                Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
                Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
            })
            .flatMap(response -> response.bodyToMono(Msg.class));
    StepVerifier.create(result)
            .expectNext(TEST_MSG)
            .verifyComplete();
}
Code example source: spring-projects/spring-framework
@Test
public void values() {
    Flux<Msg> result = this.webClient.get()
            .uri("/messages")
            .exchange()
            // Check the response headers before the body is decoded
            .doOnNext(response -> {
                Assert.assertEquals("true", response.headers().contentType().get().getParameters().get("delimited"));
                Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
                Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
            })
            .flatMapMany(response -> response.bodyToFlux(Msg.class));
    StepVerifier.create(result)
            .expectNext(TEST_MSG)
            .expectNext(TEST_MSG)
            .expectNext(TEST_MSG)
            .verifyComplete();
}
Code example source: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    if (exchange.getRequest().getQueryParams().containsKey("expire")) {
        return exchange.getSession().doOnNext(session -> {
            // Don't do anything, leave it expired...
        }).then();
    }
    else if (exchange.getRequest().getQueryParams().containsKey("changeId")) {
        return exchange.getSession().flatMap(session ->
                session.changeSessionId().doOnSuccess(aVoid -> updateSessionAttribute(session)));
    }
    else if (exchange.getRequest().getQueryParams().containsKey("invalidate")) {
        return exchange.getSession().doOnNext(WebSession::invalidate).then();
    }
    else {
        return exchange.getSession().doOnSuccess(this::updateSessionAttribute).then();
    }
}
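The handler above mixes doOnNext with doOnSuccess, so the difference between the two is worth spelling out. In Reactor, a doOnNext callback is skipped when the Mono completes empty, whereas a doOnSuccess callback also runs on an empty completion and then receives null. The sketch below models just that difference on an already-resolved result; `CallbackContrast` and its helper methods are hypothetical names for illustration, not Reactor APIs, and `Optional.empty()` is assumed to stand in for a Mono that completed without a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical helpers that model the two callbacks; NOT Reactor APIs.
final class CallbackContrast {
    // Model of doOnNext: the callback is skipped when there is no value.
    static <T> void doOnNext(Optional<T> result, Consumer<? super T> callback) {
        result.ifPresent(callback);
    }

    // Model of doOnSuccess: the callback always runs and receives null
    // when the source completed without a value.
    static <T> void doOnSuccess(Optional<T> result, Consumer<? super T> callback) {
        callback.accept(result.orElse(null));
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Optional<String> empty = Optional.empty();
        doOnNext(empty, v -> log.add("next " + v));       // skipped
        doOnSuccess(empty, v -> log.add("success " + v)); // runs with null
        Optional<String> session = Optional.of("session");
        doOnNext(session, v -> log.add("next " + v));
        doOnSuccess(session, v -> log.add("success " + v));
        return log;
    }
}
```

In this model only the doOnSuccess callback fires for the empty case, so a callback passed to doOnSuccess should be prepared to handle null, while a doOnNext callback can assume a value is present.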
Code example source: spring-projects/spring-framework
private Mono<Void> assertGetFormParts(ServerWebExchange exchange) {
    return exchange
            .getMultipartData()
            // Assert on the parsed multipart data when it is emitted
            .doOnNext(parts -> {
                assertEquals(2, parts.size());
                assertTrue(parts.containsKey("fooPart"));
                assertFooPart(parts.getFirst("fooPart"));
                assertTrue(parts.containsKey("barPart"));
                assertBarPart(parts.getFirst("barPart"));
            })
            .then();
}
Code example source: spring-projects/spring-framework
@Test // SPR-15674 (in comments)
public void mutateDoesNotCreateNewSession() {
    WebTestClient client = WebTestClient
            .bindToWebHandler(exchange -> {
                if (exchange.getRequest().getURI().getPath().equals("/set")) {
                    return exchange.getSession()
                            .doOnNext(session -> session.getAttributes().put("foo", "bar"))
                            .then();
                }
                else {
                    return exchange.getSession()
                            .map(session -> session.getAttributeOrDefault("foo", "none"))
                            .flatMap(value -> {
                                DataBuffer buffer = toDataBuffer(value);
                                return exchange.getResponse().writeWith(Mono.just(buffer));
                            });
                }
            })
            .build();
    // Set the session attribute
    EntityExchangeResult<Void> result = client.get().uri("/set").exchange()
            .expectStatus().isOk().expectBody().isEmpty();
    ResponseCookie session = result.getResponseCookies().getFirst("SESSION");
    // Now get attribute
    client.mutate().build()
            .get().uri("/get")
            .cookie(session.getName(), session.getValue())
            .exchange()
            .expectBody(String.class).isEqualTo("bar");
}
Code example source: spring-projects/spring-framework
@Test
public void streaming() {
    Flux<Msg> result = this.webClient.get()
            .uri("/message-stream")
            .exchange()
            // Check the response headers before the body is decoded
            .doOnNext(response -> {
                Assert.assertEquals("true", response.headers().contentType().get().getParameters().get("delimited"));
                Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
                Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
            })
            .flatMapMany(response -> response.bodyToFlux(Msg.class));
    StepVerifier.create(result)
            .expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(0).build()).build())
            .expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(1).build()).build())
            .thenCancel()
            .verify();
}