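This article collects code examples for the Java method reactor.core.publisher.Mono.toFuture() and shows how Mono.toFuture() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should be useful as a reference. Details of Mono.toFuture() are as follows: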
Package: reactor.core.publisher
Class name: Mono
Method name: toFuture
Description: Transform this Mono into a CompletableFuture completing on onNext or onComplete and failing on onError.
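The contract described above can be illustrated without Reactor on the classpath. The following is only a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces; it is not Reactor's implementation, and the class ToFutureSketch with its helpers toFuture, just, empty, and error are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

class ToFutureSketch {

    // Hypothetical helper (not Reactor code): bridges a publisher of at most one item to a future.
    static <T> CompletableFuture<T> toFuture(Flow.Publisher<T> source) {
        CompletableFuture<T> future = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1); // ask for the single expected item
            }

            @Override
            public void onNext(T item) {
                future.complete(item); // a value completes the future
            }

            @Override
            public void onError(Throwable error) {
                future.completeExceptionally(error); // an error fails the future
            }

            @Override
            public void onComplete() {
                future.complete(null); // no-op if onNext already completed it, otherwise yields null
            }
        });
        return future;
    }

    // A subscription that ignores demand and cancellation; enough for the one-shot publishers below.
    private static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Hypothetical stand-in for a single-value source: emits the value once it is requested.
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean emitted;

            @Override
            public void request(long n) {
                if (!emitted && n > 0) {
                    emitted = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                emitted = true;
            }
        });
    }

    // Hypothetical stand-in for an empty source: completes without emitting.
    static <T> Flow.Publisher<T> empty() {
        return subscriber -> {
            subscriber.onSubscribe(NOOP);
            subscriber.onComplete();
        };
    }

    // Hypothetical stand-in for a failing source: signals the given error.
    static <T> Flow.Publisher<T> error(Throwable failure) {
        return subscriber -> {
            subscriber.onSubscribe(NOOP);
            subscriber.onError(failure);
        };
    }

    public static void main(String[] args) throws Exception {
        System.out.println(toFuture(just(1)).get());                                  // prints 1
        System.out.println(toFuture(ToFutureSketch.<Integer>empty()).get());          // prints null
        System.out.println(toFuture(error(new IllegalStateException("boom")))
                .isCompletedExceptionally());                                         // prints true
    }
}
```

The three calls in main mirror the three outcomes named in the description: a value, an empty completion, and an error.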
Code example source: lettuce-io/lettuce-core
@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
    logger.debug("lookup topology for masterId {}", masterId);
    Mono<StatefulRedisSentinelConnection<String, String>> connect = Mono.fromFuture(redisClient.connectSentinelAsync(CODEC,
            sentinelUri));
    return connect.flatMap(this::getNodes).toFuture();
}
Code example source: lettuce-io/lettuce-core
@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
    List<StatefulRedisConnection<String, String>> connections = new CopyOnWriteArrayList<>();
    Flux<RedisURI> uris = Flux.fromIterable(redisURIs);
    Mono<List<RedisNodeDescription>> nodes = uris
            .flatMap(uri -> getNodeDescription(connections, uri))
            .collectList()
            .flatMap(
                    (nodeDescriptions) -> {
                        // Fail the resulting future if no node could be reached.
                        if (nodeDescriptions.isEmpty()) {
                            return Mono.error(new RedisConnectionException(String.format(
                                    "Failed to connect to at least one node in %s", redisURIs)));
                        }
                        return Mono.just(nodeDescriptions);
                    });
    return nodes.toFuture();
}
Code example source: lettuce-io/lettuce-core
@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
    Map<RedisURI, StatefulRedisConnection<K, V>> initialConnections = new HashMap<>();
    TopologyProvider topologyProvider = new StaticMasterSlaveTopologyProvider(redisClient, redisURIs);
    RedisURI seedNode = redisURIs.iterator().next();
    MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
    MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
            seedNode, initialConnections);
    return refresh.getNodes(seedNode).flatMap(nodes -> {
        if (nodes.isEmpty()) {
            return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURIs)));
        }
        return initializeConnection(codec, seedNode, connectionProvider, nodes);
    // Replace an ExecutionException with its cause before converting to a future.
    }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}
Code example source: reactor/reactor-core
@Test
public void normal() throws Exception {
    CompletableFuture<Integer> f = Mono.just(1)
                                       .toFuture();
    assertThat(f.get()).isEqualTo(1);
}
Code example source: spring-projects/spring-framework
void registerAdapters(ReactiveAdapterRegistry registry) {
    // Register Flux and Mono before Publisher...
    registry.registerReactiveType(
            ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
            source -> (Mono<?>) source,
            Mono::from
    );
    registry.registerReactiveType(
            ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
            source -> (Flux<?>) source,
            Flux::from);
    registry.registerReactiveType(
            ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
            source -> (Publisher<?>) source,
            source -> source);
    registry.registerReactiveType(
            ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
                CompletableFuture<?> empty = new CompletableFuture<>();
                empty.complete(null);
                return empty;
            }),
            // CompletableFuture -> Publisher
            source -> Mono.fromFuture((CompletableFuture<?>) source),
            // Publisher -> CompletableFuture
            source -> Mono.from(source).toFuture()
    );
}
Code example source: lettuce-io/lettuce-core
@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
    logger.debug("Performing topology lookup");
    RedisFuture<String> info = connection.async().info("replication");
    try {
        return Mono.fromCompletionStage(info).timeout(redisURI.getTimeout()).map(this::getNodesFromInfo).toFuture();
    } catch (RuntimeException e) {
        throw Exceptions.bubble(e);
    }
}
Code example source: reactor/reactor-core
@Test
public void empty() throws Exception {
    // An empty Mono completes the future with null.
    CompletableFuture<Integer> f = Mono.<Integer>empty().toFuture();
    assertThat(f.get()).isNull();
}
Code example source: reactor/reactor-core
@Test(expected = Exception.class)
public void error() throws Exception {
    CompletableFuture<Integer> f =
            Mono.<Integer>error(new Exception("test")).toFuture();
    assertThat(f.isDone()).isTrue();
    assertThat(f.isCompletedExceptionally()).isTrue();
    // get() rethrows the failure, which satisfies the expected exception.
    f.get();
}
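The error test above only asserts that f.get() throws. How the original error surfaces depends on which CompletableFuture accessor is called, which is also why several lettuce examples on this page unwrap ExecutionException. The sketch below uses only the JDK; FailedFutureSketch and its helper methods are hypothetical names, and the failed future is built by hand as a stand-in for what a failing Mono would hand back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class FailedFutureSketch {

    // Hypothetical helper: a future that has already failed with the given error.
    static <T> CompletableFuture<T> failed(Throwable error) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(error);
        return future;
    }

    // get() reports the failure wrapped in a checked ExecutionException.
    static Throwable causeViaGet(CompletableFuture<?> future) {
        try {
            future.get();
            return null; // completed normally, so there is no cause
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        }
    }

    // join() reports the failure wrapped in an unchecked CompletionException.
    static Throwable causeViaJoin(CompletableFuture<?> future) {
        try {
            future.join();
            return null; // completed normally, so there is no cause
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        Exception boom = new Exception("test");
        System.out.println(causeViaGet(failed(boom)) == boom);  // prints true
        System.out.println(causeViaJoin(failed(boom)) == boom); // prints true
    }
}
```

In both cases the exception passed to the future is available through getCause(), so callers that need the original error type should unwrap it rather than match on the wrapper.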
Code example source: lettuce-io/lettuce-core
@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
    TopologyProvider topologyProvider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI);
    SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(redisClient,
            redisURI.getSentinelMasterId(), redisURI.getSentinels());
    MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
    MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
            redisURI, Collections.emptyMap());
    Runnable runnable = getTopologyRefreshRunnable(refresh, connectionProvider);
    return refresh.getNodes(redisURI).flatMap(nodes -> {
        if (nodes.isEmpty()) {
            return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURI)));
        }
        return initializeConnection(codec, sentinelTopologyRefresh, connectionProvider, runnable, nodes);
    }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}
Code example source: lettuce-io/lettuce-core
@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
    ConnectionFuture<StatefulRedisConnection<K, V>> initialConnection = redisClient.connectAsync(codec, redisURI);
    Mono<StatefulRedisMasterSlaveConnection<K, V>> connect = Mono
            .fromCompletionStage(initialConnection)
            .flatMap(
                    nodeConnection -> {
                        initialConnections.put(redisURI, nodeConnection);
                        TopologyProvider topologyProvider = new MasterSlaveTopologyProvider(nodeConnection, redisURI);
                        return Mono.fromCompletionStage(topologyProvider.getNodesAsync()).flatMap(
                                nodes -> getMasterConnectionAndUri(nodes, Tuples.of(redisURI, nodeConnection), codec));
                    }).flatMap(connectionAndUri -> {
                        return initializeConnection(codec, connectionAndUri);
                    });
    // On failure, close every connection opened so far, then re-emit the original error.
    return connect.onErrorResume(t -> {
        Mono<Void> close = Mono.empty();
        for (StatefulRedisConnection<?, ?> connection : initialConnections.values()) {
            close = close.then(Mono.fromFuture(connection.closeAsync()));
        }
        return close.then(Mono.error(t));
    }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}
Code example source: lettuce-io/lettuce-core (non-contiguous excerpt)
return connections.filter(StatefulConnection::isOpen).next().switchIfEmpty(connections.next()).toFuture();
int index = ThreadLocalRandom.current().nextInt(it.size());
return it.get(index);
}).switchIfEmpty(connections.next()).toFuture();
} catch (RuntimeException e) {
throw Exceptions.bubble(e);
Code example source: resilience4j/resilience4j
@Test
public void shouldRecordSuccessWhenUsingToFuture() {
    try {
        Mono.just("Event")
            .transform(CircuitBreakerOperator.of(circuitBreaker))
            .toFuture()
            .get();
        assertSingleSuccessfulCall();
    } catch (InterruptedException | ExecutionException e) {
        fail();
    }
}
Code example source: reactor/reactor-core
@Test
public void delayElementShouldNotCancelTwice() throws Exception {
    DirectProcessor<Long> p = DirectProcessor.create();
    AtomicInteger cancellations = new AtomicInteger();
    Flux<Long> publishedFlux = p
            .publish()
            .refCount(2)
            .doOnCancel(() -> cancellations.incrementAndGet());
    publishedFlux.any(x -> x > 5)
                 .delayElement(Duration.ofMillis(2))
                 .subscribe();
    CompletableFuture<List<Long>> result = publishedFlux.collectList().toFuture();
    for (long i = 0; i < 10; i++) {
        p.onNext(i);
        Thread.sleep(1);
    }
    p.onComplete();
    assertThat(result.get(10, TimeUnit.MILLISECONDS).size()).isEqualTo(10);
    assertThat(cancellations.get()).isEqualTo(2);
}
Code example source: lettuce-io/lettuce-core (excerpt)
.doOnNext(
c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider))
.map(it -> (StatefulRedisClusterPubSubConnection<K, V>) it).toFuture();
Code example source: lettuce-io/lettuce-core (excerpt)
.doOnNext(
c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider))
.map(it -> (StatefulRedisClusterConnection<K, V>) it).toFuture();
Code example source: reactor/reactor-core
@Test
public void test() throws Exception {
    Flux.just("red", "white", "blue")
        .log("source")
        .flatMap(value -> Mono.fromCallable(() -> {
            Thread.sleep(1000);
            return value;
        // Note: Schedulers.elastic() is deprecated since Reactor 3.4 in favor of Schedulers.boundedElastic().
        }).subscribeOn(Schedulers.elastic()))
        .log("merged")
        .collect(Result::new, Result::add)
        .doOnNext(Result::stop)
        .log("accumulated")
        .toFuture()
        .get(); // block the test thread until the pipeline finishes
}
Code example source: lettuce-io/lettuce-core (truncated excerpt)
}).toFuture();
Code example source: mulesoft/mule
/**
 * {@inheritDoc}
 */
@Override
public <T, A> CompletableFuture<Result<T, A>> executeAsync(String extension, String operation, OperationParameters parameters) {
    OperationMessageProcessor processor = createProcessor(extension, operation, parameters);
    return just(getEvent()).transform(processor)
        .map(event -> Result.<T, A>builder(event.getMessage()).build())
        .onErrorMap(Exceptions::unwrap)
        .doAfterTerminate(() -> disposeProcessor(processor)).toFuture();
}
Code example source: mulesoft/mule
@Override
public CompletableFuture<InterceptionEvent> around(ComponentLocation location,
                                                   Map<String, ProcessorParameterValue> parameters,
                                                   InterceptionEvent event, InterceptionAction action) {
    Mono<InterceptionEvent> errorMono = Mono.error(thrown);
    return Mono.from(((BaseEventContext) event.getContext()).error(thrown)).then(errorMono).toFuture();
}