reactor.core.publisher.Mono.zipWith()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(10.1k)|赞(0)|评价(0)|浏览(537)

本文整理了Java中reactor.core.publisher.Mono.zipWith()方法的一些代码示例,展示了Mono.zipWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.zipWith()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:zipWith

Mono.zipWith介绍

[英]Combine the result from this mono and another into a Tuple2.

An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.
[中]将这个单声道和另一个单声道的结果组合成一个元组2。
任何源的错误或空完成将分别导致另一个源被取消,并导致Mono立即出错或完成。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Combine the result from this mono and another into a {@link Tuple2}.
 * <p>
 * An error or <strong>empty</strong> completion of any source will cause the other source
 * to be cancelled and the resulting Mono to immediately error or complete, respectively.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/zipWithOtherForMono.svg" alt="">
 *
 * @param other the {@link Mono} to combine with
 * @param <T2> the element type of the other Mono instance
 *
 * @return a new combined Mono
 */
public final <T2> Mono<Tuple2<T, T2>> zipWith(Mono<? extends T2> other) {
  return zipWith(other, Flux.tuple2Function());
}

代码示例来源:origin: lettuce-io/lettuce-core

protected Mono<List<RedisNodeDescription>> getNodes(StatefulRedisSentinelConnection<String, String> connection) {
  RedisSentinelReactiveCommands<String, String> reactive = connection.reactive();
  Mono<Tuple2<Map<String, String>, List<Map<String, String>>>> masterAndSlaves = reactive.master(masterId)
      .zipWith(reactive.slaves(masterId).collectList()).timeout(this.timeout).flatMap(tuple -> {
        return ResumeAfter.close(connection).thenEmit(tuple);
      }).doOnError(e -> connection.closeAsync());
  return masterAndSlaves.map(tuple -> {
    List<RedisNodeDescription> result = new ArrayList<>();
    result.add(toNode(tuple.getT1(), RedisInstance.Role.MASTER));
    result.addAll(tuple.getT2().stream().filter(SentinelTopologyProvider::isAvailable)
        .map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));
    return result;
  });
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void andCombinatorAliasZipWithCombinator() {
    Mono<String> and = Mono.just(1).zipWith(Mono.just("B"), (i, s) -> i + s);

    Mono<String> zipWith = and.zipWith(Mono.just(3), (s, i) -> s + i);

    StepVerifier.create(zipWith)
          .expectNext("1B3")
          .verifyComplete();
  }
}

代码示例来源:origin: lettuce-io/lettuce-core

private Mono<Tuple2<RedisURI, StatefulRedisConnection<K, V>>> getMasterConnectionAndUri(List<RedisNodeDescription> nodes,
    Tuple2<RedisURI, StatefulRedisConnection<K, V>> connectionTuple, RedisCodec<K, V> codec) {
  RedisNodeDescription node = getConnectedNode(redisURI, nodes);
  if (node.getRole() != RedisInstance.Role.MASTER) {
    RedisNodeDescription master = lookupMaster(nodes);
    ConnectionFuture<StatefulRedisConnection<K, V>> masterConnection = redisClient.connectAsync(codec, master.getUri());
    return Mono.just(master.getUri()).zipWith(Mono.fromCompletionStage(masterConnection)) //
        .doOnNext(it -> {
          initialConnections.put(it.getT1(), it.getT2());
        });
  }
  return Mono.just(connectionTuple);
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable State state) {
  return Flux.fromIterable(hosts()) //
      .filter(entry -> state == null || entry.getState().equals(state)) //
      .map(ElasticsearchHost::getEndpoint) //
      .flatMap(host -> {
        Mono<ClientResponse> exchange = createWebClient(host) //
            .head().uri("/").exchange().doOnError(throwable -> {
              hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
              clientProvider.getErrorListener().accept(throwable);
            });
        return Mono.just(host).zipWith(exchange);
      }) //
      .onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void andAliasZipWith() {
  Mono<Tuple2<Integer, String>> and = Mono.just(1)
                      .zipWith(Mono.just("B"));
  Mono<Tuple2<Tuple2<Integer, String>, Integer>> zipWith = and.zipWith(Mono.just(3));
  StepVerifier.create(zipWith)
        .expectNext(Tuples.of(Tuples.of(1, "B"), 3))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void introFutureHellReactorVersion() {
  Flux<String> ids = ifhrIds(); // <1>
  Flux<String> combinations =
      ids.flatMap(id -> { // <2>
        Mono<String> nameTask = ifhrName(id); // <3>
        Mono<Integer> statTask = ifhrStat(id); // <4>
        return nameTask.zipWith(statTask, // <5>
            (name, stat) -> "Name " + name + " has stats " + stat);
      });
  Mono<List<String>> result = combinations.collectList(); // <6>
  List<String> results = result.block(); // <7>
  assertThat(results).containsExactly( // <8>
      "Name NameJoe has stats 103",
      "Name NameBart has stats 104",
      "Name NameHenry has stats 105",
      "Name NameNicole has stats 106",
      "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
  );
}

代码示例来源:origin: reactor/reactor-core

@Test
public void publishOnAsyncDetection() {
  Publisher<String> a = Flux.just("a");
  Publisher<String> b = Mono.just("b");
  Flux<Tuple2<String, String>> flux =
      Flux.from(a)
        .flatMap(value -> Mono.just(value)
                   .zipWith(Mono.from(b)))
        .publishOn(Schedulers.single());
  StepVerifier.create(flux)
        .expectFusion(Fuseable.ASYNC)
        .assertNext(tuple -> {
          Assertions.assertThat(tuple.getT1()).isEqualTo("a");
          Assertions.assertThat(tuple.getT2()).isEqualTo("b");
        })
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void pairWise() {
  Mono<Tuple2<Integer, String>> f = Mono.just(1)
                     .zipWith(Mono.just("test2"));
  Assert.assertTrue(f instanceof MonoZip);
  MonoZip<?, ?> s = (MonoZip<?, ?>) f;
  Assert.assertTrue(s.sources != null);
  Assert.assertTrue(s.sources.length == 2);
  f.subscribeWith(AssertSubscriber.create())
   .assertValues(Tuples.of(1, "test2"))
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
  Mono<Tuple2<String, Optional<Object>>> dataAndContext =
      data.zipWith(Mono.subscriberContext()
               .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID)));
  return dataAndContext
      .<String>handle((dac, sink) -> {
        if (dac.getT2().isPresent()) {
          sink.next("PUT <" + dac.getT1() + "> sent to " + url + " with header X-Correlation-ID = " + dac.getT2().get());
        }
        else {
          sink.next("PUT <" + dac.getT1() + "> sent to " + url);
        }
        sink.complete();
      })
      .map(msg -> Tuples.of(200, msg));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void publishOnAsyncDetectionConditional() {
  Publisher<String> a = Flux.just("a");
  Publisher<String> b = Mono.just("b");
  Flux<Tuple2<String, String>> flux =
      Flux.from(a)
        .flatMap(value -> Mono.just(value)
                   .zipWith(Mono.from(b)))
        .publishOn(Schedulers.single())
        .filter(t -> true);
  StepVerifier.create(flux)
        .expectFusion(Fuseable.ASYNC)
        .assertNext(tuple -> {
          Assertions.assertThat(tuple.getT1()).isEqualTo("a");
          Assertions.assertThat(tuple.getT2()).isEqualTo("b");
        })
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void pairWise3() {
  Mono<Tuple2<Tuple2<Integer, String>, String>> f =
      Mono.zip(Arrays.asList(Mono.just(1), Mono.just("test")),
          obj -> Tuples.of((int) obj[0], (String) obj[1]))
        .zipWith(Mono.just("test2"));
  Assert.assertTrue(f instanceof MonoZip);
  MonoZip<?, ?> s = (MonoZip<?, ?>) f;
  Assert.assertTrue(s.sources != null);
  Assert.assertTrue(s.sources.length == 2);
  Mono<Tuple2<Integer, String>> ff = f.map(t -> Tuples.of(t.getT1()
                               .getT1(),
      t.getT1()
       .getT2() + t.getT2()));
  ff.subscribeWith(AssertSubscriber.create())
   .assertValues(Tuples.of(1, "testtest2"))
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void pairWise2() {
  Mono<Tuple2<Tuple2<Integer, String>, String>> f =
      Mono.zip(Mono.just(1), Mono.just("test"))
        .zipWith(Mono.just("test2"));
  Assert.assertTrue(f instanceof MonoZip);
  MonoZip<?, ?> s = (MonoZip<?, ?>) f;
  Assert.assertTrue(s.sources != null);
  Assert.assertTrue(s.sources.length == 3);
  Mono<Tuple2<Integer, String>> ff = f.map(t -> Tuples.of(t.getT1()
                               .getT1(),
      t.getT1()
       .getT2() + t.getT2()));
  ff.subscribeWith(AssertSubscriber.create())
   .assertValues(Tuples.of(1, "testtest2"))
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

.zipWith(Mono.subscriberContext())
.doOnNext(it -> resourceContextValue.set(it.getT2().get(String.class)))
.map(Tuple2::getT1);

代码示例来源:origin: reactor/reactor-core

.zipWith(Mono.subscriberContext())
.doOnNext(it -> resourceContextValue.set(it.getT2().get(String.class)))
.map(Tuple2::getT1);

代码示例来源:origin: spring-projects/spring-integration

.defaultIfEmpty(messageBuilder)
.map(AbstractIntegrationMessageBuilder::build)
.zipWith(Mono.just(httpEntity));

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

private static Mono<Tuple2<Optional<List<org.cloudfoundry.client.v2.routes.Route>>, String>> getRoutesAndApplicationId(CloudFoundryClient cloudFoundryClient, DeleteApplicationRequest request,
                                                            String spaceId, boolean deleteRoutes) {
  return getApplicationId(cloudFoundryClient, request.getName(), spaceId)
    .flatMap(applicationId -> getOptionalRoutes(cloudFoundryClient, deleteRoutes, applicationId)
      .zipWith(Mono.just(applicationId)));
}

代码示例来源:origin: naturalprogrammer/spring-lemon

public Mono<U> fetchUserById(ID userId) {
  // fetch the user
  return findUserById(userId)
      .zipWith(LecrUtils.currentUser())
      .doOnNext(this::hideConfidentialFields)
      .map(Tuple2::getT1);
}

代码示例来源:origin: naturalprogrammer/spring-lemon

/**
 * Resends verification mail to the user.
 */
public Mono<Void> resendVerificationMail(ID userId) {
  
  return findUserById(userId)
    .zipWith(LecrUtils.currentUser())
    .doOnNext(this::ensureEditable)
    .map(Tuple2::getT1)
    .doOnNext(this::resendVerificationMail).then();
}

代码示例来源:origin: danielfernandez/reactive-matchday

private Mono<MatchInfo> buildMatchInfo(final Match match) {
  // For a specific Match, gets the info of the playing teams and creates the MatchInfo
  return this.mongoTemplate.findById(match.getTeamACode(), Team.class)
      .zipWith(this.mongoTemplate.findById(match.getTeamBCode(), Team.class))
      .map(teams -> new MatchInfo(match.getId(), teams.getT1(), teams.getT2()));
}

相关文章

Mono类方法