Usage and code examples of the reactor.core.publisher.Mono.thenMany() method

Reposted by x33g5p2x on 2022-01-24 in Other

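This article collects code examples for the Java method reactor.core.publisher.Mono.thenMany() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Mono.thenMany():
Package: reactor.core.publisher
Class name: Mono
Method name: thenMany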

About Mono.thenMany

Let this Mono complete, then play another Publisher.

In other words, ignore the element from this Mono and transform its completion signal into a Flux that emits the elements of the provided Publisher.
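The description above can be illustrated with a minimal sketch. It assumes reactor-core is on the classpath; the class name ThenManyBasics and its method names are hypothetical and exist only for this illustration. The second method shows the error case, where the source Mono fails and the provided Publisher is not subscribed.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; requires reactor-core on the classpath.
final class ThenManyBasics {

    // The Mono's element ("ignored") is dropped; only its completion matters.
    // The result type (Integer) comes from the second Publisher.
    static List<Integer> valuesAfterCompletion() {
        Flux<Integer> result = Mono.just("ignored")
                .thenMany(Flux.just(1, 2, 3));
        return result.collectList().block();
    }

    // When the source Mono fails, the error is propagated and the second
    // Publisher is never subscribed.
    static boolean secondSubscribedAfterError() {
        AtomicBoolean subscribed = new AtomicBoolean(false);
        Flux<Integer> second = Flux.just(1, 2, 3)
                .doOnSubscribe(subscription -> subscribed.set(true));
        try {
            Mono.<String>error(new IllegalStateException("boom"))
                    .thenMany(second)
                    .blockLast();
        } catch (IllegalStateException expected) {
            // blockLast() rethrows the source error.
        }
        return subscribed.get();
    }

    public static void main(String[] args) {
        System.out.println(valuesAfterCompletion());      // [1, 2, 3]
        System.out.println(secondSubscribedAfterError()); // false
    }
}
```

Note that the element type of the resulting Flux is determined only by the Publisher passed to thenMany, not by the Mono it is called on.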

Code examples

Code example source: spring-projects/spring-framework

private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message) {
  return message instanceof ClientHttpResponse ?
      () -> consumeAndCancel(message).thenMany(Mono.empty()) : Flux::empty;
}

Code example source: spring-projects/spring-framework

private static <T> Flux<T> unsupportedErrorHandler(
    ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {
  Flux<T> result;
  if (message.getHeaders().getContentType() == null) {
    // Maybe it's okay there is no content type, if there is no content..
    result = message.getBody().map(buffer -> {
      DataBufferUtils.release(buffer);
      throw ex;
    });
  }
  else {
    result = message instanceof ClientHttpResponse ?
        consumeAndCancel(message).thenMany(Flux.error(ex)) : Flux.error(ex);
  }
  return result;
}

Code example source: spring-projects/spring-framework

Flux<DataBuffer> partContent = partContentReady.thenMany(Flux.defer(outputMessage::getBody));
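One plausible reason for wrapping the argument in Flux.defer, as the line above does, is timing: the expression passed to thenMany is evaluated when the chain is assembled, whereas Flux.defer postpones building the Publisher until it is subscribed, which happens only after the Mono completes. The sketch below illustrates the difference. The class ThenManyDefer and the helper body are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; requires reactor-core on the classpath.
final class ThenManyDefer {

    // Hypothetical helper: records when the second Publisher is built.
    private static Flux<String> body(List<String> events) {
        events.add("body created");
        return Flux.just("a", "b");
    }

    // Without defer, body(...) runs while the chain is assembled,
    // before the first Mono has even been subscribed.
    static List<String> eagerOrder() {
        List<String> events = new ArrayList<>();
        Mono<Void> first = Mono.fromRunnable(() -> events.add("first ran"));
        Flux<String> flux = first.thenMany(body(events));
        flux.blockLast();
        return events;
    }

    // With defer, body(...) runs only when the second Publisher is
    // subscribed, that is, after the first Mono has completed.
    static List<String> deferredOrder() {
        List<String> events = new ArrayList<>();
        Mono<Void> first = Mono.fromRunnable(() -> events.add("first ran"));
        Flux<String> flux = first.thenMany(Flux.defer(() -> body(events)));
        flux.blockLast();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(eagerOrder());    // [body created, first ran]
        System.out.println(deferredOrder()); // [first ran, body created]
    }
}
```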

Code example source: spring-projects/spring-framework

@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  this.client.execute(getUrl("/echo"), session -> session
      .send(input.map(session::textMessage))
      .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
      .subscribeWith(output)
      .then())
      .block(TIMEOUT);
  assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));
}

Code example source: reactor/reactor-core

@Test
public void windowWithTimeoutStartsTimerOnSubscription() {
  StepVerifier.withVirtualTime(() ->
      Mono.delay(Duration.ofMillis(300))
        .thenMany(Flux.range(1, 3))
        .delayElements(Duration.ofMillis(150))
        .concatWith(Flux.range(4, 10).delaySubscription(Duration.ofMillis(500)))
        .windowTimeout(10, Duration.ofMillis(500))
        .flatMap(Flux::collectList)
  )
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(100))
        .assertNext(l -> assertThat(l).containsExactly(1))
        .assertNext(l -> assertThat(l).containsExactly(2, 3))
        .assertNext(l -> assertThat(l).containsExactly(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
        .assertNext(l -> assertThat(l).isEmpty())
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void testThenManySameType() {
  Flux<String> test = Mono.just("A")
              .thenMany(Flux.just("C", "D"));
  AssertSubscriber<String> ts = AssertSubscriber.create();
  test.subscribe(ts);
  ts.assertValues("C", "D");
  ts.assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void testThenManyDifferentType() {
  Flux<String> test = Mono.just(1)
              .thenMany(Flux.just("C", "D"));
  AssertSubscriber<String> ts = AssertSubscriber.create();
  test.subscribe(ts);
  ts.assertValues("C", "D");
  ts.assertComplete();
}

Code example source: spring-cloud/spring-cloud-gateway

@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getUrl("/echo"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
            .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
            .subscribeWith(output)
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doOnSuccessOrError((aVoid, ex) ->
                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
      })
      .block(Duration.ofMillis(5000));
  assertEquals(input.collectList().block(Duration.ofMillis(5000)),
      output.collectList().block(Duration.ofMillis(5000)));
}

Code example source: spring-cloud/spring-cloud-gateway

@Test
public void echoForHttp() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getHttpUrl("/echoForHttp"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
            .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
            .subscribeWith(output)
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doOnSuccessOrError((aVoid, ex) ->
                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
      })
      .block(Duration.ofMillis(5000));
  assertEquals(input.collectList().block(Duration.ofMillis(5000)),
      output.collectList().block(Duration.ofMillis(5000)));
}

Code example source: reactor/reactor-core

@Test
public void testMonoThenManySupplier() {
  AssertSubscriber<String> ts = AssertSubscriber.create();
  Flux<String> test = Mono.just(1).thenMany(Flux.defer(() -> Flux.just("A", "B")));
  test.subscribe(ts);
  ts.assertValues("A", "B");
  ts.assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void testThenManyFusion() {
  Flux<Integer> test = Mono.just("A")
               .thenMany(Flux.just("C", "D"))
               .thenMany(Flux.just(1, 2));
  Assert.assertTrue(test instanceof FluxConcatArray);
  FluxConcatArray<Integer> s = (FluxConcatArray<Integer>) test;
  Assert.assertTrue(s.array.length == 3);
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  test.subscribe(ts);
  ts.assertValues(1, 2);
  ts.assertComplete();
}
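The chained call in the test above emits only the elements of the last Publisher. A small sketch, assuming reactor-core on the classpath and using the hypothetical class name ThenManyChained, suggests that the intermediate Publisher is still subscribed and run; its elements are simply dropped from the result.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; requires reactor-core on the classpath.
final class ThenManyChained {

    // Returns what the final Flux emits and records, in seenByMiddleStage,
    // the elements that passed through the intermediate stage.
    static List<Integer> run(List<String> seenByMiddleStage) {
        Flux<Integer> result = Mono.just("A")
                // The middle stage runs, but its elements never reach the result.
                .thenMany(Flux.just("C", "D").doOnNext(seenByMiddleStage::add))
                // Only this last Publisher contributes elements.
                .thenMany(Flux.just(1, 2));
        return result.collectList().block();
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        System.out.println(run(seen)); // [1, 2]
        System.out.println(seen);      // [C, D]
    }
}
```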

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @param other the Publisher to play after this Mono completes
 * @return a Flux that emits the elements of {@code other}
 * @see reactor.core.publisher.Mono#thenMany(org.reactivestreams.Publisher)
 */
public final <V> Flux<V> thenMany(Publisher<V> other) {
  return boxed.thenMany(other);
}

Code example source: rsocket/rsocket-java

private Flux<Payload> handleChannel(Flux<Payload> request) {
 return lifecycle
   .active()
   .thenMany(
     Flux.defer(
       () -> {

Code example source: rsocket/rsocket-java

private Flux<Payload> handleRequestStream(final Payload payload) {
 return lifecycle
   .active()
   .thenMany(
     Flux.defer(
       () -> {

Code example source: io.netifi.proteus/client

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
 return _notify()
   .thenMany(source.requestChannel(payloads));
}

Code example source: LuoLiangDSGA/spring-learning

@PostConstruct
public void loadData() {
  factory.getReactiveConnection().serverCommands().flushAll()
      .thenMany(Flux.just("Thor", "Hulk", "Tony")
          .map(name -> new User(UUID.randomUUID().toString().substring(0, 5), name, "123456"))
          .flatMap(user -> redisOperations.opsForValue().set(user.getId(), user)))
      .thenMany(redisOperations.keys("*")
          .flatMap(redisOperations.opsForValue()::get))
      .subscribe(System.out::println);
}

Code example source: hantsy/spring-reactive-sample

private void initPosts() {
  // Note: with the subscribe(...) call below commented out, nothing subscribes
  // to this Flux, so the chain is only assembled and not executed.
  this.posts.deleteAll()
    .thenMany(
      Flux.just("Post one", "Post two")
        .flatMap(title -> this.posts.save(Post.builder().id(UUID.randomUUID().toString()).title(title).content("content of " + title).build()))
    );
    //.subscribe(null, null, () -> log.info("done posts initialization..."));
}

Code example source: reactor/reactor-kafka

private Mono<Void> transaction(Publisher<? extends ProducerRecord<K, V>> transactionRecords) {
  return transactionManager()
      .begin()
      .thenMany(sendProducerRecords(transactionRecords))
      .concatWith(transactionManager().commit())
      .onErrorResume(e -> transactionManager().abort().then(Mono.error(e)))
      .publishOn(senderOptions.scheduler())
      .then();
}
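The begin / thenMany / concatWith / onErrorResume shape of the method above can be tried out without Kafka. The following is only a sketch under stated assumptions: the class ThenManyTransactionSketch and the helper step are hypothetical stand-ins for the transaction manager calls, and each step merely appends its name to a list so that the ordering becomes visible. It assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; requires reactor-core on the classpath.
final class ThenManyTransactionSketch {

    // Hypothetical stand-in for begin()/commit()/abort(): logs its name
    // on subscription and completes without emitting an element.
    private static <T> Mono<T> step(List<String> log, String name) {
        return Mono.fromRunnable(() -> log.add(name));
    }

    static List<String> run(boolean failWhileSending) {
        List<String> log = new ArrayList<>();
        Flux<String> records = Flux.just("r1", "r2")
                .doOnNext(record -> log.add("send " + record));
        if (failWhileSending) {
            records = records.concatWith(
                    Mono.<String>error(new IllegalStateException("send failed")));
        }

        Mono<Void> begin = step(log, "begin");
        Mono<Void> transaction = begin
                .thenMany(records)                        // send only after begin completes
                .concatWith(step(log, "commit"))          // commit after all records
                .onErrorResume(e -> step(log, "abort")    // on failure: abort, then
                        .then(Mono.<String>error(e)))     // re-signal the original error
                .then();
        try {
            transaction.block();
        } catch (RuntimeException e) {
            log.add("error: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [begin, send r1, send r2, commit]
        System.out.println(run(true));  // [begin, send r1, send r2, abort, error: send failed]
    }
}
```

In the failing run the commit step is skipped, because concatWith subscribes to its argument only after the preceding Flux completes normally.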
