本文整理了Java中reactor.core.publisher.Mono.from()
方法的一些代码示例,展示了Mono.from()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.from()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:from
[英]Expose the specified Publisher with the Mono API, and ensure it will emit 0 or 1 item. The source emitter will be cancelled on the first onNext
.
[中]使用Mono API公开指定的发布服务器,并确保它将发出0或1项。源发射器将在第一个“onNext”上取消。
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> write(Publisher<? extends Resource> inputStream, ResolvableType elementType,
@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
return Mono.from(inputStream).flatMap(resource ->
writeResource(resource, elementType, mediaType, message, hints));
}
代码示例来源:origin: spring-projects/spring-framework
void registerAdapters(ReactiveAdapterRegistry registry) {
// Register Flux and Mono before Publisher...
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
source -> (Mono<?>) source,
Mono::from
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
source -> (Flux<?>) source,
Flux::from);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
source -> (Publisher<?>) source,
source -> source);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
CompletableFuture<?> empty = new CompletableFuture<>();
empty.complete(null);
return empty;
}),
source -> Mono.fromFuture((CompletableFuture<?>) source),
source -> Mono.from(source).toFuture()
);
}
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
ReactiveAdapter adapter = this.adapterRegistry.getAdapter(returnType.getParameterType(), returnValue);
Assert.state(adapter != null, () -> "No ReactiveAdapter found for " + returnType.getParameterType());
return new MonoToListenableFutureAdapter<>(Mono.from(adapter.toPublisher(returnValue)));
}
代码示例来源:origin: org.springframework/spring-web
@Override
public Mono<Void> write(Publisher<? extends Resource> inputStream, ResolvableType elementType,
@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
return Mono.from(inputStream).flatMap(resource ->
writeResource(resource, elementType, mediaType, message, hints));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, ?>> inputStream,
ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage outputMessage,
Map<String, Object> hints) {
return Mono.from(inputStream).flatMap(map -> {
if (this.formWriter == null || isMultipart(map, mediaType)) {
return writeMultipart(map, outputMessage, hints);
}
else {
@SuppressWarnings("unchecked")
MultiValueMap<String, String> formData = (MultiValueMap<String, String>) map;
return this.formWriter.write(Mono.just(formData), elementType, mediaType, outputMessage, hints);
}
});
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public <T> Publisher<T> toPublisher(@Nullable Object source) {
Publisher<T> publisher = super.toPublisher(source);
return (isMultiValue() ? Flux.from(publisher) : Mono.from(publisher));
}
}
代码示例来源:origin: spring-projects/spring-framework
@RequestMapping("/request-body")
@ResponseBody
public Publisher<String> requestBody(@RequestBody Publisher<String> body) {
return Mono.from(body).map(s -> "hello " + s);
}
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Object> resolveArgument(
MethodParameter parameter, BindingContext context, ServerWebExchange exchange) {
Mono<Principal> principal = exchange.getPrincipal();
ReactiveAdapter adapter = getAdapterRegistry().getAdapter(parameter.getParameterType());
return (adapter != null ? Mono.just(adapter.fromPublisher(principal)) : Mono.from(principal));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Object> resolveArgument(
MethodParameter parameter, BindingContext context, ServerWebExchange exchange) {
Mono<WebSession> session = exchange.getSession();
ReactiveAdapter adapter = getAdapterRegistry().getAdapter(parameter.getParameterType());
return (adapter != null ? Mono.just(adapter.fromPublisher(session)) : Mono.from(session));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
protected final Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
if (this.writeCalled.compareAndSet(false, true)) {
Processor<? super Publisher<? extends DataBuffer>, Void> processor = createBodyFlushProcessor();
return Mono.from(subscriber -> {
body.subscribe(processor);
processor.subscribe(subscriber);
});
}
return Mono.error(new IllegalStateException(
"writeWith() or writeAndFlushWith() has already been called"));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
if (this.sendCalled.compareAndSet(false, true)) {
WebSocketSendProcessor sendProcessor = new WebSocketSendProcessor();
this.sendProcessor = sendProcessor;
return Mono.from(subscriber -> {
messages.subscribe(sendProcessor);
sendProcessor.subscribe(subscriber);
});
}
else {
return Mono.error(new IllegalStateException("send() has already been called"));
}
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<ServerResponse> build(Publisher<Void> voidPublisher) {
Assert.notNull(voidPublisher, "Publisher must not be null");
return build((exchange, handlerStrategies) ->
Mono.from(voidPublisher).then(exchange.getResponse().setComplete()));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, String>> inputStream,
ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
Map<String, Object> hints) {
mediaType = getMediaType(mediaType);
message.getHeaders().setContentType(mediaType);
Charset charset = mediaType.getCharset();
Assert.notNull(charset, "No charset"); // should never occur
return Mono.from(inputStream).flatMap(form -> {
logFormData(form, hints);
String value = serializeForm(form, charset);
ByteBuffer byteBuffer = charset.encode(value);
DataBuffer buffer = message.bufferFactory().wrap(byteBuffer);
message.getHeaders().setContentLength(byteBuffer.remaining());
return message.writeWith(Mono.just(buffer));
});
}
代码示例来源:origin: org.springframework/spring-core
@Override
public <T> Publisher<T> toPublisher(@Nullable Object source) {
Publisher<T> publisher = super.toPublisher(source);
return (isMultiValue() ? Flux.from(publisher) : Mono.from(publisher));
}
}
代码示例来源:origin: spring-projects/spring-framework
private static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) {
return (inputMessage, context) ->
readWithMessageReaders(inputMessage, context, elementType,
(HttpMessageReader<T> reader) -> readToMono(inputMessage, context, elementType, reader),
ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)),
skipBodyAsMono(inputMessage));
}
代码示例来源:origin: spring-projects/spring-framework
private Mono<Void> handleResult(HandlerResult handlerResult, BindingContext bindingContext) {
Object value = handlerResult.getReturnValue();
if (value != null) {
ResolvableType type = handlerResult.getReturnType();
ReactiveAdapter adapter = this.adapterRegistry.getAdapter(type.resolve(), value);
if (isAsyncVoidType(type, adapter)) {
return Mono.from(adapter.toPublisher(value));
}
String name = getAttributeName(handlerResult.getReturnTypeSource());
bindingContext.getModel().asMap().putIfAbsent(name, value);
}
return Mono.empty();
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
if (!uri.isAbsolute()) {
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
}
if (!this.httpClient.isStarted()) {
try {
this.httpClient.start();
}
catch (Exception ex) {
return Mono.error(ex);
}
}
JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest(
this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);
return requestCallback.apply(clientHttpRequest).then(Mono.from(
clientHttpRequest.getReactiveRequest().response((response, chunks) -> {
Flux<DataBuffer> content = Flux.from(chunks).map(this::toDataBuffer);
return Mono.just(new JettyClientHttpResponse(response, content));
})));
}
代码示例来源:origin: spring-projects/spring-framework
@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
MediaType contentType = updateContentType(message, mediaType);
Flux<DataBuffer> body = this.encoder.encode(
inputStream, message.bufferFactory(), elementType, contentType, hints);
if (inputStream instanceof Mono) {
HttpHeaders headers = message.getHeaders();
return Mono.from(body)
.switchIfEmpty(Mono.defer(() -> {
headers.setContentLength(0);
return message.setComplete().then(Mono.empty());
}))
.flatMap(buffer -> {
headers.setContentLength(buffer.readableByteCount());
return message.writeWith(Mono.just(buffer));
});
}
return (isStreamingMediaType(contentType) ?
message.writeAndFlushWith(body.map(Flux::just)) : message.writeWith(body));
}
代码示例来源:origin: spring-projects/spring-framework
private Mono<?> prepareAttributeMono(String attributeName, ResolvableType attributeType,
BindingContext context, ServerWebExchange exchange) {
Object attribute = context.getModel().asMap().get(attributeName);
if (attribute == null) {
attribute = findAndRemoveReactiveAttribute(context.getModel(), attributeName);
}
if (attribute == null) {
return createAttribute(attributeName, attributeType.toClass(), context, exchange);
}
ReactiveAdapter adapterFrom = getAdapterRegistry().getAdapter(null, attribute);
if (adapterFrom != null) {
Assert.isTrue(!adapterFrom.isMultiValue(), "Data binding only supports single-value async types");
return Mono.from(adapterFrom.toPublisher(attribute));
}
else {
return Mono.justOrEmpty(attribute);
}
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void validationErrorToSingle() throws Exception {
MethodParameter parameter = this.testMethod
.annotPresent(ModelAttribute.class).arg(Single.class, Foo.class);
testValidationError(parameter,
resolvedArgumentMono -> {
Object value = resolvedArgumentMono.block(Duration.ofSeconds(5));
assertNotNull(value);
assertTrue(value instanceof Single);
return Mono.from(RxReactiveStreams.toPublisher((Single<?>) value));
});
}
内容来源于网络,如有侵权,请联系作者删除!