Usage and code examples of the reactor.core.publisher.Mono.fromFuture() method

Reposted by x33g5p2x on 2022-01-24 in Other

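This article collects code examples for the Java method reactor.core.publisher.Mono.fromFuture() and shows how Mono.fromFuture() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as useful references. Details of Mono.fromFuture() are as follows:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: fromFuture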

About Mono.fromFuture

Create a Mono, producing its value using the provided CompletableFuture.

Note that the future is not cancelled when that Mono is cancelled, but that behavior can be obtained by using doFinally(Consumer) with a callback that checks for SignalType.CANCEL and calls CompletableFuture.cancel(boolean).
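
The cancellation note can be illustrated with a minimal sketch. It assumes reactor-core is on the classpath; the class name FromFutureCancelExample and the helper cancellingMono are hypothetical names chosen for illustration and are not part of Reactor.

```java
import java.util.concurrent.CompletableFuture;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

class FromFutureCancelExample {

    // Hypothetical helper: wraps a future so that cancelling the Mono
    // also cancels the underlying CompletableFuture.
    static <T> Mono<T> cancellingMono(CompletableFuture<T> future) {
        return Mono.fromFuture(future)
                .doFinally(signal -> {
                    // doFinally also fires on completion and error;
                    // only the CANCEL signal should cancel the future.
                    if (signal == SignalType.CANCEL) {
                        future.cancel(false);
                    }
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Disposable subscription = cancellingMono(future).subscribe();
        subscription.dispose(); // cancels the Mono, which triggers doFinally
        System.out.println(future.isCancelled()); // true
    }
}
```

The doFinally hook is attached per Mono, so a future shared by several subscribers would be cancelled as soon as any one of them cancels; whether that is acceptable depends on the caller.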

Code examples

Code example source: line/armeria

/**
 * Closes the {@link HttpResponseWriter} if it is opened.
 */
private Mono<Void> cleanup(@Nullable Throwable cause) {
  if (future.isDone()) {
    return Mono.empty();
  }
  if (cause != null) {
    future.completeExceptionally(cause);
    logger.debug("{} Response future has been completed with a cause", ctx, cause);
    return Mono.empty();
  }
  final HttpResponse response = HttpResponse.of(headers);
  future.complete(response);
  logger.debug("{} Response future has been completed with an HttpResponse", ctx);
  return Mono.fromFuture(response.completionFuture());
}

Code example source: line/armeria

private Supplier<Mono<Void>> execute(Supplier<HttpRequest> supplier) {
  return () -> Mono.defer(() -> {
    assert request == null : request;
    request = supplier.get();
    future.complete(client.execute(request));
    return Mono.fromFuture(request.completionFuture());
  });
}

Code example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
  logger.debug("lookup topology for masterId {}", masterId);
  Mono<StatefulRedisSentinelConnection<String, String>> connect = Mono.fromFuture(redisClient.connectSentinelAsync(CODEC,
      sentinelUri));
  return connect.flatMap(this::getNodes).toFuture();
}

Code example source: line/armeria

@Override
public Mono<ClientHttpResponse> connect(
    HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
  try {
    requireNonNull(method, "method");
    requireNonNull(uri, "uri");
    requireNonNull(requestCallback, "requestCallback");
    final ArmeriaClientHttpRequest request = createRequest(method, uri);
    return requestCallback.apply(request)
               .then(Mono.fromFuture(request.future()))
               .map(ArmeriaHttpClientResponseSubscriber::new)
               .flatMap(s -> Mono.fromFuture(s.httpHeadersFuture())
                        .map(headers -> createResponse(headers, s)));
  } catch (NullPointerException | IllegalArgumentException e) {
    return Mono.error(e);
  }
}

Code example source: line/armeria

private Mono<Void> write(Flux<? extends DataBuffer> publisher) {
  return Mono.defer(() -> {
    final HttpResponse response = HttpResponse.of(
        Flux.concat(Mono.just(headers), publisher.map(factoryWrapper::toHttpData))
          // Publish the response stream on the event loop in order to avoid the possibility of
          // calling subscription.request() from multiple threads while publishing messages
          // with onNext signals or starting the subscription with onSubscribe signal.
          .publishOn(Schedulers.fromExecutor(ctx.eventLoop())));
    future.complete(response);
    return Mono.fromFuture(response.completionFuture());
  });
}

Code example source: reactor/reactor-core

@Test
public void fromFutureSupplier() {
  AtomicInteger source = new AtomicInteger();
  Supplier<CompletableFuture<Integer>> supplier = () -> CompletableFuture.completedFuture(source.incrementAndGet());
  Mono<Number> mono = Mono.fromFuture(supplier);
  Assertions.assertThat(source).hasValue(0);
  Assertions.assertThat(mono.block())
       .isEqualTo(source.get())
       .isEqualTo(1);
  Assertions.assertThat(mono.block())
       .isEqualTo(source.get())
       .isEqualTo(2);
}
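
The test above relies on the Supplier overload being lazy: the supplier is invoked once per subscription, not when the Mono is assembled. The following sketch restates that as a standalone program, assuming reactor-core is on the classpath; the class name FromFutureLazinessExample and the method run are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;

class FromFutureLazinessExample {

    // Returns {supplier calls before subscribing, first result, second result}.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        Supplier<CompletableFuture<Integer>> supplier =
                () -> CompletableFuture.completedFuture(calls.incrementAndGet());

        // Assembling the Mono does not invoke the supplier.
        Mono<Integer> mono = Mono.fromFuture(supplier);
        int before = calls.get();

        // Each block() is a new subscription and asks the supplier for a new future.
        int first = mono.block();
        int second = mono.block();
        return new int[] {before, first, second};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [0, 1, 2]
    }
}
```

By contrast, a future passed directly to Mono.fromFuture(CompletableFuture) has already been created by the time the Mono is assembled, so every subscriber observes the same result.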

Code example source: reactor/reactor-core

@Test
public void fromCompletableFuture(){
  CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "helloFuture");
  assertThat(Mono.fromFuture(f)
          .block()).isEqualToIgnoringCase("helloFuture");
}

Code example source: spring-projects/spring-framework

void registerAdapters(ReactiveAdapterRegistry registry) {
  // Register Flux and Mono before Publisher...
  registry.registerReactiveType(
      ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
      source -> (Mono<?>) source,
      Mono::from
  );
  registry.registerReactiveType(
      ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
      source -> (Flux<?>) source,
      Flux::from);
  registry.registerReactiveType(
      ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
      source -> (Publisher<?>) source,
      source -> source);
  registry.registerReactiveType(
      ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
        CompletableFuture<?> empty = new CompletableFuture<>();
        empty.complete(null);
        return empty;
      }),
      source -> Mono.fromFuture((CompletableFuture<?>) source),
      source -> Mono.from(source).toFuture()
  );
}

Code example source: lettuce-io/lettuce-core

/**
 * Load master slave nodes. Result contains an ordered list of {@link RedisNodeDescription}s. The sort key is the latency.
 * Nodes with lower latency come first.
 *
 * @param seed collection of {@link RedisURI}s
 * @return mapping between {@link RedisURI} and {@link Partitions}
 */
public Mono<List<RedisNodeDescription>> getNodes(RedisURI seed) {
  CompletableFuture<List<RedisNodeDescription>> future = topologyProvider.getNodesAsync();
  Mono<List<RedisNodeDescription>> initialNodes = Mono.fromFuture(future).doOnNext(nodes -> {
    addPasswordIfNeeded(nodes, seed);
  });
  return initialNodes
      .map(this::getConnections)
      .flatMap(asyncConnections -> asyncConnections.asMono(seed.getTimeout(), eventExecutors))
      .flatMap(
          connections -> {
            Requests requests = connections.requestPing();
            CompletionStage<List<RedisNodeDescription>> nodes = requests.getOrTimeout(seed.getTimeout(),
                eventExecutors);
            return Mono.fromCompletionStage(nodes).flatMap(it -> ResumeAfter.close(connections).thenEmit(it));
          });
}

Code example source: lettuce-io/lettuce-core

@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
  ConnectionFuture<StatefulRedisConnection<K, V>> initialConnection = redisClient.connectAsync(codec, redisURI);
  Mono<StatefulRedisMasterSlaveConnection<K, V>> connect = Mono
      .fromCompletionStage(initialConnection)
      .flatMap(
          nodeConnection -> {
            initialConnections.put(redisURI, nodeConnection);
            TopologyProvider topologyProvider = new MasterSlaveTopologyProvider(nodeConnection, redisURI);
            return Mono.fromCompletionStage(topologyProvider.getNodesAsync()).flatMap(
                nodes -> getMasterConnectionAndUri(nodes, Tuples.of(redisURI, nodeConnection), codec));
          }).flatMap(connectionAndUri -> {
        return initializeConnection(codec, connectionAndUri);
      });
  return connect.onErrorResume(t -> {
    Mono<Void> close = Mono.empty();
    for (StatefulRedisConnection<?, ?> connection : initialConnections.values()) {
      close = close.then(Mono.fromFuture(connection.closeAsync()));
    }
    return close.then(Mono.error(t));
  }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}

Code example source: reactor/reactor-core

@Test
public void noRetentionOnTermination() throws InterruptedException {
  Date date = new Date();
  CompletableFuture<Date> future = new CompletableFuture<>();
  WeakReference<Date> refDate = new WeakReference<>(date);
  WeakReference<CompletableFuture<Date>> refFuture = new WeakReference<>(future);
  Mono<Date> source = Mono.fromFuture(future);
  Mono<String> data = source.map(Date::toString).log().cache().log();
  future.complete(date);
  assertThat(data.block()).isEqualTo(date.toString());
  date = null;
  future = null;
  source = null;
  System.gc();
  int cycles;
  for (cycles = 10; cycles > 0 ; cycles--) {
    if (refDate.get() == null && refFuture.get() == null) break;
    Thread.sleep(100);
  }
  assertThat(refFuture.get()).isNull();
  assertThat(refDate.get()).isNull();
  assertThat(cycles).isNotZero()
           .isPositive();
}

Code example source: reactor/reactor-core

@Test
public void noRetentionOnTerminationError() throws InterruptedException {
  CompletableFuture<Date> future = new CompletableFuture<>();
  WeakReference<CompletableFuture<Date>> refFuture = new WeakReference<>(future);
  Mono<Date> source = Mono.fromFuture(future);
  Mono<String> data = source.map(Date::toString).cache();
  future.completeExceptionally(new IllegalStateException());
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(data::block);
  future = null;
  source = null;
  System.gc();
  int cycles;
  for (cycles = 10; cycles > 0 ; cycles--) {
    if (refFuture.get() == null) break;
    Thread.sleep(100);
  }
  assertThat(refFuture.get()).isNull();
  assertThat(cycles).isNotZero()
           .isPositive();
}
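
As the test above shows, a future completed exceptionally surfaces as an error signal on the Mono, and block() rethrows it. A minimal sketch of handling that error with a fallback value follows, assuming reactor-core is on the classpath; the class name FromFutureErrorExample and the method valueOrFallback are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;

class FromFutureErrorExample {

    // Hypothetical helper: returns the future's value, or "fallback"
    // when the future failed with an IllegalStateException.
    static String valueOrFallback(CompletableFuture<String> future) {
        return Mono.fromFuture(future)
                .onErrorReturn(IllegalStateException.class, "fallback")
                .block();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(valueOrFallback(failed)); // fallback

        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        System.out.println(valueOrFallback(ok)); // ok
    }
}
```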

Code example source: reactor/reactor-core

@Test
public void noRetentionOnTerminationCancel() throws InterruptedException {
  CompletableFuture<Date> future = new CompletableFuture<>();
  WeakReference<CompletableFuture<Date>> refFuture = new WeakReference<>(future);
  Mono<Date> source = Mono.fromFuture(future);
  Mono<String> data = source.map(Date::toString).cache();
  future = null;
  source = null;
  data.subscribe().dispose();
  System.gc();
  int cycles;
  for (cycles = 10; cycles > 0 ; cycles--) {
    if (refFuture.get() == null) break;
    Thread.sleep(100);
  }
  assertThat(refFuture.get()).isNull();
  assertThat(cycles).isNotZero()
           .isPositive();
}

Code example source: lettuce-io/lettuce-core

connections = connections.concatWith(Mono.fromFuture(getConnection(node)));

Code example source: reactor/reactor-core

@Test
public void cancelThenFutureFails() {
  CompletableFuture<Integer> future = new CompletableFuture<>();
  AtomicReference<Subscription> subRef = new AtomicReference<>();
  Mono<Integer> mono = Mono
      .fromFuture(future)
      .doOnSubscribe(subRef::set);
  StepVerifier.create(mono)
        .expectSubscription()
        .then(() -> {
          subRef.get().cancel();
          future.completeExceptionally(new IllegalStateException("boom"));
          future.complete(1);
        })
        .thenCancel()//already cancelled but need to get to verification
        .verifyThenAssertThat()
        .hasDroppedErrorWithMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void stackOverflowGoesToOnErrorDropped() {
  CompletableFuture<Integer> future = new CompletableFuture<>();
  future.complete(1);
  Mono<Integer> simple = Mono.fromFuture(future);

  StepVerifier.create(
      simple.map(r -> {
        throw new StackOverflowError("boom, good bye Future");
      })
  )
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(1))
        .thenCancel()
        .verifyThenAssertThat()
        .hasDroppedErrorWithMessage("boom, good bye Future");
}

Code example source: reactor/reactor-core

@Test
public void cancelFutureImmediatelyCancelledLoop() {
  for (int i = 0; i < 10000; i++) {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    Mono<Integer> mono = Mono
        .fromFuture(future)
        .doFinally(sig -> {
          if (sig == SignalType.CANCEL) future.cancel(false);
        });
    StepVerifier.create(mono)
          .expectSubscription()
          .thenCancel()
          .verifyThenAssertThat()
          .hasNotDroppedErrors();
    assertThat(future).isCancelled();
  }
}

Code example source: reactor/reactor-core

@Test
public void cancelFutureTimeoutCancelledLoop() {
  for (int i = 0; i < 500; i++) {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    Mono<Integer> mono = Mono
        .fromFuture(future)
        .doFinally(sig -> {
          if (sig == SignalType.CANCEL) future.cancel(false);
        });
    StepVerifier.create(mono.timeout(Duration.ofMillis(10)))
          .expectSubscription()
          .expectErrorSatisfies(e ->
              assertThat(e).hasMessageStartingWith("Did not observe any item or terminal signal within 10ms"))
          .verifyThenAssertThat()
          .hasNotDroppedErrors();
    assertThat(future).isCancelled();
  }
}

Code example source: reactor/reactor-core

@Test
public void cancelFutureDelayedCancelledLoop() {
  for (int i = 0; i < 500; i++) {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    Mono<Integer> mono = Mono
        .fromFuture(future)
        .doFinally(sig -> {
          if (sig == SignalType.CANCEL) future.cancel(false);
        });
    StepVerifier.create(mono)
          .expectSubscription()
          .thenAwait(Duration.ofMillis(10))
          .thenCancel()
          .verifyThenAssertThat()
          .hasNotDroppedErrors();
    assertThat(future).isCancelled();
  }
}
