本文整理了Java中reactor.core.publisher.Mono.flatMapMany()
方法的一些代码示例,展示了Mono.flatMapMany()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.flatMapMany()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:flatMapMany
[英]Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.
[中]将此Mono发出的内容转换为发布者,然后将其发出的内容转发到返回的流量中。
代码示例来源:origin: spring-projects/spring-framework
@Override
public <T> Flux<T> bodyToFlux(Class<T> elementType) {
return this.responseMono.flatMapMany(response ->
handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementType) {
return this.responseMono.flatMapMany(response -> handleBody(response,
response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
}
代码示例来源:origin: codecentric/spring-boot-admin
@SuppressWarnings("unchecked")
private static <S, T> Function<Flux<DataBuffer>, Flux<DataBuffer>> convertUsing(ParameterizedTypeReference<S> sourceType,
ParameterizedTypeReference<T> targetType,
Function<S, T> converterFn) {
return input -> DECODER.decodeToMono(input, ResolvableType.forType(sourceType), null, null)
.map(body -> converterFn.apply((S) body))
.flatMapMany(output -> ENCODER.encode(Mono.just(output), new DefaultDataBufferFactory(),
ResolvableType.forType(targetType), null, null));
}
代码示例来源:origin: lettuce-io/lettuce-core
@Override
public Flux<K> clusterGetKeysInSlot(int slot, int count) {
Mono<RedisClusterReactiveCommands<K, V>> connectionBySlot = findConnectionBySlotReactive(slot);
return connectionBySlot.flatMapMany(conn -> conn.clusterGetKeysInSlot(slot, count));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
@SuppressWarnings({"rawtypes", "unchecked"}) // on JDK 9 where XMLEventReader is Iterator<Object>
public Flux<XMLEvent> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Flux<DataBuffer> flux = Flux.from(inputStream);
if (this.useAalto) {
AaltoDataBufferToXmlEvent aaltoMapper = new AaltoDataBufferToXmlEvent();
return flux.flatMap(aaltoMapper)
.doFinally(signalType -> aaltoMapper.endOfInput());
}
else {
Mono<DataBuffer> singleBuffer = DataBufferUtils.join(flux);
return singleBuffer.
flatMapMany(dataBuffer -> {
try {
InputStream is = dataBuffer.asInputStream();
Iterator eventReader = inputFactory.createXMLEventReader(is);
return Flux.fromIterable((Iterable<XMLEvent>) () -> eventReader)
.doFinally(t -> DataBufferUtils.release(dataBuffer));
}
catch (XMLStreamException ex) {
return Mono.error(ex);
}
});
}
}
代码示例来源:origin: reactor/reactor-core
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
return source
.flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
.switchIfEmpty(fallback);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void failScalarMap() {
StepVerifier.create(Mono.just(1)
.flatMapMany(f -> {
throw new RuntimeException("test");
}))
.verifyErrorMessage("test");
}
代码示例来源:origin: spring-projects/spring-framework
.flatMapMany(map -> {
List<Part> list = map.get(name);
if (CollectionUtils.isEmpty(list)) {
代码示例来源:origin: spring-projects/spring-framework
.flatMapMany(region -> {
if (!region.getResource().isReadable()) {
return Flux.error(new EncodingException("Resource " +
代码示例来源:origin: reactor/reactor-core
@Test
public void prematureScalarMapCallableNullComplete() {
StepVerifier.create(Mono.just(1)
.flatMapMany(f -> Mono.fromCallable(() -> null)))
.verifyComplete();
}
代码示例来源:origin: lettuce-io/lettuce-core
private <T> Flux<T> createFlux(Supplier<RedisCommand<K, V, T>> commandSupplier, boolean dissolve) {
if (tracingEnabled) {
return withTraceContext().flatMapMany(
it -> Flux.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, dissolve, getScheduler())));
}
return Flux.from(new RedisPublisher<>(commandSupplier, connection, dissolve, getScheduler()));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void prematureScalarMapCallableJust() {
StepVerifier.create(Mono.just(1)
.flatMapMany(f -> Mono.fromCallable(() -> 2)))
.expectNext(2)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void triggerSequenceDoneFirst() {
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofSeconds(2))
.flatMapMany(Flux::just)
.delayUntil(a -> Mono.just("foo")))
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(2))
.expectNext(0L)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void currentContext() throws InterruptedException {
StepVerifier.create(Mono.just("foo")
.flatMap(d -> Mono.subscriberContext()
.map(c -> d + c.get(Integer.class)))
.subscriberContext(ctx ->
ctx.put(Integer.class, ctx.get(Integer.class) + 1))
.flatMapMany(Mono::just)
.subscriberContext(ctx -> ctx.put(Integer.class, 0)))
.expectNext("foo1")
.verifyComplete();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void values() {
Flux<Msg> result = this.webClient.get()
.uri("/messages")
.exchange()
.doOnNext(response -> {
Assert.assertEquals("true", response.headers().contentType().get().getParameters().get("delimited"));
Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
})
.flatMapMany(response -> response.bodyToFlux(Msg.class));
StepVerifier.create(result)
.expectNext(TEST_MSG)
.expectNext(TEST_MSG)
.expectNext(TEST_MSG)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalInnerEmpty() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Mono.just(1).hide().flatMapMany(v -> Flux.<Integer>empty())
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Mono.just(1).hide().flatMapMany(v -> Flux.just(2).hide())
.subscribe(ts);
ts.assertValues(2)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void streaming() {
Flux<Msg> result = this.webClient.get()
.uri("/message-stream")
.exchange()
.doOnNext(response -> {
Assert.assertEquals("true", response.headers().contentType().get().getParameters().get("delimited"));
Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
})
.flatMapMany(response -> response.bodyToFlux(Msg.class));
StepVerifier.create(result)
.expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(0).build()).build())
.expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(1).build()).build())
.thenCancel()
.verify();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void shouldReceivePlainTextFlux() throws Exception {
prepareResponse(response -> response.setBody("Hello Spring!"));
Flux<String> result = this.webClient.get()
.uri("/greeting?name=Spring")
.header("X-Test-Header", "testvalue")
.exchange()
.flatMapMany(response -> response.bodyToFlux(String.class));
StepVerifier.create(result)
.expectNext("Hello Spring!")
.expectComplete().verify(Duration.ofSeconds(3));
expectRequestCount(1);
expectRequest(request -> {
assertEquals("testvalue", request.getHeader("X-Test-Header"));
assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", request.getPath());
});
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void limitResponseSize() {
DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
DataBuffer b1 = dataBuffer("foo", bufferFactory);
DataBuffer b2 = dataBuffer("bar", bufferFactory);
DataBuffer b3 = dataBuffer("baz", bufferFactory);
ClientRequest request = ClientRequest.create(HttpMethod.GET, DEFAULT_URL).build();
ClientResponse response = ClientResponse.create(HttpStatus.OK).body(Flux.just(b1, b2, b3)).build();
Mono<ClientResponse> result = ExchangeFilterFunctions.limitResponseSize(5)
.filter(request, req -> Mono.just(response));
StepVerifier.create(result.flatMapMany(res -> res.body(BodyExtractors.toDataBuffers())))
.consumeNextWith(buffer -> assertEquals("foo", string(buffer)))
.consumeNextWith(buffer -> assertEquals("ba", string(buffer)))
.expectComplete()
.verify();
}
内容来源于网络,如有侵权,请联系作者删除!