Usage and code examples of the reactor.core.publisher.Mono.doOnNext() method

x33g5p2x, reposted on 2022-01-24 under Other

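This article collects code examples of the Java method reactor.core.publisher.Mono.doOnNext() and shows how Mono.doOnNext() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.doOnNext() method:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: doOnNext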

Introduction to Mono.doOnNext

Adds behavior (a side effect) that is triggered when the Mono successfully emits a value.
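
The behavior described above can be illustrated with a minimal sketch. It assumes reactor-core is on the classpath; the class name DoOnNextSketch and the helper observed are hypothetical names chosen for illustration and do not come from any of the projects quoted in this article. The sketch shows that the callback runs when a value is emitted and is skipped when the Mono completes empty.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

public class DoOnNextSketch {

    // Hypothetical helper: subscribes to the given Mono and returns
    // every value that the doOnNext callback observed.
    static List<String> observed(Mono<String> source) {
        List<String> seen = new ArrayList<>();
        source.doOnNext(seen::add) // side effect only; the value passes through unchanged
              .block();            // nothing runs until the Mono is subscribed
        return seen;
    }

    public static void main(String[] args) {
        // A Mono that emits a value: the callback runs once.
        System.out.println(observed(Mono.just("hello"))); // prints [hello]

        // An empty Mono: no value is emitted, so the callback never runs.
        System.out.println(observed(Mono.empty()));       // prints []
    }
}
```

Because doOnNext is skipped for an empty Mono, code that must also react to completion without a value would typically use doOnSuccess, which is invoked with null in that case.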

Code examples

Code example source: spring-projects/spring-framework

@Override
protected Mono<String> resolveUrlPathInternal(String resourceUrlPath,
    List<? extends Resource> locations, ResourceResolverChain chain) {
  String key = RESOLVED_URL_PATH_CACHE_KEY_PREFIX + resourceUrlPath;
  String cachedUrlPath = this.cache.get(key, String.class);
  if (cachedUrlPath != null) {
    logger.trace("Path resolved from cache");
    return Mono.just(cachedUrlPath);
  }
  return chain.resolveUrlPath(resourceUrlPath, locations)
      .doOnNext(resolvedPath -> this.cache.put(key, resolvedPath));
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Resource> transform(ServerWebExchange exchange, Resource resource,
    ResourceTransformerChain transformerChain) {
  Resource cachedResource = this.cache.get(resource, Resource.class);
  if (cachedResource != null) {
    logger.trace(exchange.getLogPrefix() + "Resource resolved from cache");
    return Mono.just(cachedResource);
  }
  return transformerChain.transform(exchange, resource)
      .doOnNext(transformed -> this.cache.put(resource, transformed));
}

Code example source: spring-projects/spring-framework

@Override
protected Mono<Resource> resolveResourceInternal(@Nullable ServerWebExchange exchange,
    String requestPath, List<? extends Resource> locations, ResourceResolverChain chain) {
  String key = computeKey(exchange, requestPath);
  Resource cachedResource = this.cache.get(key, Resource.class);
  if (cachedResource != null) {
    String logPrefix = exchange != null ? exchange.getLogPrefix() : "";
    logger.trace(logPrefix + "Resource resolved from cache");
    return Mono.just(cachedResource);
  }
  return chain.resolveResource(exchange, requestPath, locations)
      .doOnNext(resource -> this.cache.put(key, resource));
}

Code example source: spring-projects/spring-framework

@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
  if (this.routerFunction != null) {
    ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
    return this.routerFunction.route(request)
        .doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
  }
  else {
    return Mono.empty();
  }
}

Code example source: spring-projects/spring-framework

@Override
public Mono<HandlerFunction<T>> route(ServerRequest serverRequest) {
  return this.predicate.nest(serverRequest)
      .map(nestedRequest -> {
            if (logger.isTraceEnabled()) {
              String logPrefix = serverRequest.exchange().getLogPrefix();
              logger.trace(logPrefix + String.format("Matched nested %s", this.predicate));
            }
            return this.routerFunction.route(nestedRequest)
                .doOnNext(match -> {
                  if (nestedRequest != serverRequest) {
                    serverRequest.attributes().clear();
                    serverRequest.attributes()
                        .putAll(nestedRequest.attributes());
                  }
                });
          }
      ).orElseGet(Mono::empty);
}

Code example source: spring-projects/spring-framework

@PostMapping("/mono")
public Mono<Void> createWithMono(@RequestBody Mono<Person> mono) {
  return mono.doOnNext(persons::add).then();
}

Code example source: spring-projects/spring-framework

@ModelAttribute
public Mono<Void> voidMonoMethodBean(Model model) {
  return Mono.just("Void Mono Method Bean")
      .doOnNext(name -> model.addAttribute("voidMonoMethodBean", new TestBean(name)))
      .then();
}

Code example source: spring-projects/spring-framework

/**
 * Bind query params, form data, and or multipart form data to the binder target.
 * @param exchange the current exchange.
 * @return a {@code Mono<Void>} when binding is complete
 */
public Mono<Void> bind(ServerWebExchange exchange) {
  return getValuesToBind(exchange)
      .doOnNext(values -> doBind(new MutablePropertyValues(values)))
      .then();
}

Code example source: spring-projects/spring-framework

@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType,
    ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
  Map<String, Object> allHints = Hints.merge(hints, Hints.SUPPRESS_LOGGING_HINT, true);
  return this.partReader.read(elementType, inputMessage, allHints)
      .collectMultimap(Part::name)
      .doOnNext(map -> {
        LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
            (isEnableLoggingRequestDetails() ?
                LogFormatUtils.formatValue(map, !traceOn) :
                "parts " + map.keySet() + " (content masked)"));
      })
      .map(this::toMultiValueMap);
}

Code example source: spring-projects/spring-framework

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
  HandlerMethod handlerMethod = (HandlerMethod) handler;
  Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized");
  InitBinderBindingContext bindingContext = new InitBinderBindingContext(
      getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod));
  InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
  Function<Throwable, Mono<HandlerResult>> exceptionHandler =
      ex -> handleException(ex, handlerMethod, bindingContext, exchange);
  return this.modelInitializer
      .initModel(handlerMethod, bindingContext, exchange)
      .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
      .doOnNext(result -> result.setExceptionHandler(exceptionHandler))
      .doOnNext(result -> bindingContext.saveModel())
      .onErrorResume(exceptionHandler);
}

Code example source: spring-projects/spring-framework

@Override
public Mono<WebSession> getSession(ServerWebExchange exchange) {
  return Mono.defer(() -> retrieveSession(exchange)
      .switchIfEmpty(this.sessionStore.createWebSession())
      .doOnNext(session -> exchange.getResponse().beforeCommit(() -> save(exchange, session))));
}

Code example source: spring-projects/spring-framework

/**
 * Apply {@link Flux#reduce(Object, BiFunction) reduce} on the body, count
 * the number of bytes produced, release data buffers without writing, and
 * set the {@literal Content-Length} header.
 */
@Override
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
  return Flux.from(body)
      .reduce(0, (current, buffer) -> {
        int next = current + buffer.readableByteCount();
        DataBufferUtils.release(buffer);
        return next;
      })
      .doOnNext(count -> getHeaders().setContentLength(count))
      .then();
}

Code example source: spring-projects/spring-framework

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

Code example source: spring-projects/spring-framework

@Test
public void value() {
  Mono<Msg> result = this.webClient.get()
      .uri("/message")
      .exchange()
      .doOnNext(response -> {
        Assert.assertFalse(response.headers().contentType().get().getParameters().containsKey("delimited"));
        Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
        Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
      })
      .flatMap(response -> response.bodyToMono(Msg.class));
  StepVerifier.create(result)
      .expectNext(TEST_MSG)
      .verifyComplete();
}

Code example source: spring-projects/spring-framework

@Test
public void values() {
  Flux<Msg> result = this.webClient.get()
      .uri("/messages")
      .exchange()
      .doOnNext(response -> {
        Assert.assertEquals("true", response.headers().contentType().get().getParameters().get("delimited"));
        Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
        Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
      })
      .flatMapMany(response -> response.bodyToFlux(Msg.class));
  StepVerifier.create(result)
      .expectNext(TEST_MSG)
      .expectNext(TEST_MSG)
      .expectNext(TEST_MSG)
      .verifyComplete();
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
  if (exchange.getRequest().getQueryParams().containsKey("expire")) {
    return exchange.getSession().doOnNext(session -> {
      // Don't do anything, leave it expired...
    }).then();
  }
  else if (exchange.getRequest().getQueryParams().containsKey("changeId")) {
    return exchange.getSession().flatMap(session ->
        session.changeSessionId().doOnSuccess(aVoid -> updateSessionAttribute(session)));
  }
  else if (exchange.getRequest().getQueryParams().containsKey("invalidate")) {
    return exchange.getSession().doOnNext(WebSession::invalidate).then();
  }
  else {
    return exchange.getSession().doOnSuccess(this::updateSessionAttribute).then();
  }
}

Code example source: spring-projects/spring-framework

private Mono<Void> assertGetFormParts(ServerWebExchange exchange) {
  return exchange
      .getMultipartData()
      .doOnNext(parts -> {
        assertEquals(2, parts.size());
        assertTrue(parts.containsKey("fooPart"));
        assertFooPart(parts.getFirst("fooPart"));
        assertTrue(parts.containsKey("barPart"));
        assertBarPart(parts.getFirst("barPart"));
      })
      .then();
}

Code example source: spring-projects/spring-framework

@Test // SPR-15674 (in comments)
public void mutateDoesNotCreateNewSession() {
  WebTestClient client = WebTestClient
      .bindToWebHandler(exchange -> {
        if (exchange.getRequest().getURI().getPath().equals("/set")) {
          return exchange.getSession()
              .doOnNext(session -> session.getAttributes().put("foo", "bar"))
              .then();
        }
        else {
          return exchange.getSession()
              .map(session -> session.getAttributeOrDefault("foo", "none"))
              .flatMap(value -> {
                DataBuffer buffer = toDataBuffer(value);
                return exchange.getResponse().writeWith(Mono.just(buffer));
              });
        }
      })
      .build();
  // Set the session attribute
  EntityExchangeResult<Void> result = client.get().uri("/set").exchange()
      .expectStatus().isOk().expectBody().isEmpty();
  ResponseCookie session = result.getResponseCookies().getFirst("SESSION");
  // Now get attribute
  client.mutate().build()
      .get().uri("/get")
      .cookie(session.getName(), session.getValue())
      .exchange()
      .expectBody(String.class).isEqualTo("bar");
}

Code example source: spring-projects/spring-framework

@Test
public void streaming() {
  Flux<Msg> result = this.webClient.get()
      .uri("/message-stream")
      .exchange()
      .doOnNext(response -> {
        Assert.assertEquals("true", response.headers().contentType().get().getParameters().get("delimited"));
        Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
        Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
      })
      .flatMapMany(response -> response.bodyToFlux(Msg.class));
  StepVerifier.create(result)
      .expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(0).build()).build())
      .expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(1).build()).build())
      .thenCancel()
      .verify();
}
