Usage and code examples of the reactor.core.publisher.Mono.then() method

x33g5p2x, reposted on 2022-01-24 under "Other"

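This article collects code examples for the Java method reactor.core.publisher.Mono.then() and shows how it is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of Mono.then() are as follows:
Fully qualified class: reactor.core.publisher.Mono
Class name: Mono
Method name: then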

Introduction to Mono.then

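Return a Mono<Void> which only replays complete and error signals from this Mono.
The examples in this article also use the overload then(Mono<V> other), which ignores the element from this Mono, waits for it to complete, and then emits the result of other. An error signal from this Mono is replayed in the resulting Mono.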
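The behavior described above can be sketched in a few lines. This is a minimal illustration, not taken from any of the quoted projects; the class name ThenDemo, the method run(), and the log messages are assumptions made for the example, and it assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

public class ThenDemo {

    // Hypothetical helper: runs three small pipelines and records what happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // then(): the element "a" is discarded; only the completion signal is replayed.
        Mono<Void> done = Mono.just("a")
                .doOnNext(v -> log.add("source emitted " + v))
                .then();
        done.doOnSuccess(v -> log.add("then() completed with " + v)).block();

        // then(Mono<V>): once the source completes, the other Mono's element is emitted.
        String next = Mono.just("a").then(Mono.just("b")).block();
        log.add("then(other) emitted " + next);

        // An error from the source is replayed; the other Mono is never subscribed.
        try {
            Mono.error(new IllegalStateException("boom")).then(Mono.just("b")).block();
        } catch (IllegalStateException e) {
            log.add("error propagated: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The calls to block() are only there to keep the sketch synchronous; in reactive code the resulting Mono would normally be returned to the caller instead.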

Code examples

Code example source: spring-projects/spring-framework

@SuppressWarnings("unchecked")
private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message) {
  return message instanceof ClientHttpResponse ?
      () -> consumeAndCancel(message).then(Mono.empty()) : Mono::empty;
}

Code example source: spring-projects/spring-framework

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
  WebHandler webHandler = (WebHandler) handler;
  Mono<Void> mono = webHandler.handle(exchange);
  return mono.then(Mono.empty());
}

Code example source: spring-projects/spring-framework

/**
 * Prepare the model to use for rendering.
 * <p>The default implementation creates a combined output Map that includes
 * model as well as static attributes with the former taking precedence.
 */
protected Mono<Map<String, Object>> getModelAttributes(@Nullable Map<String, ?> model,
    ServerWebExchange exchange) {
  int size = (model != null ? model.size() : 0);
  Map<String, Object> attributes = new LinkedHashMap<>(size);
  if (model != null) {
    attributes.putAll(model);
  }
  return resolveAsyncAttributes(attributes).then(Mono.just(attributes));
}

Code example source: spring-projects/spring-framework

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
  WebSocketHandler webSocketHandler = (WebSocketHandler) handler;
  return getWebSocketService().handleRequest(exchange, webSocketHandler).then(Mono.empty());
}

Code example source: spring-projects/spring-framework

@Override
public Mono<ServerResponse> build(Publisher<Void> voidPublisher) {
  Assert.notNull(voidPublisher, "Publisher must not be null");
  return build((exchange, handlerStrategies) ->
      Mono.from(voidPublisher).then(exchange.getResponse().setComplete()));
}

Code example source: spring-projects/spring-framework

@PostMapping("/mono")
public Mono<Void> createWithMono(@RequestBody Mono<Person> mono) {
  return mono.doOnNext(persons::add).then();
}

Code example source: spring-projects/spring-framework

@ModelAttribute
public Mono<Void> voidMonoMethodBean(Model model) {
  return Mono.just("Void Mono Method Bean")
      .doOnNext(name -> model.addAttribute("voidMonoMethodBean", new TestBean(name)))
      .then();
}

Code example source: spring-projects/spring-framework

/**
 * Bind query params, form data, and or multipart form data to the binder target.
 * @param exchange the current exchange.
 * @return a {@code Mono<Void>} when binding is complete
 */
public Mono<Void> bind(ServerWebExchange exchange) {
  return getValuesToBind(exchange)
      .doOnNext(values -> doBind(new MutablePropertyValues(values)))
      .then();
}

Code example source: spring-projects/spring-framework

@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  Mono<Void> connectMono = this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnError(handler::afterConnectFailure)
      .then();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

Code example source: spring-projects/spring-framework

private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
  MonoProcessor<Void> completionMono = MonoProcessor.create();
  return Mono.fromCallable(
      () -> {
        if (logger.isDebugEnabled()) {
          logger.debug("Connecting to " + url);
        }
        Object jettyHandler = createHandler(url, handler, completionMono);
        ClientUpgradeRequest request = new ClientUpgradeRequest();
        request.setSubProtocols(handler.getSubProtocols());
        UpgradeListener upgradeListener = new DefaultUpgradeListener(headers);
        return this.jettyClient.connect(jettyHandler, url, request, upgradeListener);
      })
      .then(completionMono);
}

Code example source: spring-projects/spring-framework

private Mono<Void> executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
  MonoProcessor<Void> completionMono = MonoProcessor.create();
  return Mono.fromCallable(
      () -> {
        if (logger.isDebugEnabled()) {
          logger.debug("Connecting to " + url);
        }
        List<String> protocols = handler.getSubProtocols();
        DefaultConfigurator configurator = new DefaultConfigurator(requestHeaders);
        Endpoint endpoint = createEndpoint(url, handler, completionMono, configurator);
        ClientEndpointConfig config = createEndpointConfig(configurator, protocols);
        return this.webSocketContainer.connectToServer(endpoint, config, url);
      })
      .subscribeOn(Schedulers.elastic()) // connectToServer is blocking
      .then(completionMono);
}

Code example source: spring-projects/spring-framework

@Test
public void onStatusWithMonoErrorAndBodyConsumed() {
  RuntimeException ex = new RuntimeException("response error");
  testOnStatus(ex, response -> response.bodyToMono(Void.class).then(Mono.error(ex)));
}

Code example source: spring-projects/spring-framework

/**
 * Apply {@link Flux#reduce(Object, BiFunction) reduce} on the body, count
 * the number of bytes produced, release data buffers without writing, and
 * set the {@literal Content-Length} header.
 */
@Override
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
  return Flux.from(body)
      .reduce(0, (current, buffer) -> {
        int next = current + buffer.readableByteCount();
        DataBufferUtils.release(buffer);
        return next;
      })
      .doOnNext(count -> getHeaders().setContentLength(count))
      .then();
}

Code example source: spring-projects/spring-framework

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
  HandlerMethod handlerMethod = (HandlerMethod) handler;
  Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized");
  InitBinderBindingContext bindingContext = new InitBinderBindingContext(
      getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod));
  InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
  Function<Throwable, Mono<HandlerResult>> exceptionHandler =
      ex -> handleException(ex, handlerMethod, bindingContext, exchange);
  return this.modelInitializer
      .initModel(handlerMethod, bindingContext, exchange)
      .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
      .doOnNext(result -> result.setExceptionHandler(exceptionHandler))
      .doOnNext(result -> bindingContext.saveModel())
      .onErrorResume(exceptionHandler);
}
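The example above wraps the follow-up call in Mono.defer before passing it to then(). A plausible reason is that the argument to then(Mono<V>) is an ordinary Java expression and is evaluated when the chain is assembled, not when the first Mono completes. The sketch below illustrates the difference; the class ThenDeferDemo and the helper invoke(...) are hypothetical names for this illustration only, and reactor-core is assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

public class ThenDeferDemo {

    // Hypothetical helper: records the moment it is called, then returns a Mono.
    static Mono<String> invoke(List<String> log) {
        log.add("invoke called");
        return Mono.just("result");
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Mono<Void> init = Mono.fromRunnable(() -> log.add("init ran"));

        // Eager: invoke(log) runs while the chain is assembled, before init runs.
        Mono<String> eager = init.then(invoke(log));
        log.add("eager assembled");
        eager.block();

        // Deferred: invoke(log) runs only after init has completed.
        Mono<String> deferred = init.then(Mono.defer(() -> invoke(log)));
        log.add("deferred assembled");
        deferred.block();

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the eager variant the helper is called before "init ran" is recorded; in the deferred variant it is called afterwards, which matters when the second step depends on side effects of the first.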

Code example source: spring-projects/spring-framework

@Test
public void captureAndClaim() {
  ClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, "/test");
  ClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
  ClientHttpConnector connector = (method, uri, fn) -> fn.apply(request).then(Mono.just(response));
  ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
      .header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1").build();
  WiretapConnector wiretapConnector = new WiretapConnector(connector);
  ExchangeFunction function = ExchangeFunctions.create(wiretapConnector);
  function.exchange(clientRequest).block(ofMillis(0));
  WiretapConnector.Info actual = wiretapConnector.claimRequest("1");
  ExchangeResult result = actual.createExchangeResult(Duration.ZERO, null);
  assertEquals(HttpMethod.GET, result.getMethod());
  assertEquals("/test", result.getUrl().toString());
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
  if (exchange.getRequest().getQueryParams().containsKey("expire")) {
    return exchange.getSession().doOnNext(session -> {
      // Don't do anything, leave it expired...
    }).then();
  }
  else if (exchange.getRequest().getQueryParams().containsKey("changeId")) {
    return exchange.getSession().flatMap(session ->
        session.changeSessionId().doOnSuccess(aVoid -> updateSessionAttribute(session)));
  }
  else if (exchange.getRequest().getQueryParams().containsKey("invalidate")) {
    return exchange.getSession().doOnNext(WebSession::invalidate).then();
  }
  else {
    return exchange.getSession().doOnSuccess(this::updateSessionAttribute).then();
  }
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  if (this.forwardedHeaderTransformer != null) {
    request = this.forwardedHeaderTransformer.apply(request);
  }
  ServerWebExchange exchange = createExchange(request, response);
  LogFormatUtils.traceDebug(logger, traceOn ->
      exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
          (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
  return getDelegate().handle(exchange)
      .doOnSuccess(aVoid -> logResponse(exchange))
      .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
      .then(Mono.defer(response::setComplete));
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  Mono<Integer> requestSizeMono = request.getBody().
      reduce(0, (integer, dataBuffer) -> integer +
          dataBuffer.readableByteCount()).
      doOnSuccessOrError((size, throwable) -> {
        assertNull(throwable);
        assertEquals(REQUEST_SIZE, (long) size);
      });
  response.getHeaders().setContentLength(RESPONSE_SIZE);
  return requestSizeMono.then(response.writeWith(multipleChunks()));
}

Code example source: spring-projects/spring-framework

@Test  // SPR-16231
public void responseCommitted() {
  Throwable ex = new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Oops");
  this.exchange.getResponse().setStatusCode(HttpStatus.CREATED);
  Mono<Void> mono = this.exchange.getResponse().setComplete()
      .then(Mono.defer(() -> this.handler.handle(this.exchange, ex)));
  StepVerifier.create(mono).consumeErrorWith(actual -> assertSame(ex, actual)).verify();
}

Code example source: spring-projects/spring-framework

private Mono<Void> assertGetFormParts(ServerWebExchange exchange) {
  return exchange
      .getMultipartData()
      .doOnNext(parts -> {
        assertEquals(2, parts.size());
        assertTrue(parts.containsKey("fooPart"));
        assertFooPart(parts.getFirst("fooPart"));
        assertTrue(parts.containsKey("barPart"));
        assertBarPart(parts.getFirst("barPart"));
      })
      .then();
}
