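This article collects code examples for the Java method reactor.core.publisher.Mono.repeatWhen() and shows how Mono.repeatWhen() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from a selection of projects, so they should serve as useful reference material. Details of Mono.repeatWhen() are as follows:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: repeatWhen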
Repeatedly subscribe to this Mono when a companion sequence emits elements in response to this Mono's completion signal. Any terminal signal from the companion sequence immediately terminates the resulting Flux with the same signal.
If the companion sequence signals while this Mono is still active, the repeat attempt is suppressed.
Note that if the companion Publisher created by the repeatFactory emits Context objects as triggers, the content of those Context objects is added to the operator's own Context.
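The contract described above can be illustrated without Reactor itself. The following is a minimal, synchronous sketch in plain Java, not the library's implementation. `RepeatWhenModel` and its `repeatWhen` helper are hypothetical names, an `Optional` stands in for the Mono's zero-or-one element, and a `LongPredicate` stands in for the companion sequence (returning `true` models the companion emitting a trigger, `false` models it terminating). As the Reactor Javadoc describes for the real operator, the companion is handed the number of elements emitted by the latest attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.LongPredicate;
import java.util.function.Supplier;

public class RepeatWhenModel {

    // Hypothetical helper, not Reactor API: runs `source` once per "subscription"
    // and asks `companion` after each completion whether to subscribe again.
    public static <T> List<T> repeatWhen(Supplier<Optional<T>> source, LongPredicate companion) {
        List<T> emitted = new ArrayList<>();
        while (true) {
            Optional<T> value = source.get();          // one subscription to the "Mono"
            value.ifPresent(emitted::add);             // an empty Optional models an empty Mono
            long count = value.isPresent() ? 1L : 0L;  // element count of the latest attempt
            if (!companion.test(count)) {              // false models the companion terminating
                return emitted;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The first two attempts complete empty; the third one yields a value.
        Supplier<Optional<String>> source =
                () -> ++calls[0] < 3 ? Optional.empty() : Optional.of("hey");
        // Repeat only while the latest attempt was empty.
        List<String> result = repeatWhen(source, count -> count == 0);
        System.out.println(result + " after " + calls[0] + " attempts"); // [hey] after 3 attempts
    }
}
```

The model leaves out everything asynchronous (delays, cancellation, error signals, Context propagation); it only shows the repeat-or-terminate decision that the companion makes after each completion.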
Code example source: spring-projects/spring-framework
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
    Assert.notNull(handler, "TcpConnectionHandler is required");
    Assert.notNull(strategy, "ReconnectStrategy is required");
    if (this.stopping) {
        return handleShuttingDownConnectFailure(handler);
    }
    // Report first connect to the ListenableFuture
    MonoProcessor<Void> connectMono = MonoProcessor.create();
    this.tcpClient
            .handle(new ReactorNettyHandler(handler))
            .connect()
            .doOnNext(updateConnectMono(connectMono))
            .doOnError(updateConnectMono(connectMono))
            .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
            .flatMap(Connection::onDispose)             // post-connect issues
            .retryWhen(reconnectFunction(strategy))
            .repeatWhen(reconnectFunction(strategy))
            .subscribe();
    return new MonoToListenableFutureAdapter<>(connectMono);
}
Code example source: reactor/reactor-core
return this.repeatWhen(o -> repeatFactory.apply(o
        .zipWith(iterations, 1, (c, i) -> i)))
        .next();
Code example source: reactor/reactor-core
Flux<Integer> exponentialRepeatScenario1() {
    AtomicInteger i = new AtomicInteger();
    return Mono.fromCallable(i::incrementAndGet)
               .repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                                           .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
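To make the timing of the scenario above easier to follow: despite the method name, `Flux.range(1, 3)` produces linear delays of 1, 2 and 3 seconds, and once the range is exhausted the companion completes, so the source should be subscribed four times in total. The sketch below only does the arithmetic in plain Java. `RepeatSchedule` and `subscriptionOffsets` are hypothetical names, and the calculation assumes the source itself completes instantly.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class RepeatSchedule {

    // Hypothetical helper: given the delay applied after each completion,
    // returns the offset from the first subscription at which every attempt starts.
    public static List<Duration> subscriptionOffsets(List<Duration> delays) {
        List<Duration> offsets = new ArrayList<>();
        Duration elapsed = Duration.ZERO;
        offsets.add(elapsed);               // the initial subscription starts immediately
        for (Duration delay : delays) {
            elapsed = elapsed.plus(delay);  // each repeat waits for its delay first
            offsets.add(elapsed);
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Duration> delays = List.of(
                Duration.ofSeconds(1), Duration.ofSeconds(2), Duration.ofSeconds(3));
        System.out.println(subscriptionOffsets(delays)); // [PT0S, PT1S, PT3S, PT6S]
    }
}
```

Under these assumptions the values 1 to 4 would be emitted at roughly 0 s, 1 s, 3 s and 6 s.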
Code example source: reactor/reactor-core
Flux<String> exponentialRepeatScenario2() {
    AtomicInteger i = new AtomicInteger();
    return Mono.<String>create(s -> {
        if (i.incrementAndGet() == 4) {
            s.success("hey");
        }
        else {
            s.success();
        }
    }).repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                                  .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
Code example source: spring-cloud/spring-cloud-gateway
public GatewayFilter apply(Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) {
    return (exchange, chain) -> {
        trace("Entering retry-filter");
        // chain.filter returns a Mono<Void>
        Publisher<Void> publisher = chain.filter(exchange)
                //.log("retry-filter", Level.INFO)
                .doOnSuccessOrError((aVoid, throwable) -> {
                    int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
                    int newIteration = iteration + 1;
                    trace("setting new iteration in attr %d", newIteration);
                    exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);
                });
        if (retry != null) {
            // retryWhen returns a Mono<Void>
            // retry needs to go before repeat
            publisher = ((Mono<Void>)publisher).retryWhen(retry.withApplicationContext(exchange));
        }
        if (repeat != null) {
            // repeatWhen returns a Flux<Void>
            // so this needs to be last and the variable a Publisher<Void>
            publisher = ((Mono<Void>)publisher).repeatWhen(repeat.withApplicationContext(exchange));
        }
        return Mono.fromDirect(publisher);
    };
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param whenFactory
 * @return
 * @see reactor.core.publisher.Mono#repeatWhen(java.util.function.Function)
 */
public final Flux<T> repeatWhen(Function<Flux<Long>, ? extends Publisher<?>> whenFactory) {
    return boxed.repeatWhen(whenFactory);
}