Using reactor.core.publisher.Mono.toFuture(), with code examples

x33g5p2x, reposted 2022-01-24, category: Other

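This article collects code examples for the Java method reactor.core.publisher.Mono.toFuture() and shows how Mono.toFuture() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, so they should be a useful reference. Details of Mono.toFuture():
Fully qualified class: reactor.core.publisher.Mono
Class name: Mono
Method name: toFuture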

About Mono.toFuture

Transforms this Mono into a CompletableFuture that completes on onNext or onComplete and fails on onError. If the Mono completes without emitting a value, the future completes with null.
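
The three outcomes described above can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath; the class name `ToFutureOutcomes` and its helper methods are hypothetical and exist only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of Reactor.
public class ToFutureOutcomes {

    // A Mono that emits a value completes the future with that value.
    static String fromValue() throws Exception {
        CompletableFuture<String> f = Mono.just("hello").toFuture();
        return f.get();
    }

    // An empty Mono completes the future with null.
    static String fromEmpty() throws Exception {
        CompletableFuture<String> f = Mono.<String>empty().toFuture();
        return f.get();
    }

    // A failing Mono completes the future exceptionally; get() wraps the
    // original error in an ExecutionException, so we return its cause.
    static Throwable fromError() throws InterruptedException {
        CompletableFuture<String> f =
                Mono.<String>error(new IllegalStateException("boom")).toFuture();
        try {
            f.get();
            return null; // not reached for a failed future
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fromValue());
        System.out.println(fromEmpty());
        System.out.println(fromError());
    }
}
```

Note that toFuture() subscribes to the Mono immediately, so the work starts when the future is created, not when get() is called.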

Code examples

Example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
  logger.debug("lookup topology for masterId {}", masterId);
  Mono<StatefulRedisSentinelConnection<String, String>> connect = Mono.fromFuture(redisClient.connectSentinelAsync(CODEC,
      sentinelUri));
  return connect.flatMap(this::getNodes).toFuture();
}

Example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
  List<StatefulRedisConnection<String, String>> connections = new CopyOnWriteArrayList<>();
  Flux<RedisURI> uris = Flux.fromIterable(redisURIs);
  Mono<List<RedisNodeDescription>> nodes = uris
      .flatMap(uri -> getNodeDescription(connections, uri))
      .collectList()
      .flatMap(
          (nodeDescriptions) -> {
            if (nodeDescriptions.isEmpty()) {
              return Mono.error(new RedisConnectionException(String.format(
                  "Failed to connect to at least one node in %s", redisURIs)));
            }
            return Mono.just(nodeDescriptions);
          });
  return nodes.toFuture();
}

Example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
  Map<RedisURI, StatefulRedisConnection<K, V>> initialConnections = new HashMap<>();
  TopologyProvider topologyProvider = new StaticMasterSlaveTopologyProvider(redisClient, redisURIs);
  RedisURI seedNode = redisURIs.iterator().next();
  MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
  MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
      seedNode, initialConnections);
  return refresh.getNodes(seedNode).flatMap(nodes -> {
    if (nodes.isEmpty()) {
      return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURIs)));
    }
    return initializeConnection(codec, seedNode, connectionProvider, nodes);
  }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}

Example source: reactor/reactor-core

@Test
public void normal() throws Exception {
  CompletableFuture<Integer> f = Mono.just(1).toFuture();
  assertThat(f.get()).isEqualTo(1);
}

Example source: spring-projects/spring-framework

void registerAdapters(ReactiveAdapterRegistry registry) {
  // Register Flux and Mono before Publisher...
  registry.registerReactiveType(
      ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
      source -> (Mono<?>) source,
      Mono::from
  );
  registry.registerReactiveType(
      ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
      source -> (Flux<?>) source,
      Flux::from);
  registry.registerReactiveType(
      ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
      source -> (Publisher<?>) source,
      source -> source);
  registry.registerReactiveType(
      ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
        CompletableFuture<?> empty = new CompletableFuture<>();
        empty.complete(null);
        return empty;
      }),
      source -> Mono.fromFuture((CompletableFuture<?>) source),
      source -> Mono.from(source).toFuture()
  );
}

Example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
  logger.debug("Performing topology lookup");
  RedisFuture<String> info = connection.async().info("replication");
  try {
    return Mono.fromCompletionStage(info).timeout(redisURI.getTimeout()).map(this::getNodesFromInfo).toFuture();
  } catch (RuntimeException e) {
    throw Exceptions.bubble(e);
  }
}

Example source: reactor/reactor-core

@Test
public void empty() throws Exception {
  CompletableFuture<Integer> f = Mono.<Integer>empty().toFuture();

  assertThat(f.get()).isNull();
}

Example source: reactor/reactor-core

@Test(expected = Exception.class)
public void error() throws Exception {
  CompletableFuture<Integer> f =
      Mono.<Integer>error(new Exception("test")).toFuture();
  assertThat(f.isDone()).isTrue();
  assertThat(f.isCompletedExceptionally()).isTrue();
  f.get();
}
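
The test above expects f.get() to throw. For a future that completed exceptionally, get() throws a checked ExecutionException whose cause is the original error, while join() throws an unchecked CompletionException. This is also why several lettuce examples in this article unwrap with onErrorMap(ExecutionException.class, Throwable::getCause). The sketch below uses only java.util.concurrent; the class `FutureErrorUnwrap` and its methods are hypothetical, and a manually failed CompletableFuture stands in for the result of Mono.error(...).toFuture().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; a manually failed future stands in for toFuture().
public class FutureErrorUnwrap {

    // Build a future that has already completed exceptionally.
    static CompletableFuture<Integer> failedFuture(Throwable error) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(error);
        return f;
    }

    // get() reports the failure as a checked ExecutionException.
    static Throwable causeViaGet(CompletableFuture<?> f) throws InterruptedException {
        try {
            f.get();
            return null; // not reached for a failed future
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }

    // join() reports the failure as an unchecked CompletionException.
    static Throwable causeViaJoin(CompletableFuture<?> f) {
        try {
            f.join();
            return null; // not reached for a failed future
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        Exception original = new Exception("test");
        System.out.println(causeViaGet(failedFuture(original)) == original);
        System.out.println(causeViaJoin(failedFuture(original)) == original);
    }
}
```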

Example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
  TopologyProvider topologyProvider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI);
  SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(redisClient,
      redisURI.getSentinelMasterId(), redisURI.getSentinels());
  MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
  MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
      redisURI, Collections.emptyMap());
  Runnable runnable = getTopologyRefreshRunnable(refresh, connectionProvider);
  return refresh.getNodes(redisURI).flatMap(nodes -> {
    if (nodes.isEmpty()) {
      return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURI)));
    }
    return initializeConnection(codec, sentinelTopologyRefresh, connectionProvider, runnable, nodes);
  }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}

Example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
  ConnectionFuture<StatefulRedisConnection<K, V>> initialConnection = redisClient.connectAsync(codec, redisURI);
  Mono<StatefulRedisMasterSlaveConnection<K, V>> connect = Mono
      .fromCompletionStage(initialConnection)
      .flatMap(
          nodeConnection -> {
            initialConnections.put(redisURI, nodeConnection);
            TopologyProvider topologyProvider = new MasterSlaveTopologyProvider(nodeConnection, redisURI);
            return Mono.fromCompletionStage(topologyProvider.getNodesAsync()).flatMap(
                nodes -> getMasterConnectionAndUri(nodes, Tuples.of(redisURI, nodeConnection), codec));
          }).flatMap(connectionAndUri -> {
        return initializeConnection(codec, connectionAndUri);
      });
  return connect.onErrorResume(t -> {
    Mono<Void> close = Mono.empty();
    for (StatefulRedisConnection<?, ?> connection : initialConnections.values()) {
      close = close.then(Mono.fromFuture(connection.closeAsync()));
    }
    return close.then(Mono.error(t));
  }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}

Example source: lettuce-io/lettuce-core

return connections.filter(StatefulConnection::isOpen).next().switchIfEmpty(connections.next()).toFuture();
// ...
    int index = ThreadLocalRandom.current().nextInt(it.size());
    return it.get(index);
  }).switchIfEmpty(connections.next()).toFuture();
} catch (RuntimeException e) {
  throw Exceptions.bubble(e);
// ...

Example source: resilience4j/resilience4j

@Test
public void shouldRecordSuccessWhenUsingToFuture() {
  try {
    Mono.just("Event")
        .transform(CircuitBreakerOperator.of(circuitBreaker))
        .toFuture()
        .get();

    assertSingleSuccessfulCall();
  } catch (InterruptedException | ExecutionException e) {
    fail();
  }
}

Example source: reactor/reactor-core

@Test
public void delayElementShouldNotCancelTwice() throws Exception {
  DirectProcessor<Long> p = DirectProcessor.create();
  AtomicInteger cancellations = new AtomicInteger();
  Flux<Long> publishedFlux = p
    .publish()
    .refCount(2)
    .doOnCancel(() -> cancellations.incrementAndGet());
  publishedFlux.any(x -> x > 5)
    .delayElement(Duration.ofMillis(2))
    .subscribe();
  CompletableFuture<List<Long>> result = publishedFlux.collectList().toFuture();
  for (long i = 0; i < 10; i++) {
    p.onNext(i);
    Thread.sleep(1);
  }
  p.onComplete();
  assertThat(result.get(10, TimeUnit.MILLISECONDS).size()).isEqualTo(10);
  assertThat(cancellations.get()).isEqualTo(2);
}

Example source: lettuce-io/lettuce-core

.doOnNext(
    c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider))
.map(it -> (StatefulRedisClusterPubSubConnection<K, V>) it).toFuture();

Example source: lettuce-io/lettuce-core

.doOnNext(
    c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider))
.map(it -> (StatefulRedisClusterConnection<K, V>) it).toFuture();

Example source: reactor/reactor-core

@Test
public void test() throws Exception {
  Flux.just("red", "white", "blue")
    .log("source")
    .flatMap(value -> Mono.fromCallable(() -> {
                Thread.sleep(1000);
                return value;
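                // Note: Schedulers.elastic() was deprecated in Reactor 3.4; newer code uses Schedulers.boundedElastic().
              }).subscribeOn(Schedulers.elastic()))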
    .log("merged")
    .collect(Result::new, Result::add)
    .doOnNext(Result::stop)
    .log("accumulated")
    .toFuture()
    .get();
}

Example source: mulesoft/mule

/**
 * {@inheritDoc}
 */
@Override
public <T, A> CompletableFuture<Result<T, A>> executeAsync(String extension, String operation, OperationParameters parameters) {
 OperationMessageProcessor processor = createProcessor(extension, operation, parameters);
 return just(getEvent()).transform(processor)
   .map(event -> Result.<T, A>builder(event.getMessage()).build())
   .onErrorMap(Exceptions::unwrap)
   .doAfterTerminate(() -> disposeProcessor(processor)).toFuture();
}

Example source: mulesoft/mule

@Override
public CompletableFuture<InterceptionEvent> around(ComponentLocation location,
                          Map<String, ProcessorParameterValue> parameters,
                          InterceptionEvent event, InterceptionAction action) {
 Mono<InterceptionEvent> errorMono = Mono.error(thrown);
 return Mono.from(((BaseEventContext) event.getContext()).error(thrown)).then(errorMono).toFuture();
}
