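This article collects code examples for the Java method reactor.core.publisher.Mono.fromFuture() and shows how Mono.fromFuture() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as a reference. Details of the Mono.fromFuture() method are as follows: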
Package: reactor.core.publisher
Class name: Mono
Method name: fromFuture
Description: Create a Mono, producing its value using the provided CompletableFuture.
Note that the future is not cancelled when that Mono is cancelled, but that behavior can be obtained by using a #doFinally(Consumer) that checks for a SignalType#CANCEL and calls CompletableFuture#cancel(boolean).
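The cancellation note above can be illustrated with a short sketch. It assumes reactor-core is on the classpath; the class name FromFutureCancelSketch and the helper cancelPropagating are hypothetical names chosen for this illustration and are not part of Reactor. The helper applies the doFinally pattern that the description mentions.

```java
import java.util.concurrent.CompletableFuture;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

public class FromFutureCancelSketch {

    // Hypothetical helper: wraps a future so that cancelling the Mono
    // also cancels the underlying CompletableFuture.
    static <T> Mono<T> cancelPropagating(CompletableFuture<T> future) {
        return Mono.fromFuture(future)
                .doFinally(signal -> {
                    // doFinally also fires on completion and error,
                    // so only react to the CANCEL signal.
                    if (signal == SignalType.CANCEL) {
                        future.cancel(false);
                    }
                });
    }

    public static void main(String[] args) {
        // A future that never completes on its own.
        CompletableFuture<String> pending = new CompletableFuture<>();

        // Subscribe, then cancel the subscription right away.
        Disposable subscription = cancelPropagating(pending).subscribe();
        subscription.dispose();

        System.out.println("future cancelled: " + pending.isCancelled());
    }
}
```

After dispose() the doFinally callback has run with SignalType.CANCEL, so the future should report itself as cancelled. The reactor-core tests further down in this article use the same pattern.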
Code example source: line/armeria
/**
 * Closes the {@link HttpResponseWriter} if it is opened.
 */
private Mono<Void> cleanup(@Nullable Throwable cause) {
    if (future.isDone()) {
        return Mono.empty();
    }
    if (cause != null) {
        future.completeExceptionally(cause);
        logger.debug("{} Response future has been completed with a cause", ctx, cause);
        return Mono.empty();
    }
    final HttpResponse response = HttpResponse.of(headers);
    future.complete(response);
    logger.debug("{} Response future has been completed with an HttpResponse", ctx);
    return Mono.fromFuture(response.completionFuture());
}
Code example source: line/armeria
private Supplier<Mono<Void>> execute(Supplier<HttpRequest> supplier) {
    return () -> Mono.defer(() -> {
        assert request == null : request;
        request = supplier.get();
        future.complete(client.execute(request));
        return Mono.fromFuture(request.completionFuture());
    });
}
Code example source: lettuce-io/lettuce-core
@Override
public CompletableFuture<List<RedisNodeDescription>> getNodesAsync() {
    logger.debug("lookup topology for masterId {}", masterId);
    Mono<StatefulRedisSentinelConnection<String, String>> connect = Mono.fromFuture(
            redisClient.connectSentinelAsync(CODEC, sentinelUri));
    return connect.flatMap(this::getNodes).toFuture();
}
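The snippet above converts a future into a Mono, applies an operator, and converts the result back with toFuture(). A minimal sketch of that round trip follows, assuming reactor-core is on the classpath; the class FutureRoundTripSketch and the helper doubled are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;

public class FutureRoundTripSketch {

    // Hypothetical helper: doubles the future's value inside a reactive pipeline.
    static CompletableFuture<Integer> doubled(CompletableFuture<Integer> input) {
        return Mono.fromFuture(input)   // CompletableFuture -> Mono
                .map(n -> n * 2)        // transform with a Reactor operator
                .toFuture();            // Mono -> CompletableFuture (subscribes immediately)
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> result = doubled(CompletableFuture.completedFuture(21));
        System.out.println(result.join()); // prints 42
    }
}
```

Note that toFuture() subscribes to the Mono as soon as it is called, so the pipeline starts running at that point rather than when the returned future is first queried.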
Code example source: line/armeria
@Override
public Mono<ClientHttpResponse> connect(
        HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
    try {
        requireNonNull(method, "method");
        requireNonNull(uri, "uri");
        requireNonNull(requestCallback, "requestCallback");
        final ArmeriaClientHttpRequest request = createRequest(method, uri);
        return requestCallback.apply(request)
                .then(Mono.fromFuture(request.future()))
                .map(ArmeriaHttpClientResponseSubscriber::new)
                .flatMap(s -> Mono.fromFuture(s.httpHeadersFuture())
                        .map(headers -> createResponse(headers, s)));
    } catch (NullPointerException | IllegalArgumentException e) {
        return Mono.error(e);
    }
}
Code example source: line/armeria
private Mono<Void> write(Flux<? extends DataBuffer> publisher) {
    return Mono.defer(() -> {
        final HttpResponse response = HttpResponse.of(
                Flux.concat(Mono.just(headers), publisher.map(factoryWrapper::toHttpData))
                        // Publish the response stream on the event loop in order to avoid the possibility of
                        // calling subscription.request() from multiple threads while publishing messages
                        // with onNext signals or starting the subscription with onSubscribe signal.
                        .publishOn(Schedulers.fromExecutor(ctx.eventLoop())));
        future.complete(response);
        return Mono.fromFuture(response.completionFuture());
    });
}
Code example source: reactor/reactor-core
@Test
public void fromFutureSupplier() {
    AtomicInteger source = new AtomicInteger();
    Supplier<CompletableFuture<Integer>> supplier = () -> CompletableFuture.completedFuture(source.incrementAndGet());
    Mono<Number> mono = Mono.fromFuture(supplier);
    // The supplier has not been invoked yet: nothing has subscribed.
    Assertions.assertThat(source).hasValue(0);
    Assertions.assertThat(mono.block())
            .isEqualTo(source.get())
            .isEqualTo(1);
    // A second subscription invokes the supplier again.
    Assertions.assertThat(mono.block())
            .isEqualTo(source.get())
            .isEqualTo(2);
}
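The test above relies on the Supplier overload invoking the supplier once per subscription. The following sketch contrasts that with passing an already created future; it assumes reactor-core is on the classpath, and the class FromFutureSupplierSketch and method observe are hypothetical names for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;

public class FromFutureSupplierSketch {

    // Returns the values seen by two subscriptions to each Mono.
    static List<Integer> observe() {
        AtomicInteger calls = new AtomicInteger();

        // Eager: incrementAndGet() runs right here, before any subscription.
        CompletableFuture<Integer> eagerFuture =
                CompletableFuture.completedFuture(calls.incrementAndGet());
        Mono<Integer> eager = Mono.fromFuture(eagerFuture);

        // Lazy: the supplier is invoked anew for each subscription.
        Supplier<CompletableFuture<Integer>> supplier =
                () -> CompletableFuture.completedFuture(calls.incrementAndGet());
        Mono<Integer> lazy = Mono.fromFuture(supplier);

        // Each block() call is a separate subscription.
        return Arrays.asList(eager.block(), eager.block(), lazy.block(), lazy.block());
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints [1, 1, 2, 3]
    }
}
```

Both subscriptions to the eager Mono observe the same already computed value, while each subscription to the lazy Mono triggers a new future. The Supplier form is therefore the better fit when the work should start only on subscription or be repeated on resubscription.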
Code example source: reactor/reactor-core
@Test
public void fromCompletableFuture() {
    CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "helloFuture");
    assertThat(Mono.fromFuture(f)
            .block()).isEqualToIgnoringCase("helloFuture");
}
Code example source: spring-projects/spring-framework (the same code is also listed under org.springframework/spring-core)
void registerAdapters(ReactiveAdapterRegistry registry) {
    // Register Flux and Mono before Publisher...
    registry.registerReactiveType(
            ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
            source -> (Mono<?>) source,
            Mono::from
    );
    registry.registerReactiveType(
            ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
            source -> (Flux<?>) source,
            Flux::from);
    registry.registerReactiveType(
            ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
            source -> (Publisher<?>) source,
            source -> source);
    registry.registerReactiveType(
            ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
                CompletableFuture<?> empty = new CompletableFuture<>();
                empty.complete(null);
                return empty;
            }),
            source -> Mono.fromFuture((CompletableFuture<?>) source),
            source -> Mono.from(source).toFuture()
    );
}
Code example source: lettuce-io/lettuce-core
/**
 * Load master slave nodes. Result contains an ordered list of {@link RedisNodeDescription}s. The sort key is the latency.
 * Nodes with lower latency come first.
 *
 * @param seed collection of {@link RedisURI}s
 * @return mapping between {@link RedisURI} and {@link Partitions}
 */
public Mono<List<RedisNodeDescription>> getNodes(RedisURI seed) {
    CompletableFuture<List<RedisNodeDescription>> future = topologyProvider.getNodesAsync();
    Mono<List<RedisNodeDescription>> initialNodes = Mono.fromFuture(future).doOnNext(nodes -> {
        addPasswordIfNeeded(nodes, seed);
    });
    return initialNodes
            .map(this::getConnections)
            .flatMap(asyncConnections -> asyncConnections.asMono(seed.getTimeout(), eventExecutors))
            .flatMap(connections -> {
                Requests requests = connections.requestPing();
                CompletionStage<List<RedisNodeDescription>> nodes = requests.getOrTimeout(seed.getTimeout(),
                        eventExecutors);
                return Mono.fromCompletionStage(nodes).flatMap(it -> ResumeAfter.close(connections).thenEmit(it));
            });
}
Code example source: lettuce-io/lettuce-core
@Override
public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
    ConnectionFuture<StatefulRedisConnection<K, V>> initialConnection = redisClient.connectAsync(codec, redisURI);
    Mono<StatefulRedisMasterSlaveConnection<K, V>> connect = Mono
            .fromCompletionStage(initialConnection)
            .flatMap(nodeConnection -> {
                initialConnections.put(redisURI, nodeConnection);
                TopologyProvider topologyProvider = new MasterSlaveTopologyProvider(nodeConnection, redisURI);
                return Mono.fromCompletionStage(topologyProvider.getNodesAsync()).flatMap(
                        nodes -> getMasterConnectionAndUri(nodes, Tuples.of(redisURI, nodeConnection), codec));
            })
            .flatMap(connectionAndUri -> {
                return initializeConnection(codec, connectionAndUri);
            });
    // On failure, close every connection opened so far, then re-emit the original error.
    return connect.onErrorResume(t -> {
        Mono<Void> close = Mono.empty();
        for (StatefulRedisConnection<?, ?> connection : initialConnections.values()) {
            close = close.then(Mono.fromFuture(connection.closeAsync()));
        }
        return close.then(Mono.error(t));
    }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}
Code example source: reactor/reactor-core
@Test
public void noRetentionOnTermination() throws InterruptedException {
    Date date = new Date();
    CompletableFuture<Date> future = new CompletableFuture<>();
    WeakReference<Date> refDate = new WeakReference<>(date);
    WeakReference<CompletableFuture<Date>> refFuture = new WeakReference<>(future);
    Mono<Date> source = Mono.fromFuture(future);
    Mono<String> data = source.map(Date::toString).log().cache().log();
    future.complete(date);
    assertThat(data.block()).isEqualTo(date.toString());
    // Drop all strong references so that only the weak references remain.
    date = null;
    future = null;
    source = null;
    System.gc();
    int cycles;
    for (cycles = 10; cycles > 0; cycles--) {
        if (refDate.get() == null && refFuture.get() == null) break;
        Thread.sleep(100);
    }
    assertThat(refFuture.get()).isNull();
    assertThat(refDate.get()).isNull();
    assertThat(cycles).isNotZero()
            .isPositive();
}
Code example source: reactor/reactor-core
@Test
public void noRetentionOnTerminationError() throws InterruptedException {
    CompletableFuture<Date> future = new CompletableFuture<>();
    WeakReference<CompletableFuture<Date>> refFuture = new WeakReference<>(future);
    Mono<Date> source = Mono.fromFuture(future);
    Mono<String> data = source.map(Date::toString).cache();
    future.completeExceptionally(new IllegalStateException());
    assertThatExceptionOfType(IllegalStateException.class)
            .isThrownBy(data::block);
    future = null;
    source = null;
    System.gc();
    int cycles;
    for (cycles = 10; cycles > 0; cycles--) {
        if (refFuture.get() == null) break;
        Thread.sleep(100);
    }
    assertThat(refFuture.get()).isNull();
    assertThat(cycles).isNotZero()
            .isPositive();
}
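As the test above shows, a future that completes exceptionally surfaces as an error signal on the resulting Mono. The sketch below shows one way such an error might be handled with onErrorResume; it assumes reactor-core is on the classpath, and the class FromFutureErrorSketch, the helper valueOrFallback, and the "fallback" value are hypothetical choices for this illustration.

```java
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;

public class FromFutureErrorSketch {

    // Hypothetical helper: returns the future's value, or "fallback" if it failed.
    static String valueOrFallback(CompletableFuture<String> future) {
        return Mono.fromFuture(future)
                // Replace any error from the future with a fallback value.
                .onErrorResume(error -> Mono.just("fallback"))
                // Blocks the calling thread; acceptable in a demo, not in reactive code.
                .block();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(valueOrFallback(failed)); // prints fallback

        System.out.println(valueOrFallback(CompletableFuture.completedFuture("ok"))); // prints ok
    }
}
```

Because block() waits for a terminal signal, calling this helper with a future that never completes would block indefinitely; a timeout operator is one way to guard against that.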
Code example source: reactor/reactor-core
@Test
public void noRetentionOnTerminationCancel() throws InterruptedException {
    CompletableFuture<Date> future = new CompletableFuture<>();
    WeakReference<CompletableFuture<Date>> refFuture = new WeakReference<>(future);
    Mono<Date> source = Mono.fromFuture(future);
    Mono<String> data = source.map(Date::toString).cache();
    future = null;
    source = null;
    data.subscribe().dispose();
    System.gc();
    int cycles;
    for (cycles = 10; cycles > 0; cycles--) {
        if (refFuture.get() == null) break;
        Thread.sleep(100);
    }
    assertThat(refFuture.get()).isNull();
    assertThat(cycles).isNotZero()
            .isPositive();
}
Code example source: lettuce-io/lettuce-core
connections = connections.concatWith(Mono.fromFuture(getConnection(node)));
Code example source: reactor/reactor-core
@Test
public void cancelThenFutureFails() {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    AtomicReference<Subscription> subRef = new AtomicReference<>();
    Mono<Integer> mono = Mono
            .fromFuture(future)
            .doOnSubscribe(subRef::set);
    StepVerifier.create(mono)
            .expectSubscription()
            .then(() -> {
                subRef.get().cancel();
                future.completeExceptionally(new IllegalStateException("boom"));
                future.complete(1);
            })
            .thenCancel() // already cancelled but need to get to verification
            .verifyThenAssertThat()
            .hasDroppedErrorWithMessage("boom");
}
Code example source: reactor/reactor-core
@Test
public void stackOverflowGoesToOnErrorDropped() {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    future.complete(1);
    Mono<Integer> simple = Mono.fromFuture(future);
    StepVerifier.create(
            simple.map(r -> {
                throw new StackOverflowError("boom, good bye Future");
            })
    )
            .expectSubscription()
            .expectNoEvent(Duration.ofMillis(1))
            .thenCancel()
            .verifyThenAssertThat()
            .hasDroppedErrorWithMessage("boom, good bye Future");
}
Code example source: reactor/reactor-core
@Test
public void cancelFutureImmediatelyCancelledLoop() {
    for (int i = 0; i < 10000; i++) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        Mono<Integer> mono = Mono
                .fromFuture(future)
                .doFinally(sig -> {
                    if (sig == SignalType.CANCEL) future.cancel(false);
                });
        StepVerifier.create(mono)
                .expectSubscription()
                .thenCancel()
                .verifyThenAssertThat()
                .hasNotDroppedErrors();
        assertThat(future).isCancelled();
    }
}
Code example source: reactor/reactor-core
@Test
public void cancelFutureTimeoutCancelledLoop() {
    for (int i = 0; i < 500; i++) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        Mono<Integer> mono = Mono
                .fromFuture(future)
                .doFinally(sig -> {
                    if (sig == SignalType.CANCEL) future.cancel(false);
                });
        StepVerifier.create(mono.timeout(Duration.ofMillis(10)))
                .expectSubscription()
                .expectErrorSatisfies(e ->
                        assertThat(e).hasMessageStartingWith("Did not observe any item or terminal signal within 10ms"))
                .verifyThenAssertThat()
                .hasNotDroppedErrors();
        assertThat(future).isCancelled();
    }
}
Code example source: reactor/reactor-core
@Test
public void cancelFutureDelayedCancelledLoop() {
    for (int i = 0; i < 500; i++) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        Mono<Integer> mono = Mono
                .fromFuture(future)
                .doFinally(sig -> {
                    if (sig == SignalType.CANCEL) future.cancel(false);
                });
        StepVerifier.create(mono)
                .expectSubscription()
                .thenAwait(Duration.ofMillis(10))
                .thenCancel()
                .verifyThenAssertThat()
                .hasNotDroppedErrors();
        assertThat(future).isCancelled();
    }
}