本文整理了Java中reactor.core.publisher.Mono.zipWith()
方法的一些代码示例,展示了Mono.zipWith()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.zipWith()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:zipWith
[英]Combine the result from this mono and another into a Tuple2.
An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.
[中]将这个单声道和另一个单声道的结果组合成一个元组2。
任何源的错误或空完成将分别导致另一个源被取消,并导致Mono立即出错或完成。
代码示例来源:origin: reactor/reactor-core
/**
* Combine the result from this mono and another into a {@link Tuple2}.
* <p>
* An error or <strong>empty</strong> completion of any source will cause the other source
* to be cancelled and the resulting Mono to immediately error or complete, respectively.
*
* <p>
* <img class="marble" src="doc-files/marbles/zipWithOtherForMono.svg" alt="">
*
* @param other the {@link Mono} to combine with
* @param <T2> the element type of the other Mono instance
*
* @return a new combined Mono
*/
public final <T2> Mono<Tuple2<T, T2>> zipWith(Mono<? extends T2> other) {
return zipWith(other, Flux.tuple2Function());
}
代码示例来源:origin: lettuce-io/lettuce-core
protected Mono<List<RedisNodeDescription>> getNodes(StatefulRedisSentinelConnection<String, String> connection) {
RedisSentinelReactiveCommands<String, String> reactive = connection.reactive();
Mono<Tuple2<Map<String, String>, List<Map<String, String>>>> masterAndSlaves = reactive.master(masterId)
.zipWith(reactive.slaves(masterId).collectList()).timeout(this.timeout).flatMap(tuple -> {
return ResumeAfter.close(connection).thenEmit(tuple);
}).doOnError(e -> connection.closeAsync());
return masterAndSlaves.map(tuple -> {
List<RedisNodeDescription> result = new ArrayList<>();
result.add(toNode(tuple.getT1(), RedisInstance.Role.MASTER));
result.addAll(tuple.getT2().stream().filter(SentinelTopologyProvider::isAvailable)
.map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));
return result;
});
}
代码示例来源:origin: reactor/reactor-core
@Test
public void andCombinatorAliasZipWithCombinator() {
Mono<String> and = Mono.just(1).zipWith(Mono.just("B"), (i, s) -> i + s);
Mono<String> zipWith = and.zipWith(Mono.just(3), (s, i) -> s + i);
StepVerifier.create(zipWith)
.expectNext("1B3")
.verifyComplete();
}
}
代码示例来源:origin: lettuce-io/lettuce-core
private Mono<Tuple2<RedisURI, StatefulRedisConnection<K, V>>> getMasterConnectionAndUri(List<RedisNodeDescription> nodes,
Tuple2<RedisURI, StatefulRedisConnection<K, V>> connectionTuple, RedisCodec<K, V> codec) {
RedisNodeDescription node = getConnectedNode(redisURI, nodes);
if (node.getRole() != RedisInstance.Role.MASTER) {
RedisNodeDescription master = lookupMaster(nodes);
ConnectionFuture<StatefulRedisConnection<K, V>> masterConnection = redisClient.connectAsync(codec, master.getUri());
return Mono.just(master.getUri()).zipWith(Mono.fromCompletionStage(masterConnection)) //
.doOnNext(it -> {
initialConnections.put(it.getT1(), it.getT2());
});
}
return Mono.just(connectionTuple);
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable State state) {
return Flux.fromIterable(hosts()) //
.filter(entry -> state == null || entry.getState().equals(state)) //
.map(ElasticsearchHost::getEndpoint) //
.flatMap(host -> {
Mono<ClientResponse> exchange = createWebClient(host) //
.head().uri("/").exchange().doOnError(throwable -> {
hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
clientProvider.getErrorListener().accept(throwable);
});
return Mono.just(host).zipWith(exchange);
}) //
.onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void andAliasZipWith() {
Mono<Tuple2<Integer, String>> and = Mono.just(1)
.zipWith(Mono.just("B"));
Mono<Tuple2<Tuple2<Integer, String>, Integer>> zipWith = and.zipWith(Mono.just(3));
StepVerifier.create(zipWith)
.expectNext(Tuples.of(Tuples.of(1, "B"), 3))
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void introFutureHellReactorVersion() {
Flux<String> ids = ifhrIds(); // <1>
Flux<String> combinations =
ids.flatMap(id -> { // <2>
Mono<String> nameTask = ifhrName(id); // <3>
Mono<Integer> statTask = ifhrStat(id); // <4>
return nameTask.zipWith(statTask, // <5>
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List<String>> result = combinations.collectList(); // <6>
List<String> results = result.block(); // <7>
assertThat(results).containsExactly( // <8>
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publishOnAsyncDetection() {
Publisher<String> a = Flux.just("a");
Publisher<String> b = Mono.just("b");
Flux<Tuple2<String, String>> flux =
Flux.from(a)
.flatMap(value -> Mono.just(value)
.zipWith(Mono.from(b)))
.publishOn(Schedulers.single());
StepVerifier.create(flux)
.expectFusion(Fuseable.ASYNC)
.assertNext(tuple -> {
Assertions.assertThat(tuple.getT1()).isEqualTo("a");
Assertions.assertThat(tuple.getT2()).isEqualTo("b");
})
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void pairWise() {
Mono<Tuple2<Integer, String>> f = Mono.just(1)
.zipWith(Mono.just("test2"));
Assert.assertTrue(f instanceof MonoZip);
MonoZip<?, ?> s = (MonoZip<?, ?>) f;
Assert.assertTrue(s.sources != null);
Assert.assertTrue(s.sources.length == 2);
f.subscribeWith(AssertSubscriber.create())
.assertValues(Tuples.of(1, "test2"))
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
Mono<Tuple2<String, Optional<Object>>> dataAndContext =
data.zipWith(Mono.subscriberContext()
.map(c -> c.getOrEmpty(HTTP_CORRELATION_ID)));
return dataAndContext
.<String>handle((dac, sink) -> {
if (dac.getT2().isPresent()) {
sink.next("PUT <" + dac.getT1() + "> sent to " + url + " with header X-Correlation-ID = " + dac.getT2().get());
}
else {
sink.next("PUT <" + dac.getT1() + "> sent to " + url);
}
sink.complete();
})
.map(msg -> Tuples.of(200, msg));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publishOnAsyncDetectionConditional() {
Publisher<String> a = Flux.just("a");
Publisher<String> b = Mono.just("b");
Flux<Tuple2<String, String>> flux =
Flux.from(a)
.flatMap(value -> Mono.just(value)
.zipWith(Mono.from(b)))
.publishOn(Schedulers.single())
.filter(t -> true);
StepVerifier.create(flux)
.expectFusion(Fuseable.ASYNC)
.assertNext(tuple -> {
Assertions.assertThat(tuple.getT1()).isEqualTo("a");
Assertions.assertThat(tuple.getT2()).isEqualTo("b");
})
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void pairWise3() {
Mono<Tuple2<Tuple2<Integer, String>, String>> f =
Mono.zip(Arrays.asList(Mono.just(1), Mono.just("test")),
obj -> Tuples.of((int) obj[0], (String) obj[1]))
.zipWith(Mono.just("test2"));
Assert.assertTrue(f instanceof MonoZip);
MonoZip<?, ?> s = (MonoZip<?, ?>) f;
Assert.assertTrue(s.sources != null);
Assert.assertTrue(s.sources.length == 2);
Mono<Tuple2<Integer, String>> ff = f.map(t -> Tuples.of(t.getT1()
.getT1(),
t.getT1()
.getT2() + t.getT2()));
ff.subscribeWith(AssertSubscriber.create())
.assertValues(Tuples.of(1, "testtest2"))
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void pairWise2() {
Mono<Tuple2<Tuple2<Integer, String>, String>> f =
Mono.zip(Mono.just(1), Mono.just("test"))
.zipWith(Mono.just("test2"));
Assert.assertTrue(f instanceof MonoZip);
MonoZip<?, ?> s = (MonoZip<?, ?>) f;
Assert.assertTrue(s.sources != null);
Assert.assertTrue(s.sources.length == 3);
Mono<Tuple2<Integer, String>> ff = f.map(t -> Tuples.of(t.getT1()
.getT1(),
t.getT1()
.getT2() + t.getT2()));
ff.subscribeWith(AssertSubscriber.create())
.assertValues(Tuples.of(1, "testtest2"))
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
.zipWith(Mono.subscriberContext())
.doOnNext(it -> resourceContextValue.set(it.getT2().get(String.class)))
.map(Tuple2::getT1);
代码示例来源:origin: reactor/reactor-core
.zipWith(Mono.subscriberContext())
.doOnNext(it -> resourceContextValue.set(it.getT2().get(String.class)))
.map(Tuple2::getT1);
代码示例来源:origin: spring-projects/spring-integration
.defaultIfEmpty(messageBuilder)
.map(AbstractIntegrationMessageBuilder::build)
.zipWith(Mono.just(httpEntity));
代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations
private static Mono<Tuple2<Optional<List<org.cloudfoundry.client.v2.routes.Route>>, String>> getRoutesAndApplicationId(CloudFoundryClient cloudFoundryClient, DeleteApplicationRequest request,
String spaceId, boolean deleteRoutes) {
return getApplicationId(cloudFoundryClient, request.getName(), spaceId)
.flatMap(applicationId -> getOptionalRoutes(cloudFoundryClient, deleteRoutes, applicationId)
.zipWith(Mono.just(applicationId)));
}
代码示例来源:origin: naturalprogrammer/spring-lemon
public Mono<U> fetchUserById(ID userId) {
// fetch the user
return findUserById(userId)
.zipWith(LecrUtils.currentUser())
.doOnNext(this::hideConfidentialFields)
.map(Tuple2::getT1);
}
代码示例来源:origin: naturalprogrammer/spring-lemon
/**
* Resends verification mail to the user.
*/
public Mono<Void> resendVerificationMail(ID userId) {
return findUserById(userId)
.zipWith(LecrUtils.currentUser())
.doOnNext(this::ensureEditable)
.map(Tuple2::getT1)
.doOnNext(this::resendVerificationMail).then();
}
代码示例来源:origin: danielfernandez/reactive-matchday
private Mono<MatchInfo> buildMatchInfo(final Match match) {
// For a specific Match, gets the info of the playing teams and creates the MatchInfo
return this.mongoTemplate.findById(match.getTeamACode(), Team.class)
.zipWith(this.mongoTemplate.findById(match.getTeamBCode(), Team.class))
.map(teams -> new MatchInfo(match.getId(), teams.getT1(), teams.getT2()));
}
内容来源于网络,如有侵权,请联系作者删除!