Usage and code examples for the reactor.core.publisher.Mono.flatMapMany() method

x33g5p2x, reposted on 2022-01-24 under Other

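This article collects code examples for the Java method reactor.core.publisher.Mono.flatMapMany() and shows how Mono.flatMapMany() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are meant as practical reference material. Details of the method:
Package: reactor.core.publisher
Class name: Mono
Method name: flatMapMany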

About Mono.flatMapMany

Transforms the item emitted by this Mono into a Publisher, then forwards that Publisher's emissions into the returned Flux.
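As a minimal sketch of the description above, the following turns a single comma-separated string into a Flux of tokens. The class name FlatMapManyBasics and the helper tokens are hypothetical and used for illustration only; the sketch assumes reactor-core is on the classpath.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapManyBasics {

    // Hypothetical helper: expands the single value of the Mono into many elements.
    static Flux<String> tokens(Mono<String> line) {
        return line
                .flatMapMany(s -> Flux.fromArray(s.split(",")))
                .map(String::trim);
    }

    public static void main(String[] args) {
        // One input value becomes three output elements.
        List<String> result = tokens(Mono.just("a, b, c")).collectList().block();
        System.out.println(result); // [a, b, c]

        // With an empty Mono the mapper is never invoked, so the Flux is empty too.
        List<String> none = tokens(Mono.empty()).collectList().block();
        System.out.println(none); // []
    }
}
```

block() is used here only to keep the demonstration short; in reactive code the returned Flux would normally be composed further or subscribed to.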

Code examples

Code example source: spring-projects/spring-framework

@Override
public <T> Flux<T> bodyToFlux(Class<T> elementType) {
  return this.responseMono.flatMapMany(response ->
      handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
}

Code example source: spring-projects/spring-framework

@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementType) {
  return this.responseMono.flatMapMany(response -> handleBody(response,
      response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
}

Code example source: codecentric/spring-boot-admin

@SuppressWarnings("unchecked")
private static <S, T> Function<Flux<DataBuffer>, Flux<DataBuffer>> convertUsing(ParameterizedTypeReference<S> sourceType,
                                        ParameterizedTypeReference<T> targetType,
                                        Function<S, T> converterFn) {
  return input -> DECODER.decodeToMono(input, ResolvableType.forType(sourceType), null, null)
              .map(body -> converterFn.apply((S) body))
              .flatMapMany(output -> ENCODER.encode(Mono.just(output), new DefaultDataBufferFactory(),
                ResolvableType.forType(targetType), null, null));
}

Code example source: lettuce-io/lettuce-core

@Override
public Flux<K> clusterGetKeysInSlot(int slot, int count) {
  Mono<RedisClusterReactiveCommands<K, V>> connectionBySlot = findConnectionBySlotReactive(slot);
  return connectionBySlot.flatMapMany(conn -> conn.clusterGetKeysInSlot(slot, count));
}

Code example source: spring-projects/spring-framework

@Override
@SuppressWarnings({"rawtypes", "unchecked"})  // on JDK 9 where XMLEventReader is Iterator<Object>
public Flux<XMLEvent> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  Flux<DataBuffer> flux = Flux.from(inputStream);
  if (this.useAalto) {
    AaltoDataBufferToXmlEvent aaltoMapper = new AaltoDataBufferToXmlEvent();
    return flux.flatMap(aaltoMapper)
        .doFinally(signalType -> aaltoMapper.endOfInput());
  }
  else {
    Mono<DataBuffer> singleBuffer = DataBufferUtils.join(flux);
    return singleBuffer.
        flatMapMany(dataBuffer -> {
          try {
            InputStream is = dataBuffer.asInputStream();
            Iterator eventReader = inputFactory.createXMLEventReader(is);
            return Flux.fromIterable((Iterable<XMLEvent>) () -> eventReader)
                .doFinally(t -> DataBufferUtils.release(dataBuffer));
          }
          catch (XMLStreamException ex) {
            return Mono.error(ex);
          }
        });
  }
}

Code example source: reactor/reactor-core

public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
  return source
      .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
      .switchIfEmpty(fallback);
}
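To see how processOrFallback behaves for a non-empty and an empty source, the sketch below wraps the same method in a small runnable class. The class name ProcessOrFallbackDemo and the sample inputs are assumptions made for illustration; reactor-core is assumed to be on the classpath.

```java
import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ProcessOrFallbackDemo {

    // Same shape as the reactor-core snippet above.
    static Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
        return source
                .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
                .switchIfEmpty(fallback);
    }

    public static void main(String[] args) {
        Flux<String> fallback = Flux.just("no", "input");

        // Non-empty source: the phrase is split on whitespace, the fallback is ignored.
        System.out.println(
                processOrFallback(Mono.just("just a  phrase"), fallback).collectList().block());
        // [just, a, phrase]

        // Empty source: flatMapMany emits nothing, so switchIfEmpty subscribes to the fallback.
        System.out.println(
                processOrFallback(Mono.empty(), fallback).collectList().block());
        // [no, input]
    }
}
```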

Code example source: reactor/reactor-core

@Test
public void failScalarMap() {
  StepVerifier.create(Mono.just(1)
              .flatMapMany(f -> {
                throw new RuntimeException("test");
              }))
        .verifyErrorMessage("test");
}
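The test above shows that an exception thrown inside the mapper reaches the subscriber as an error signal rather than escaping to the caller. A minimal sketch of one way to react to that signal, using onErrorResume, follows. The class name MapperErrorDemo and the helper explode are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MapperErrorDemo {

    // Hypothetical helper: the mapper always throws, and the error is replaced by a fallback element.
    static Flux<String> explode(Mono<Integer> source) {
        return source
                .<String>flatMapMany(n -> {
                    throw new IllegalStateException("test");
                })
                // The exception arrives here as an onError signal.
                .onErrorResume(e -> Flux.just("recovered: " + e.getMessage()));
    }

    public static void main(String[] args) {
        List<String> result = explode(Mono.just(1)).collectList().block();
        System.out.println(result); // [recovered: test]
    }
}
```

The explicit `<String>` type witness is needed only because a lambda that always throws gives the compiler nothing to infer the element type from.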

Code example source: spring-projects/spring-framework

.flatMapMany(map -> {
  List<Part> list = map.get(name);
  if (CollectionUtils.isEmpty(list)) {

Code example source: spring-projects/spring-framework

.flatMapMany(region -> {
  if (!region.getResource().isReadable()) {
    return Flux.error(new EncodingException("Resource " +

Code example source: reactor/reactor-core

@Test
public void prematureScalarMapCallableNullComplete() {
  StepVerifier.create(Mono.just(1)
              .flatMapMany(f -> Mono.fromCallable(() -> null)))
        .verifyComplete();
}

Code example source: lettuce-io/lettuce-core

private <T> Flux<T> createFlux(Supplier<RedisCommand<K, V, T>> commandSupplier, boolean dissolve) {
  if (tracingEnabled) {
    return withTraceContext().flatMapMany(
        it -> Flux.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, dissolve, getScheduler())));
  }
  return Flux.from(new RedisPublisher<>(commandSupplier, connection, dissolve, getScheduler()));
}

Code example source: reactor/reactor-core

@Test
public void prematureScalarMapCallableJust() {
  StepVerifier.create(Mono.just(1)
              .flatMapMany(f -> Mono.fromCallable(() -> 2)))
        .expectNext(2)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void triggerSequenceDoneFirst() {
  StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofSeconds(2))
                      .flatMapMany(Flux::just)
                      .delayUntil(a -> Mono.just("foo")))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext(0L)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void currentContext() throws InterruptedException {
  StepVerifier.create(Mono.just("foo")
              .flatMap(d -> Mono.subscriberContext()
                       .map(c -> d + c.get(Integer.class)))
              .subscriberContext(ctx ->
                  ctx.put(Integer.class, ctx.get(Integer.class) + 1))
              .flatMapMany(Mono::just)
              .subscriberContext(ctx -> ctx.put(Integer.class, 0)))
        .expectNext("foo1")
        .verifyComplete();
}

Code example source: spring-projects/spring-framework

@Test
public void values() {
  Flux<Msg> result = this.webClient.get()
      .uri("/messages")
      .exchange()
      .doOnNext(response -> {
        Assert.assertEquals("true", response.headers().contentType().get().getParameters().get("delimited"));
        Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
        Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
      })
      .flatMapMany(response -> response.bodyToFlux(Msg.class));
  StepVerifier.create(result)
      .expectNext(TEST_MSG)
      .expectNext(TEST_MSG)
      .expectNext(TEST_MSG)
      .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void normalInnerEmpty() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1).hide().flatMapMany(v -> Flux.<Integer>empty())
  .subscribe(ts);
  ts.assertNoValues()
  .assertNoError()
  .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.just(1).hide().flatMapMany(v -> Flux.just(2).hide())
  .subscribe(ts);
  ts.assertValues(2)
  .assertNoError()
  .assertComplete();
}

Code example source: spring-projects/spring-framework

@Test
public void streaming() {
  Flux<Msg> result = this.webClient.get()
      .uri("/message-stream")
      .exchange()
      .doOnNext(response -> {
        Assert.assertEquals("true", response.headers().contentType().get().getParameters().get("delimited"));
        Assert.assertEquals("sample.proto", response.headers().header("X-Protobuf-Schema").get(0));
        Assert.assertEquals("Msg", response.headers().header("X-Protobuf-Message").get(0));
      })
      .flatMapMany(response -> response.bodyToFlux(Msg.class));
  StepVerifier.create(result)
      .expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(0).build()).build())
      .expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(1).build()).build())
      .thenCancel()
      .verify();
}

Code example source: spring-projects/spring-framework

@Test
public void shouldReceivePlainTextFlux() throws Exception {
  prepareResponse(response -> response.setBody("Hello Spring!"));
  Flux<String> result = this.webClient.get()
      .uri("/greeting?name=Spring")
      .header("X-Test-Header", "testvalue")
      .exchange()
      .flatMapMany(response -> response.bodyToFlux(String.class));
  StepVerifier.create(result)
      .expectNext("Hello Spring!")
      .expectComplete().verify(Duration.ofSeconds(3));
  expectRequestCount(1);
  expectRequest(request -> {
    assertEquals("testvalue", request.getHeader("X-Test-Header"));
    assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
    assertEquals("/greeting?name=Spring", request.getPath());
  });
}

Code example source: spring-projects/spring-framework

@Test
public void limitResponseSize() {
  DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
  DataBuffer b1 = dataBuffer("foo", bufferFactory);
  DataBuffer b2 = dataBuffer("bar", bufferFactory);
  DataBuffer b3 = dataBuffer("baz", bufferFactory);
  ClientRequest request = ClientRequest.create(HttpMethod.GET, DEFAULT_URL).build();
  ClientResponse response = ClientResponse.create(HttpStatus.OK).body(Flux.just(b1, b2, b3)).build();
  Mono<ClientResponse> result = ExchangeFilterFunctions.limitResponseSize(5)
      .filter(request, req -> Mono.just(response));
  StepVerifier.create(result.flatMapMany(res -> res.body(BodyExtractors.toDataBuffers())))
      .consumeNextWith(buffer -> assertEquals("foo", string(buffer)))
      .consumeNextWith(buffer -> assertEquals("ba", string(buffer)))
      .expectComplete()
      .verify();
}
