Usage and code examples for reactor.core.publisher.Mono.fromCompletionStage()

x33g5p2x, reposted on 2022-01-24 under Other

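This article collects code examples for the Java method reactor.core.publisher.Mono.fromCompletionStage() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, where they were extracted from selected projects, and are intended as reference material. Details of Mono.fromCompletionStage():
Fully qualified name: reactor.core.publisher.Mono
Class: Mono
Method: fromCompletionStage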

Introduction to Mono.fromCompletionStage

Create a Mono, producing its value using the provided CompletionStage.

Note that the completion stage is not cancelled when that Mono is cancelled, but that behavior can be obtained by using #doFinally(Consumer) with a callback that checks for SignalType#CANCEL and calls, e.g., CompletionStage#toCompletableFuture().cancel(false).
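The cancellation note above can be sketched as follows. This is a minimal illustration, not code from any of the projects quoted below: the class name CancellableStage and the helper cancelOnDispose are hypothetical, and reactor-core is assumed to be on the classpath. The quoted Javadoc reflects the Reactor version current when this article was compiled; newer releases may handle cancellation differently, so check the Javadoc for the version in use.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

public class CancellableStage {

    // Hypothetical helper: wraps a CompletionStage in a Mono and cancels the
    // backing future when the Mono terminates with a CANCEL signal.
    static <T> Mono<T> cancelOnDispose(CompletionStage<T> stage) {
        return Mono.fromCompletionStage(stage)
                .doFinally(signal -> {
                    if (signal == SignalType.CANCEL) {
                        stage.toCompletableFuture().cancel(false);
                    }
                });
    }

    // Subscribes to a future that is never completed, cancels the subscription,
    // and reports whether the future ended up cancelled.
    static boolean demo() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        Disposable subscription = cancelOnDispose(pending).subscribe();
        subscription.dispose(); // cancels the Mono, which triggers doFinally
        return pending.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("future cancelled: " + demo());
    }
}
```

The helper keeps a reference to the original stage so the doFinally callback can reach it; for a CompletableFuture, the boolean passed to cancel has no effect on behavior.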

Code examples

Code example source: lettuce-io/lettuce-core

private static <T> Mono<T> getMono(CompletableFuture<T> future) {
  return Mono.fromCompletionStage(future);
}

Code example source: lettuce-io/lettuce-core

public Mono<Connections> asMono(Duration timeout, ScheduledExecutorService timeoutExecutor) {

    Connections connections = new Connections(this.connections.size(), nodeList);

    for (Map.Entry<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>> entry : this.connections
        .entrySet()) {

      CompletableFuture<StatefulRedisConnection<String, String>> future = entry.getValue();

      future.whenComplete((connection, throwable) -> {

        if (throwable != null) {
          connections.accept(throwable);
        } else {
          connections.accept(Tuples.of(entry.getKey(), connection));
        }
      });
    }

    return Mono.fromCompletionStage(connections.getOrTimeout(timeout, timeoutExecutor));
}

Code example source: lettuce-io/lettuce-core

private Mono<SocketAddress> lookupRedis(RedisURI sentinelUri) {
  Mono<StatefulRedisSentinelConnection<String, String>> connection = Mono.fromCompletionStage(connectSentinelAsync(
      newStringStringCodec(), sentinelUri, timeout));
  return connection.flatMap(c -> c.reactive() //
      .getMasterAddrByName(sentinelUri.getSentinelMasterId()) //
      .timeout(this.timeout) //
      .flatMap(it -> Mono.fromCompletionStage(c.closeAsync()) //
          .then(Mono.just(it))));
}

Code example source: lettuce-io/lettuce-core

private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, RedisCodec<K, V> codec,
    DefaultEndpoint endpoint,
    RedisChannelHandler<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
  ConnectionFuture<T> future = connectStatefulAsync(connection, codec, endpoint, getFirstUri(), socketAddressSupplier,
      commandHandlerSupplier);
  return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
}

Code example source: reactor/reactor-core

@Test
public void fromCompletionStage() {
  CompletionStage<String> completionStage = CompletableFuture.supplyAsync(() -> "helloFuture");
  assertThat(Mono.fromCompletionStage(completionStage).block())
      .isEqualTo("helloFuture");
}

Code example source: reactor/reactor-core

@Test
public void fromCompletionStageSupplier() {
  AtomicInteger source = new AtomicInteger();
  Supplier<CompletableFuture<Integer>> supplier = () -> CompletableFuture.completedFuture(source.incrementAndGet());
  Mono<Number> mono = Mono.fromCompletionStage(supplier);
  // The supplier has not run yet: nothing happens before subscription.
  Assertions.assertThat(source).hasValue(0);
  // Each subscription (here via block()) invokes the supplier again.
  Assertions.assertThat(mono.block())
       .isEqualTo(source.get())
       .isEqualTo(1);
  Assertions.assertThat(mono.block())
       .isEqualTo(source.get())
       .isEqualTo(2);
}

Code example source: lettuce-io/lettuce-core

private Mono<RedisNodeDescription> getNodeDescription(List<StatefulRedisConnection<String, String>> connections,
    RedisURI uri) {
  return Mono.fromCompletionStage(redisClient.connectAsync(StringCodec.UTF8, uri)) //
      .onErrorResume(t -> {
        logger.warn("Cannot connect to {}", uri, t);
        return Mono.empty();
      }) //
      .doOnNext(connections::add) //
      .flatMap(connection -> {
        Mono<RedisNodeDescription> instance = getNodeDescription(uri, connection);
        return instance.flatMap(it -> ResumeAfter.close(connection).thenEmit(it)).doFinally(s -> {
          connections.remove(connection);
        });
      });
}

Code example source: lettuce-io/lettuce-core

public <T> Mono<T> thenEmit(T value) {
  return Mono.defer(() -> {
    if (firstCloseLatch()) {
      return Mono.fromCompletionStage(closeable.closeAsync());
    }
    return Mono.empty();
  }).then(Mono.just(value)).doFinally(s -> {
    if (firstCloseLatch()) {
      closeable.closeAsync();
    }
  });
}

Code example source: spring-projects/spring-data-redis

@SuppressWarnings({ "unchecked", "rawtypes" })
protected Mono<RedisReactiveCommands<ByteBuffer, ByteBuffer>> getCommands(RedisNode node) {

  if (StringUtils.hasText(node.getId())) {
    return getConnection().cast(StatefulRedisClusterConnection.class)
        .map(it -> it.getConnection(node.getId()).reactive());
  }

  return getConnection().flatMap(it -> Mono.fromCompletionStage(it.getConnectionAsync(node.getHost(), node.getPort()))
      .map(StatefulRedisConnection::reactive));
}

Code example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
  ConnectionFuture<StatefulRedisConnection<K, V>> initialConnection = redisClient.connectAsync(codec, redisURI);
  Mono<StatefulRedisMasterSlaveConnection<K, V>> connect = Mono
      .fromCompletionStage(initialConnection)
      .flatMap(
          nodeConnection -> {
            initialConnections.put(redisURI, nodeConnection);
            TopologyProvider topologyProvider = new MasterSlaveTopologyProvider(nodeConnection, redisURI);
            return Mono.fromCompletionStage(topologyProvider.getNodesAsync()).flatMap(
                nodes -> getMasterConnectionAndUri(nodes, Tuples.of(redisURI, nodeConnection), codec));
          }).flatMap(connectionAndUri -> {
        return initializeConnection(codec, connectionAndUri);
      });
  return connect.onErrorResume(t -> {
    Mono<Void> close = Mono.empty();
    for (StatefulRedisConnection<?, ?> connection : initialConnections.values()) {
      close = close.then(Mono.fromFuture(connection.closeAsync()));
    }
    return close.then(Mono.error(t));
  }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}

Code example source: lettuce-io/lettuce-core

private Mono<Tuple2<RedisURI, StatefulRedisConnection<K, V>>> getMasterConnectionAndUri(List<RedisNodeDescription> nodes,
    Tuple2<RedisURI, StatefulRedisConnection<K, V>> connectionTuple, RedisCodec<K, V> codec) {
  RedisNodeDescription node = getConnectedNode(redisURI, nodes);
  if (node.getRole() != RedisInstance.Role.MASTER) {
    RedisNodeDescription master = lookupMaster(nodes);
    ConnectionFuture<StatefulRedisConnection<K, V>> masterConnection = redisClient.connectAsync(codec, master.getUri());
    return Mono.just(master.getUri()).zipWith(Mono.fromCompletionStage(masterConnection)) //
        .doOnNext(it -> {
          initialConnections.put(it.getT1(), it.getT2());
        });
  }
  return Mono.just(connectionTuple);
}

Code example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
  logger.debug("Performing topology lookup");
  RedisFuture<String> info = connection.async().info("replication");
  try {
    return Mono.fromCompletionStage(info).timeout(redisURI.getTimeout()).map(this::getNodesFromInfo).toFuture();
  } catch (RuntimeException e) {
    throw Exceptions.bubble(e);
  }
}

Code example source: lettuce-io/lettuce-core

public <T> Mono<T> thenError(Throwable t) {
  return Mono.defer(() -> {
    if (firstCloseLatch()) {
      return Mono.fromCompletionStage(closeable.closeAsync());
    }
    return Mono.empty();
  }).then(Mono.<T> error(t)).doFinally(s -> {
    if (firstCloseLatch()) {
      closeable.closeAsync();
    }
  });
}

Code example source: spring-projects/spring-data-redis

@SuppressWarnings("unchecked")
AsyncConnect(LettuceConnectionProvider connectionProvider, Class<T> connectionType) {
  Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null!");
  Assert.notNull(connectionType, "Connection type must not be null!");
  this.connectionProvider = connectionProvider;
  Mono<StatefulConnection> defer = Mono
      .defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType)));
  this.connectionPublisher = defer.doOnNext(it -> {
    if (isClosing(this.state.get())) {
      it.closeAsync();
    } else {
      connection = it;
    }
  }) //
      .cache() //
      .handle((connection, sink) -> {
        if (isClosing(this.state.get())) {
          sink.error(new IllegalStateException("Unable to connect. Connection is closed!"));
        } else {
          sink.next((T) connection);
        }
      });
}

Code example source: lettuce-io/lettuce-core

/**
 * Load master slave nodes. Result contains an ordered list of {@link RedisNodeDescription}s. The sort key is the latency.
 * Nodes with lower latency come first.
 *
 * @param seed collection of {@link RedisURI}s
 * @return mapping between {@link RedisURI} and {@link Partitions}
 */
public Mono<List<RedisNodeDescription>> getNodes(RedisURI seed) {
  CompletableFuture<List<RedisNodeDescription>> future = topologyProvider.getNodesAsync();
  Mono<List<RedisNodeDescription>> initialNodes = Mono.fromFuture(future).doOnNext(nodes -> {
    addPasswordIfNeeded(nodes, seed);
  });
  return initialNodes
      .map(this::getConnections)
      .flatMap(asyncConnections -> asyncConnections.asMono(seed.getTimeout(), eventExecutors))
      .flatMap(
          connections -> {
            Requests requests = connections.requestPing();
            CompletionStage<List<RedisNodeDescription>> nodes = requests.getOrTimeout(seed.getTimeout(),
                eventExecutors);
            return Mono.fromCompletionStage(nodes).flatMap(it -> ResumeAfter.close(connections).thenEmit(it));
          });
}

Code example source: lettuce-io/lettuce-core

private <K, V> Mono<StatefulRedisSentinelConnection<K, V>> connectSentinel(ConnectionBuilder connectionBuilder, RedisURI uri) {
  connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));
  SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(uri);
  logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
  Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
      .fromCompletionStage(initializeChannelAsync(connectionBuilder));
  return connectionMono.onErrorMap(CompletionException.class, Throwable::getCause) //
      .doOnError(t -> logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + t.toString())) //
      .onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e));
}
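The snippet above unwraps CompletionException before mapping the error. As background, here is a minimal JDK-only sketch of where such a wrapper can come from: CompletableFuture wraps a failure in CompletionException when it propagates to a dependent stage. The class name CompletionExceptionDemo and both helper methods are hypothetical and not part of lettuce or Reactor; the sketch makes no claim about how a given Reactor version treats the wrapper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

public class CompletionExceptionDemo {

    // Returns the Throwable seen by a callback registered on a dependent stage.
    static Throwable observedByDependent() {
        CompletableFuture<String> source = new CompletableFuture<>();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        // thenApply creates a dependent stage; the callback observes its failure.
        source.thenApply(String::trim)
              .whenComplete((value, error) -> seen.set(error));
        source.completeExceptionally(new IllegalStateException("connect failed"));
        return seen.get();
    }

    // Hypothetical helper: strips one CompletionException layer, if present.
    static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    public static void main(String[] args) {
        Throwable raw = observedByDependent();
        System.out.println("observed:  " + raw.getClass().getSimpleName());
        System.out.println("unwrapped: " + unwrap(raw).getClass().getSimpleName());
    }
}
```

Unwrapping before logging or mapping, as the snippet does with onErrorMap(CompletionException.class, Throwable::getCause), keeps the original cause visible to callers.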

Code example source: lettuce-io/lettuce-core

private Mono<StatefulRedisMasterSlaveConnection<K, V>> initializeConnection(RedisCodec<K, V> codec,
    SentinelTopologyRefresh sentinelTopologyRefresh, MasterSlaveConnectionProvider<K, V> connectionProvider,
    Runnable runnable, List<RedisNodeDescription> nodes) {
  connectionProvider.setKnownNodes(nodes);
  MasterSlaveChannelWriter channelWriter = new MasterSlaveChannelWriter(connectionProvider, redisClient.getResources()) {
    @Override
    public CompletableFuture<Void> closeAsync() {
      return CompletableFuture.allOf(super.closeAsync(), sentinelTopologyRefresh.closeAsync());
    }
  };
  StatefulRedisMasterSlaveConnectionImpl<K, V> connection = new StatefulRedisMasterSlaveConnectionImpl<>(channelWriter,
      codec, redisURI.getTimeout());
  connection.setOptions(redisClient.getOptions());
  CompletionStage<Void> bind = sentinelTopologyRefresh.bind(runnable);
  return Mono.fromCompletionStage(bind).onErrorResume(t -> {
    return ResumeAfter.close(connection).thenError(t);
  }).then(Mono.just(connection));
}

Code example source: lettuce-io/lettuce-core

connect = Mono.fromCompletionStage(initializeChannelAsync(connectionBuilder));
} else {

Code example source: com.fireflysource/firefly-reactive

@Override
public <T> Mono<T> inTransaction(Func1<ReactiveSQLConnection, Mono<T>> func1) {
  return Mono.fromCompletionStage(sqlConnection.inTransaction(conn -> {
    Promise.Completable<T> completable = new Promise.Completable<>();
    func1.call(this)
       .subscribe(completable::succeeded, completable::failed);
    return completable;
  }));
}

Code example source: hypercube1024/firefly

@Override
public <T> Mono<T> newTransaction(Func1<ReactiveSQLConnection, Mono<T>> func1) {
  return Mono.fromCompletionStage(sqlClient.newTransaction(conn -> {
    Promise.Completable<T> completable = new Promise.Completable<>();
    func1.call(new ReactiveSQLConnectionAdapter(conn))
       .subscribe(completable::succeeded, completable::failed);
    return completable;
  }));
}
