reactor.core.publisher.Mono.from()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(611)

本文整理了Java中reactor.core.publisher.Mono.from()方法的一些代码示例,展示了Mono.from()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.from()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:from

Mono.from介绍

[英]Expose the specified Publisher with the Mono API, and ensure it will emit 0 or 1 item. The source emitter will be cancelled on the first onNext.
[中]使用Mono API公开指定的发布服务器,并确保它将发出0或1项。源发射器将在第一个“onNext”上取消。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> write(Publisher<? extends Resource> inputStream, ResolvableType elementType,
    @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
  return Mono.from(inputStream).flatMap(resource ->
      writeResource(resource, elementType, mediaType, message, hints));
}

代码示例来源:origin: spring-projects/spring-framework

void registerAdapters(ReactiveAdapterRegistry registry) {
    // Register Flux and Mono before Publisher...
    registry.registerReactiveType(
        ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
        source -> (Mono<?>) source,
        Mono::from
    );
    registry.registerReactiveType(
        ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
        source -> (Flux<?>) source,
        Flux::from);
    registry.registerReactiveType(
        ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
        source -> (Publisher<?>) source,
        source -> source);
    registry.registerReactiveType(
        ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
          CompletableFuture<?> empty = new CompletableFuture<>();
          empty.complete(null);
          return empty;
        }),
        source -> Mono.fromFuture((CompletableFuture<?>) source),
        source -> Mono.from(source).toFuture()
    );
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
  ReactiveAdapter adapter = this.adapterRegistry.getAdapter(returnType.getParameterType(), returnValue);
  Assert.state(adapter != null, () -> "No ReactiveAdapter found for " + returnType.getParameterType());
  return new MonoToListenableFutureAdapter<>(Mono.from(adapter.toPublisher(returnValue)));
}

代码示例来源:origin: org.springframework/spring-web

@Override
public Mono<Void> write(Publisher<? extends Resource> inputStream, ResolvableType elementType,
    @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
  return Mono.from(inputStream).flatMap(resource ->
      writeResource(resource, elementType, mediaType, message, hints));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, ?>> inputStream,
    ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage outputMessage,
    Map<String, Object> hints) {
  return Mono.from(inputStream).flatMap(map -> {
    if (this.formWriter == null || isMultipart(map, mediaType)) {
      return writeMultipart(map, outputMessage, hints);
    }
    else {
      @SuppressWarnings("unchecked")
      MultiValueMap<String, String> formData = (MultiValueMap<String, String>) map;
      return this.formWriter.write(Mono.just(formData), elementType, mediaType, outputMessage, hints);
    }
  });
}

代码示例来源:origin: spring-projects/spring-framework

@Override
  public <T> Publisher<T> toPublisher(@Nullable Object source) {
    Publisher<T> publisher = super.toPublisher(source);
    return (isMultiValue() ? Flux.from(publisher) : Mono.from(publisher));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@RequestMapping("/request-body")
  @ResponseBody
  public Publisher<String> requestBody(@RequestBody Publisher<String> body) {
    return Mono.from(body).map(s -> "hello " + s);
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Object> resolveArgument(
    MethodParameter parameter, BindingContext context, ServerWebExchange exchange) {
  Mono<Principal> principal = exchange.getPrincipal();
  ReactiveAdapter adapter = getAdapterRegistry().getAdapter(parameter.getParameterType());
  return (adapter != null ? Mono.just(adapter.fromPublisher(principal)) : Mono.from(principal));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Object> resolveArgument(
    MethodParameter parameter, BindingContext context, ServerWebExchange exchange) {
  Mono<WebSession> session = exchange.getSession();
  ReactiveAdapter adapter = getAdapterRegistry().getAdapter(parameter.getParameterType());
  return (adapter != null ? Mono.just(adapter.fromPublisher(session)) : Mono.from(session));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
protected final Mono<Void> writeAndFlushWithInternal(
    Publisher<? extends Publisher<? extends DataBuffer>> body) {
  if (this.writeCalled.compareAndSet(false, true)) {
    Processor<? super Publisher<? extends DataBuffer>, Void> processor = createBodyFlushProcessor();
    return Mono.from(subscriber -> {
      body.subscribe(processor);
      processor.subscribe(subscriber);
    });
  }
  return Mono.error(new IllegalStateException(
      "writeWith() or writeAndFlushWith() has already been called"));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
  if (this.sendCalled.compareAndSet(false, true)) {
    WebSocketSendProcessor sendProcessor = new WebSocketSendProcessor();
    this.sendProcessor = sendProcessor;
    return Mono.from(subscriber -> {
        messages.subscribe(sendProcessor);
        sendProcessor.subscribe(subscriber);
    });
  }
  else {
    return Mono.error(new IllegalStateException("send() has already been called"));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<ServerResponse> build(Publisher<Void> voidPublisher) {
  Assert.notNull(voidPublisher, "Publisher must not be null");
  return build((exchange, handlerStrategies) ->
      Mono.from(voidPublisher).then(exchange.getResponse().setComplete()));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, String>> inputStream,
    ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
    Map<String, Object> hints) {
  mediaType = getMediaType(mediaType);
  message.getHeaders().setContentType(mediaType);
  Charset charset = mediaType.getCharset();
  Assert.notNull(charset, "No charset"); // should never occur
  return Mono.from(inputStream).flatMap(form -> {
    logFormData(form, hints);
    String value = serializeForm(form, charset);
    ByteBuffer byteBuffer = charset.encode(value);
    DataBuffer buffer = message.bufferFactory().wrap(byteBuffer);
    message.getHeaders().setContentLength(byteBuffer.remaining());
    return message.writeWith(Mono.just(buffer));
  });
}

代码示例来源:origin: org.springframework/spring-core

@Override
  public <T> Publisher<T> toPublisher(@Nullable Object source) {
    Publisher<T> publisher = super.toPublisher(source);
    return (isMultiValue() ? Flux.from(publisher) : Mono.from(publisher));
  }
}

代码示例来源:origin: spring-projects/spring-framework

private static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) {
  return (inputMessage, context) ->
      readWithMessageReaders(inputMessage, context, elementType,
          (HttpMessageReader<T> reader) -> readToMono(inputMessage, context, elementType, reader),
          ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)),
          skipBodyAsMono(inputMessage));
}

代码示例来源:origin: spring-projects/spring-framework

private Mono<Void> handleResult(HandlerResult handlerResult, BindingContext bindingContext) {
  Object value = handlerResult.getReturnValue();
  if (value != null) {
    ResolvableType type = handlerResult.getReturnType();
    ReactiveAdapter adapter = this.adapterRegistry.getAdapter(type.resolve(), value);
    if (isAsyncVoidType(type, adapter)) {
      return Mono.from(adapter.toPublisher(value));
    }
    String name = getAttributeName(handlerResult.getReturnTypeSource());
    bindingContext.getModel().asMap().putIfAbsent(name, value);
  }
  return Mono.empty();
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
    Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
  if (!uri.isAbsolute()) {
    return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
  }
  if (!this.httpClient.isStarted()) {
    try {
      this.httpClient.start();
    }
    catch (Exception ex) {
      return Mono.error(ex);
    }
  }
  JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest(
      this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);
  return requestCallback.apply(clientHttpRequest).then(Mono.from(
      clientHttpRequest.getReactiveRequest().response((response, chunks) -> {
        Flux<DataBuffer> content = Flux.from(chunks).map(this::toDataBuffer);
        return Mono.just(new JettyClientHttpResponse(response, content));
      })));
}

代码示例来源:origin: spring-projects/spring-framework

@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
    @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
  MediaType contentType = updateContentType(message, mediaType);
  Flux<DataBuffer> body = this.encoder.encode(
      inputStream, message.bufferFactory(), elementType, contentType, hints);
  if (inputStream instanceof Mono) {
    HttpHeaders headers = message.getHeaders();
    return Mono.from(body)
        .switchIfEmpty(Mono.defer(() -> {
          headers.setContentLength(0);
          return message.setComplete().then(Mono.empty());
        }))
        .flatMap(buffer -> {
          headers.setContentLength(buffer.readableByteCount());
          return message.writeWith(Mono.just(buffer));
        });
  }
  return (isStreamingMediaType(contentType) ?
      message.writeAndFlushWith(body.map(Flux::just)) : message.writeWith(body));
}

代码示例来源:origin: spring-projects/spring-framework

private Mono<?> prepareAttributeMono(String attributeName, ResolvableType attributeType,
    BindingContext context, ServerWebExchange exchange) {
  Object attribute = context.getModel().asMap().get(attributeName);
  if (attribute == null) {
    attribute = findAndRemoveReactiveAttribute(context.getModel(), attributeName);
  }
  if (attribute == null) {
    return createAttribute(attributeName, attributeType.toClass(), context, exchange);
  }
  ReactiveAdapter adapterFrom = getAdapterRegistry().getAdapter(null, attribute);
  if (adapterFrom != null) {
    Assert.isTrue(!adapterFrom.isMultiValue(), "Data binding only supports single-value async types");
    return Mono.from(adapterFrom.toPublisher(attribute));
  }
  else {
    return Mono.justOrEmpty(attribute);
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void validationErrorToSingle() throws Exception {
  MethodParameter parameter = this.testMethod
      .annotPresent(ModelAttribute.class).arg(Single.class, Foo.class);
  testValidationError(parameter,
      resolvedArgumentMono -> {
        Object value = resolvedArgumentMono.block(Duration.ofSeconds(5));
        assertNotNull(value);
        assertTrue(value instanceof Single);
        return Mono.from(RxReactiveStreams.toPublisher((Single<?>) value));
      });
}

相关文章

Mono类方法