本文整理了Java中reactor.core.publisher.Mono.fromCompletionStage()
方法的一些代码示例,展示了Mono.fromCompletionStage()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.fromCompletionStage()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:fromCompletionStage
[英]Create a Mono, producing its value using the provided CompletionStage.
Note that the completion stage is not cancelled when that Mono is cancelled, but that behavior can be obtained by using #doFinally(Consumer) that checks for a SignalType#CANCEL and calls eg. CompletionStage#toCompletableFuture().
[中]创建Mono,使用提供的CompletionStage产生其价值。
请注意,当Mono被取消时,完成阶段不会被取消,但可以通过使用#doFinally(Consumer)检查SignalType#CANCEL并调用例如CompletionStage#toCompletableFuture()来获得该行为。
代码示例来源:origin: lettuce-io/lettuce-core
private static <T> Mono<T> getMono(CompletableFuture<T> future) {
return Mono.fromCompletionStage(future);
}
}
代码示例来源:origin: lettuce-io/lettuce-core
public Mono<Connections> asMono(Duration timeout, ScheduledExecutorService timeoutExecutor) {
Connections connections = new Connections(this.connections.size(), nodeList);
for (Map.Entry<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>> entry : this.connections
.entrySet()) {
CompletableFuture<StatefulRedisConnection<String, String>> future = entry.getValue();
future.whenComplete((connection, throwable) -> {
if (throwable != null) {
connections.accept(throwable);
} else {
connections.accept(Tuples.of(entry.getKey(), connection));
}
});
}
return Mono.fromCompletionStage(connections.getOrTimeout(timeout, timeoutExecutor));
}
}
代码示例来源:origin: lettuce-io/lettuce-core
private Mono<SocketAddress> lookupRedis(RedisURI sentinelUri) {
Mono<StatefulRedisSentinelConnection<String, String>> connection = Mono.fromCompletionStage(connectSentinelAsync(
newStringStringCodec(), sentinelUri, timeout));
return connection.flatMap(c -> c.reactive() //
.getMasterAddrByName(sentinelUri.getSentinelMasterId()) //
.timeout(this.timeout) //
.flatMap(it -> Mono.fromCompletionStage(c.closeAsync()) //
.then(Mono.just(it))));
}
代码示例来源:origin: lettuce-io/lettuce-core
private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, RedisCodec<K, V> codec,
DefaultEndpoint endpoint,
RedisChannelHandler<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
ConnectionFuture<T> future = connectStatefulAsync(connection, codec, endpoint, getFirstUri(), socketAddressSupplier,
commandHandlerSupplier);
return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fromCompletionStage() {
CompletionStage<String> completionStage = CompletableFuture.supplyAsync(() -> "helloFuture");
assertThat(Mono.fromCompletionStage(completionStage).block())
.isEqualTo("helloFuture");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fromCompletionStageSupplier() {
AtomicInteger source = new AtomicInteger();
Supplier<CompletableFuture<Integer>> supplier = () -> CompletableFuture.completedFuture(source.incrementAndGet());
Mono<Number> mono = Mono.fromCompletionStage(supplier);
Assertions.assertThat(source).hasValue(0);
Assertions.assertThat(mono.block())
.isEqualTo(source.get())
.isEqualTo(1);
Assertions.assertThat(mono.block())
.isEqualTo(source.get())
.isEqualTo(2);
}
代码示例来源:origin: lettuce-io/lettuce-core
private Mono<RedisNodeDescription> getNodeDescription(List<StatefulRedisConnection<String, String>> connections,
RedisURI uri) {
return Mono.fromCompletionStage(redisClient.connectAsync(StringCodec.UTF8, uri)) //
.onErrorResume(t -> {
logger.warn("Cannot connect to {}", uri, t);
return Mono.empty();
}) //
.doOnNext(connections::add) //
.flatMap(connection -> {
Mono<RedisNodeDescription> instance = getNodeDescription(uri, connection);
return instance.flatMap(it -> ResumeAfter.close(connection).thenEmit(it)).doFinally(s -> {
connections.remove(connection);
});
});
}
代码示例来源:origin: lettuce-io/lettuce-core
public <T> Mono<T> thenEmit(T value) {
return Mono.defer(() -> {
if (firstCloseLatch()) {
return Mono.fromCompletionStage(closeable.closeAsync());
}
return Mono.empty();
}).then(Mono.just(value)).doFinally(s -> {
if (firstCloseLatch()) {
closeable.closeAsync();
}
});
}
代码示例来源:origin: spring-projects/spring-data-redis
@SuppressWarnings({ "unchecked", "rawtypes" })
protected Mono<RedisReactiveCommands<ByteBuffer, ByteBuffer>> getCommands(RedisNode node) {
if (StringUtils.hasText(node.getId())) {
return getConnection().cast(StatefulRedisClusterConnection.class)
.map(it -> it.getConnection(node.getId()).reactive());
}
return getConnection().flatMap(it -> Mono.fromCompletionStage(it.getConnectionAsync(node.getHost(), node.getPort()))
.map(StatefulRedisConnection::reactive));
}
}
代码示例来源:origin: lettuce-io/lettuce-core
@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
ConnectionFuture<StatefulRedisConnection<K, V>> initialConnection = redisClient.connectAsync(codec, redisURI);
Mono<StatefulRedisMasterSlaveConnection<K, V>> connect = Mono
.fromCompletionStage(initialConnection)
.flatMap(
nodeConnection -> {
initialConnections.put(redisURI, nodeConnection);
TopologyProvider topologyProvider = new MasterSlaveTopologyProvider(nodeConnection, redisURI);
return Mono.fromCompletionStage(topologyProvider.getNodesAsync()).flatMap(
nodes -> getMasterConnectionAndUri(nodes, Tuples.of(redisURI, nodeConnection), codec));
}).flatMap(connectionAndUri -> {
return initializeConnection(codec, connectionAndUri);
});
return connect.onErrorResume(t -> {
Mono<Void> close = Mono.empty();
for (StatefulRedisConnection<?, ?> connection : initialConnections.values()) {
close = close.then(Mono.fromFuture(connection.closeAsync()));
}
return close.then(Mono.error(t));
}).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}
代码示例来源:origin: lettuce-io/lettuce-core
private Mono<Tuple2<RedisURI, StatefulRedisConnection<K, V>>> getMasterConnectionAndUri(List<RedisNodeDescription> nodes,
Tuple2<RedisURI, StatefulRedisConnection<K, V>> connectionTuple, RedisCodec<K, V> codec) {
RedisNodeDescription node = getConnectedNode(redisURI, nodes);
if (node.getRole() != RedisInstance.Role.MASTER) {
RedisNodeDescription master = lookupMaster(nodes);
ConnectionFuture<StatefulRedisConnection<K, V>> masterConnection = redisClient.connectAsync(codec, master.getUri());
return Mono.just(master.getUri()).zipWith(Mono.fromCompletionStage(masterConnection)) //
.doOnNext(it -> {
initialConnections.put(it.getT1(), it.getT2());
});
}
return Mono.just(connectionTuple);
}
代码示例来源:origin: lettuce-io/lettuce-core
@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
logger.debug("Performing topology lookup");
RedisFuture<String> info = connection.async().info("replication");
try {
return Mono.fromCompletionStage(info).timeout(redisURI.getTimeout()).map(this::getNodesFromInfo).toFuture();
} catch (RuntimeException e) {
throw Exceptions.bubble(e);
}
}
代码示例来源:origin: lettuce-io/lettuce-core
public <T> Mono<T> thenError(Throwable t) {
return Mono.defer(() -> {
if (firstCloseLatch()) {
return Mono.fromCompletionStage(closeable.closeAsync());
}
return Mono.empty();
}).then(Mono.<T> error(t)).doFinally(s -> {
if (firstCloseLatch()) {
closeable.closeAsync();
}
});
}
代码示例来源:origin: spring-projects/spring-data-redis
@SuppressWarnings("unchecked")
AsyncConnect(LettuceConnectionProvider connectionProvider, Class<T> connectionType) {
Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null!");
Assert.notNull(connectionType, "Connection type must not be null!");
this.connectionProvider = connectionProvider;
Mono<StatefulConnection> defer = Mono
.defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType)));
this.connectionPublisher = defer.doOnNext(it -> {
if (isClosing(this.state.get())) {
it.closeAsync();
} else {
connection = it;
}
}) //
.cache() //
.handle((connection, sink) -> {
if (isClosing(this.state.get())) {
sink.error(new IllegalStateException("Unable to connect. Connection is closed!"));
} else {
sink.next((T) connection);
}
});
}
代码示例来源:origin: lettuce-io/lettuce-core
/**
* Load master slave nodes. Result contains an ordered list of {@link RedisNodeDescription}s. The sort key is the latency.
* Nodes with lower latency come first.
*
* @param seed collection of {@link RedisURI}s
* @return mapping between {@link RedisURI} and {@link Partitions}
*/
public Mono<List<RedisNodeDescription>> getNodes(RedisURI seed) {
CompletableFuture<List<RedisNodeDescription>> future = topologyProvider.getNodesAsync();
Mono<List<RedisNodeDescription>> initialNodes = Mono.fromFuture(future).doOnNext(nodes -> {
addPasswordIfNeeded(nodes, seed);
});
return initialNodes
.map(this::getConnections)
.flatMap(asyncConnections -> asyncConnections.asMono(seed.getTimeout(), eventExecutors))
.flatMap(
connections -> {
Requests requests = connections.requestPing();
CompletionStage<List<RedisNodeDescription>> nodes = requests.getOrTimeout(seed.getTimeout(),
eventExecutors);
return Mono.fromCompletionStage(nodes).flatMap(it -> ResumeAfter.close(connections).thenEmit(it));
});
}
代码示例来源:origin: lettuce-io/lettuce-core
private <K, V> Mono<StatefulRedisSentinelConnection<K, V>> connectSentinel(ConnectionBuilder connectionBuilder, RedisURI uri) {
connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));
SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(uri);
logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
.fromCompletionStage(initializeChannelAsync(connectionBuilder));
return connectionMono.onErrorMap(CompletionException.class, Throwable::getCause) //
.doOnError(t -> logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + t.toString())) //
.onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e));
}
代码示例来源:origin: lettuce-io/lettuce-core
private Mono<StatefulRedisMasterSlaveConnection<K, V>> initializeConnection(RedisCodec<K, V> codec,
SentinelTopologyRefresh sentinelTopologyRefresh, MasterSlaveConnectionProvider<K, V> connectionProvider,
Runnable runnable, List<RedisNodeDescription> nodes) {
connectionProvider.setKnownNodes(nodes);
MasterSlaveChannelWriter channelWriter = new MasterSlaveChannelWriter(connectionProvider, redisClient.getResources()) {
@Override
public CompletableFuture<Void> closeAsync() {
return CompletableFuture.allOf(super.closeAsync(), sentinelTopologyRefresh.closeAsync());
}
};
StatefulRedisMasterSlaveConnectionImpl<K, V> connection = new StatefulRedisMasterSlaveConnectionImpl<>(channelWriter,
codec, redisURI.getTimeout());
connection.setOptions(redisClient.getOptions());
CompletionStage<Void> bind = sentinelTopologyRefresh.bind(runnable);
return Mono.fromCompletionStage(bind).onErrorResume(t -> {
return ResumeAfter.close(connection).thenError(t);
}).then(Mono.just(connection));
}
代码示例来源:origin: lettuce-io/lettuce-core
connect = Mono.fromCompletionStage(initializeChannelAsync(connectionBuilder));
} else {
代码示例来源:origin: com.fireflysource/firefly-reactive
@Override
public <T> Mono<T> inTransaction(Func1<ReactiveSQLConnection, Mono<T>> func1) {
return Mono.fromCompletionStage(sqlConnection.inTransaction(conn -> {
Promise.Completable<T> completable = new Promise.Completable<>();
func1.call(this)
.subscribe(completable::succeeded, completable::failed);
return completable;
}));
}
代码示例来源:origin: hypercube1024/firefly
@Override
public <T> Mono<T> newTransaction(Func1<ReactiveSQLConnection, Mono<T>> func1) {
return Mono.fromCompletionStage(sqlClient.newTransaction(conn -> {
Promise.Completable<T> completable = new Promise.Completable<>();
func1.call(new ReactiveSQLConnectionAdapter(conn))
.subscribe(completable::succeeded, completable::failed);
return completable;
}));
}
内容来源于网络,如有侵权,请联系作者删除!