[英]A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.
The recommended way to learn about the Mono API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the "which operator do I need?" appendix.
The rx operators will offer aliases for input Mono type to preserve the "at most one" property of the resulting Mono. For instance Mono#flatMap returns a Mono, while there is a Mono#flatMapMany alias with possibly more than 1 emission.
Mono<Void> should be used for a Publisher that just completes without any value.
It is intended to be used in implementations and return types; input parameters should keep using raw Publisher as much as possible.
Note that using state in the java.util.function / lambdas used within Mono operators should be avoided, as these may be shared between several Subscribers.
学习 Mono API 和发现新操作符的推荐方法是阅读参考文档,而不是这份 javadoc(javadoc 更适合用来深入了解单个操作符)。请参阅"which operator do I need?"(我需要哪个操作符?)附录。
* Get the size of sorted set with {@literal key}.
* @param key must not be {@literal null}.
* @return
* @see <a href="http://redis.io/commands/zcard">Redis Documentation: ZCARD</a>
default Mono<Long> zCard(ByteBuffer key) {
Assert.notNull(key, "Key must not be null!");
return zCard(Mono.just(new KeyCommand(key))).next().map(NumericResponse::getOutput);
public ResponseSpec exchange() {
ClientResponse clientResponse = this.bodySpec.exchange().block(getTimeout());
Assert.state(clientResponse != null, "No ClientResponse");
WiretapConnector.Info info = wiretapConnector.claimRequest(this.requestId);
return new DefaultResponseSpec(info, clientResponse, this.uriTemplate, getTimeout());
* Adapt the given request processor function to a filter function that only
* operates on the {@code ClientRequest}.
* @param processor the request processor
* @return the resulting filter adapter
static ExchangeFilterFunction ofRequestProcessor(Function<ClientRequest, Mono<ClientRequest>> processor) {
Assert.notNull(processor, "ClientRequest Function must not be null");
return (request, next) -> processor.apply(request).flatMap(next::exchange);
private <R> Mono<R> createNotFoundError() {
return Mono.defer(() -> {
Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND, "No matching handler");
return Mono.error(ex);
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
WebHandler webHandler = (WebHandler) handler;
Mono<Void> mono = webHandler.handle(exchange);
return mono.then(Mono.empty());
/**
 * Route the request with the first router function and fall back to the
 * second one when the first produces no handler function.
 * @param request the request to route
 * @return the handler function from the first router, or else the one from
 * the second router passed through {@code RouterFunctions::cast}
 */
public Mono<HandlerFunction<ServerResponse>> route(ServerRequest request) {
return this.first.route(request)
// Mono.defer keeps the second router from being invoked until the fallback is actually subscribed.
.switchIfEmpty(Mono.defer(() -> this.second.route(request).map(RouterFunctions::cast)));
public Flux<BooleanResponse<KeyCommand>> exists(Publisher<KeyCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> {
Assert.notNull(command.getKey(), "Key must not be null!");
return cmd.exists(command.getKey()).map(LettuceConverters.longToBooleanConverter()::convert)
.map((value) -> new BooleanResponse<>(command, value));
* Get elements in {@literal range} from sorted set in reverse {@literal score} ordering.
* @param key must not be {@literal null}.
* @param range must not be {@literal null}.
* @return
* @see <a href="http://redis.io/commands/zrevrange">Redis Documentation: ZREVRANGE</a>
default Flux<ByteBuffer> zRevRange(ByteBuffer key, Range<Long> range) {
Assert.notNull(key, "Key must not be null!");
return zRange(Mono.just(ZRangeCommand.reverseValuesWithin(range).from(key))).flatMap(CommandResponse::getOutput)
.map(tuple -> ByteBuffer.wrap(tuple.getValue()));
public Mono<Long> unionAndStore(K key, Collection<K> otherKeys, K destKey, Aggregate aggregate, Weights weights) {
Assert.notNull(key, "Key must not be null!");
Assert.notNull(otherKeys, "Other keys must not be null!");
Assert.notNull(destKey, "Destination key must not be null!");
Assert.notNull(aggregate, "Aggregate must not be null!");
Assert.notNull(weights, "Weights must not be null!");
return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.zUnionStore(rawKey(destKey), serialized, weights, aggregate)));
public Mono<Long> intersectAndStore(K key, Collection<K> otherKeys, K destKey, Aggregate aggregate, Weights weights) {
Assert.notNull(key, "Key must not be null!");
Assert.notNull(otherKeys, "Other keys must not be null!");
Assert.notNull(destKey, "Destination key must not be null!");
Assert.notNull(aggregate, "Aggregate must not be null!");
Assert.notNull(weights, "Weights must not be null!");
return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.zInterStore(rawKey(destKey), serialized, weights, aggregate)));
/**
 * Build a response from a body object that is already available, i.e. not a
 * {@code Publisher}.
 * @param body the body object; must not be {@code null} and must not be a {@code Publisher}
 * @return the response {@code Mono}
 */
public Mono<ServerResponse> syncBody(Object body) {
Assert.notNull(body, "Body must not be null");
// A Publisher is rejected here; the message points callers to body(Publisher, Class) instead.
Assert.isTrue(!(body instanceof Publisher),
"Please specify the element class by using body(Publisher, Class)");
// NOTE(review): the builder expression below looks truncated in this excerpt - confirm against the full source.
return new DefaultEntityResponseBuilder<>(body,
.map(entityResponse -> entityResponse);
代码示例来源:origin: spring-projects/spring-data-redis
Assert.notNull(key, "Key must not be null!");
Assert.notEmpty(members, "Members must not be null or empty!");
Assert.noNullElements(members, "Members must not contain null elements!");
return template.createMono(connection -> Flux.fromArray(members) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.zSetCommands().zRem(rawKey(key), serialized)));
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
return Mono.from(inputStream).map(value ->
encodeValue(value, mimeType, bufferFactory, elementType, hints, encoding)).flux();
byte[] separator =
return Flux.from(inputStream).map(value -> {
DataBuffer buffer =
encodeValue(value, mimeType, bufferFactory, elementType, hints,
ResolvableType listType =
ResolvableType.forClassWithGenerics(List.class, elementType);
return Flux.from(inputStream).collectList().map(list ->
encodeValue(list, mimeType, bufferFactory, listType, hints,
* Return a new {@code DataBuffer} composed from joining together the given
* {@code dataBuffers} elements. Depending on the {@link DataBuffer} type,
* the returned buffer may be a single buffer containing all data of the
* provided buffers, or it may be a zero-copy, composite with references to
* the given buffers.
* <p>If {@code dataBuffers} produces an error or if there is a cancel
* signal, then all accumulated buffers will be
* {@linkplain #release(DataBuffer) released}.
* <p>Note that the given data buffers do <strong>not</strong> have to be
* released. They will be released as part of the returned composite.
* @param dataBuffers the data buffers that are to be composed
* @return a buffer that is composed from the {@code dataBuffers} argument
* @since 5.0.3
public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
return Flux.from(dataBuffers)
.filter(list -> !list.isEmpty())
.map(list -> list.get(0).factory().join(list))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
/**
 * Build the response from the given {@code Publisher<Void>}.
 * @param voidPublisher the publisher; must not be {@code null}
 * @return the response {@code Mono} produced by the delegate {@code build} overload
 */
public Mono<ServerResponse> build(Publisher<Void> voidPublisher) {
Assert.notNull(voidPublisher, "Publisher must not be null");
// Delegates to the build variant taking an (exchange, handlerStrategies) function.
// NOTE(review): the lambda body is cut off in this excerpt - confirm against the full source.
return build((exchange, handlerStrategies) ->
代码示例来源:origin: spring-projects/spring-framework
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
// Report first connect to the ListenableFuture
MonoProcessor<Void> connectMono = MonoProcessor.create();
.handle(new ReactorNettyHandler(handler))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(Connection::onDispose) // post-connect issues
return new MonoToListenableFutureAdapter<>(connectMono);
/**
 * Encode the given text using the charset of the given media type.
 * @param text the text to encode
 * @param mediaType the media type; its charset must be set
 * @param bufferFactory the buffer factory passed in by the caller
 * @return a {@code Mono} created via {@code Mono.defer}
 */
private Mono<DataBuffer> encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
// The bytes are computed eagerly, before the deferred Mono is created.
byte[] bytes = text.toString().getBytes(mediaType.getCharset());
// NOTE(review): the deferred supplier body is cut off in this excerpt - confirm against the full source.
return Mono.defer(() ->
代码示例来源:origin: spring-projects/spring-data-redis
// Validate both arguments up front; each message names the argument it guards.
Assert.notNull(key, "Key must not be null!");
Assert.notNull(tuples, "Tuples must not be null!");
// Map every tuple to a DefaultTuple holding the raw value bytes and the score,
// collect them into one list and add that list under the raw key via zAdd.
return createMono(connection -> Flux.fromIterable(tuples) //
.map(t -> new DefaultTuple(ByteUtils.getBytes(rawValue(t.getValue())), t.getScore())) //
.collectList() //
.flatMap(serialized -> connection.zAdd(rawKey(key), serialized)));
代码示例来源:origin: spring-projects/spring-data-redis
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Flux<ByteBuffer> result = cmd.hvals(command.getKey());
return Mono.just(new CommandResponse<>(command, result));
代码示例来源:origin: spring-projects/spring-framework
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Assert.notNull(tokens, "'tokens' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
return tokens.flatMap(tokenBuffer -> {
try {
Object value = reader.readValue(tokenBuffer.asParser(getObjectMapper()));
return Mono.justOrEmpty(value);
return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex));
return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
return Mono.error(new DecodingException("I/O error while parsing input stream", ex));