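This article collects code examples for the Java method reactor.core.publisher.Mono.doOnTerminate() and shows how Mono.doOnTerminate() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.doOnTerminate() method are as follows: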
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: doOnTerminate
Description: Add behavior triggered when the Mono terminates, either by completing successfully or with an error.
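The description above can be checked with a small program. The following is a minimal sketch, assuming reactor-core is on the classpath; the class and method names (DoOnTerminateSketch, countOnSuccess, countOnError, countOnCancel) are made up for illustration. It counts callback invocations for a Mono that completes, one that fails, and one that is cancelled before any signal arrives.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of Reactor.
final class DoOnTerminateSketch {

    // Callback count for a Mono that completes successfully.
    static int countOnSuccess() {
        AtomicInteger calls = new AtomicInteger();
        Mono.just("value")
            .doOnTerminate(calls::incrementAndGet)
            .subscribe();
        return calls.get();
    }

    // Callback count for a Mono that terminates with an error.
    static int countOnError() {
        AtomicInteger calls = new AtomicInteger();
        Mono.error(new IllegalStateException("boom"))
            .doOnTerminate(calls::incrementAndGet)
            // Supply an error consumer so the failure is handled by the subscriber.
            .subscribe(value -> { }, error -> { });
        return calls.get();
    }

    // Callback count for a Mono that never emits and is cancelled by the subscriber.
    static int countOnCancel() {
        AtomicInteger calls = new AtomicInteger();
        Disposable subscription = Mono.never()
            .doOnTerminate(calls::incrementAndGet)
            .subscribe();
        subscription.dispose(); // cancel: neither onComplete nor onError is signalled
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("success: " + countOnSuccess());
        System.out.println("error:   " + countOnError());
        System.out.println("cancel:  " + countOnCancel());
    }
}
```

In this sketch the callback runs once for completion and once for error, and not at all for the cancelled subscription, because cancellation is not a terminal signal.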
Code example origin: reactor/reactor-core
@Test
public void onMonoSuccessDoOnTerminate() {
    Mono<String> mp = Mono.just("test");
    AtomicInteger invoked = new AtomicInteger();
    // The callback runs once when the Mono completes successfully
    mp.doOnTerminate(invoked::incrementAndGet)
      .subscribe();
    assertThat(invoked.get()).isEqualTo(1);
}
Code example origin: reactor/reactor-core
@Test
public void onMonoRejectedDoOnTerminate() {
    Mono<String> mp = Mono.error(new Exception("test"));
    AtomicInteger invoked = new AtomicInteger();
    // The callback also runs once when the Mono terminates with an error
    mp.doOnTerminate(invoked::incrementAndGet)
      .subscribe();
    assertThat(invoked.get()).isEqualTo(1);
}
Code example origin: reactor/reactor-core
@Test
public void smokeTestDelay() {
    for (int i = 0; i < 20; i++) {
        Scheduler s = Schedulers.newElastic("test");
        AtomicLong start = new AtomicLong();
        AtomicLong end = new AtomicLong();
        try {
            // Record timestamps at subscription and at termination
            StepVerifier.create(Mono
                    .delay(Duration.ofMillis(100), s)
                    .doOnSubscribe(sub -> start.set(System.nanoTime()))
                    .doOnTerminate(() -> end.set(System.nanoTime()))
            )
                        .expectSubscription()
                        .expectNext(0L)
                        .verifyComplete();
            long endValue = end.longValue();
            long startValue = start.longValue();
            long measuredDelay = endValue - startValue;
            long measuredDelayMs = TimeUnit.NANOSECONDS.toMillis(measuredDelay);
            assertThat(measuredDelayMs)
                    .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", i, measuredDelay, startValue, endValue)
                    .isGreaterThanOrEqualTo(100L)
                    .isLessThan(200L);
        }
        finally {
            s.dispose();
        }
    }
}
Code example origin: reactor/reactor-core
@Test
public void smokeTestDelay() {
    for (int i = 0; i < 20; i++) {
        Scheduler s = Schedulers.newParallel("test");
        AtomicLong start = new AtomicLong();
        AtomicLong end = new AtomicLong();
        try {
            StepVerifier.create(Mono
                    .delay(Duration.ofMillis(100), s)
                    .doOnSubscribe(sub -> start.set(System.nanoTime()))
                    .doOnTerminate(() -> end.set(System.nanoTime()))
            )
                        .expectSubscription()
                        .expectNext(0L)
                        .verifyComplete();
            long endValue = end.longValue();
            long startValue = start.longValue();
            long measuredDelay = endValue - startValue;
            long measuredDelayMs = TimeUnit.NANOSECONDS.toMillis(measuredDelay);
            assertThat(measuredDelayMs)
                    .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", i, measuredDelay, startValue, endValue)
                    .isGreaterThanOrEqualTo(100L)
                    .isLessThan(200L);
        }
        finally {
            s.dispose();
        }
    }
}
Code example origin: reactor/reactor-core
@Test
public void smokeTestDelay() {
    for (int i = 0; i < 20; i++) {
        Scheduler s = Schedulers.fromExecutorService(Executors.newScheduledThreadPool(1));
        AtomicLong start = new AtomicLong();
        AtomicLong end = new AtomicLong();
        try {
            StepVerifier.create(Mono
                    .delay(Duration.ofMillis(100), s)
                    .log()
                    .doOnSubscribe(sub -> start.set(System.nanoTime()))
                    .doOnTerminate(() -> end.set(System.nanoTime()))
            )
                        .expectSubscription()
                        .expectNext(0L)
                        .verifyComplete();
            long endValue = end.longValue();
            long startValue = start.longValue();
            long measuredDelay = endValue - startValue;
            long measuredDelayMs = TimeUnit.NANOSECONDS.toMillis(measuredDelay);
            assertThat(measuredDelayMs)
                    .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", i, measuredDelay, startValue, endValue)
                    .isGreaterThanOrEqualTo(100L)
                    .isLessThan(200L);
        }
        finally {
            s.dispose();
        }
    }
}
Code example origin: reactor/reactor-core
@Test
public void smokeTestDelay() {
    for (int i = 0; i < 20; i++) {
        Scheduler s = Schedulers.newSingle("test");
        AtomicLong start = new AtomicLong();
        AtomicLong end = new AtomicLong();
        try {
            StepVerifier.create(Mono
                    .delay(Duration.ofMillis(100), s)
                    .log()
                    .doOnSubscribe(sub -> start.set(System.nanoTime()))
                    .doOnTerminate(() -> end.set(System.nanoTime()))
            )
                        .expectSubscription()
                        .expectNext(0L)
                        .verifyComplete();
            long endValue = end.longValue();
            long startValue = start.longValue();
            long measuredDelay = endValue - startValue;
            long measuredDelayMs = TimeUnit.NANOSECONDS.toMillis(measuredDelay);
            assertThat(measuredDelayMs)
                    .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", i, measuredDelay, startValue, endValue)
                    .isGreaterThanOrEqualTo(100L)
                    .isLessThan(200L);
        }
        finally {
            s.dispose();
        }
    }
}
Code example origin: reactor/reactor-core
@Test
public void monoFromFluxThatIsItselfFromMono() {
    AtomicBoolean emitted = new AtomicBoolean();
    AtomicBoolean terminated = new AtomicBoolean();
    AtomicBoolean cancelled = new AtomicBoolean();
    AtomicBoolean succeeded = new AtomicBoolean();

    Mono<String> withCallback = Mono.just("foo")
                                    .doOnNext(v -> emitted.set(true));
    Mono<String> original = withCallback
            .doOnCancel(() -> cancelled.set(true))
            .doOnSuccess(v -> succeeded.set(true))
            .doOnTerminate(() -> terminated.set(true))
            .hide();

    assertThat(withCallback).as("withCallback is not Callable")
                            .isNotInstanceOf(Fuseable.ScalarCallable.class)
                            .isNotInstanceOf(Callable.class);
    assertThat(original).as("original is not callable Mono")
                        .isNotInstanceOf(Fuseable.class)
                        .isNotInstanceOf(Fuseable.ScalarCallable.class)
                        .isNotInstanceOf(Callable.class);

    // Convert Mono -> Flux -> Mono and check that the callbacks still fire
    Flux<String> firstConversion = Flux.from(original);
    Mono<String> secondConversion = Mono.from(firstConversion);

    assertThat(secondConversion.block()).isEqualTo("foo");
    assertThat(emitted).as("emitted").isTrue();
    assertThat(succeeded).as("succeeded").isTrue();
    assertThat(cancelled).as("cancelled").isFalse();
    assertThat(terminated).as("terminated").isTrue();
    assertThat(secondConversion).as("conversions negated").isSameAs(original);
}
Code example origin: reactor/reactor-core
// Excerpt: the source snippet begins in the middle of an operator chain
        .log("resultStream")
        .collectList()
        .doOnTerminate(doneSemaphore::release)
        .toProcessor();
listPromise.subscribe();
Code example origin: spring-projects/spring-integration
private Mono<Void> doHandle(ServerWebExchange exchange) {
    return extractRequestBody(exchange)
            // Count the request as active once the pipeline is subscribed
            .doOnSubscribe(s -> this.activeCount.incrementAndGet())
            .cast(Object.class)
            .switchIfEmpty(Mono.just(exchange.getRequest().getQueryParams()))
            .map(body ->
                    new RequestEntity<>(body, exchange.getRequest().getHeaders(),
                            exchange.getRequest().getMethod(), exchange.getRequest().getURI()))
            .flatMap(entity -> buildMessage(entity, exchange))
            .flatMap(requestTuple -> {
                if (this.expectReply) {
                    return sendAndReceiveMessageReactive(requestTuple.getT1())
                            .flatMap(replyMessage -> populateResponse(exchange, replyMessage));
                }
                else {
                    send(requestTuple.getT1());
                    return setStatusCode(exchange, requestTuple.getT2());
                }
            })
            // Decrement the counter when the pipeline completes or fails
            .doOnTerminate(this.activeCount::decrementAndGet);
}
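The increment/decrement pairing above relies on the pipeline reaching a terminal signal. As a hedged illustration of what happens when a subscription is cancelled instead, the sketch below compares doOnTerminate with doFinally. It assumes reactor-core on the classpath; ActiveCountSketch, activeAfterCancel, and the local counter are hypothetical stand-ins, not code from spring-integration. Whether cancellation matters for the handler above depends on how the surrounding framework manages its subscriptions.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

// Hypothetical demo class; "active" stands in for a field such as activeCount.
final class ActiveCountSketch {

    // Returns the counter value left behind after subscribing and then cancelling.
    static int activeAfterCancel(boolean useDoFinally) {
        AtomicInteger active = new AtomicInteger();

        // A source that never emits, so the only way out is cancellation.
        Mono<Object> work = Mono.<Object>never()
                .doOnSubscribe(subscription -> active.incrementAndGet());

        Mono<Object> tracked = useDoFinally
                // doFinally runs on completion, error, and cancellation
                ? work.doFinally(signalType -> active.decrementAndGet())
                // doOnTerminate runs on completion and error only
                : work.doOnTerminate(active::decrementAndGet);

        tracked.subscribe().dispose(); // subscribe, then cancel immediately
        return active.get();
    }

    public static void main(String[] args) {
        System.out.println("doOnTerminate: " + activeAfterCancel(false));
        System.out.println("doFinally:     " + activeAfterCancel(true));
    }
}
```

In this sketch the doOnTerminate variant leaves the counter at 1 after cancellation, while the doFinally variant brings it back to 0.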
Code example origin: com.aol.cyclops/cyclops-reactor
/**
 * @param onTerminate
 * @return
 * @see reactor.core.publisher.Mono#doOnTerminate(java.util.function.BiConsumer)
 */
public final Mono<T> doOnTerminate(BiConsumer<? super T, Throwable> onTerminate) {
    // Delegates to the wrapped Mono; note the BiConsumer parameter, unlike the Runnable used in the other examples
    return boxed.doOnTerminate(onTerminate);
}
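The wrapper above passes a BiConsumer, whereas every other example in this collection passes a Runnable, which suggests it was written against a different Reactor release. With the Runnable form the callback receives neither the value nor the error. One way to observe the outcome as well is to combine doOnSuccess, doOnError, and doOnTerminate. The following is a sketch under that assumption, with reactor-core on the classpath; TerminateOutcomeSketch and describe are hypothetical names.

```java
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of Reactor or cyclops-reactor.
final class TerminateOutcomeSketch {

    // Builds a short trace of which callbacks fired, in order.
    static String describe(Mono<String> source) {
        StringBuilder trace = new StringBuilder();
        source
            .doOnSuccess(value -> trace.append("success:").append(value))
            .doOnError(error -> trace.append("error:").append(error.getMessage()))
            .doOnTerminate(() -> trace.append("|terminated"))
            // Handle both outcomes so an error is consumed by the subscriber
            .subscribe(value -> { }, error -> { });
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(Mono.just("foo")));
        System.out.println(describe(Mono.error(new IllegalStateException("bad"))));
    }
}
```

Here doOnTerminate fires after the outcome-specific callback in both cases, because it sits further downstream in the chain.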
Code example origin: rsocket/rsocket-java
@Test
void requesterStreamsTerminatedOnZeroErrorFrame() {
    TestDuplexConnection conn = new TestDuplexConnection();
    List<Throwable> errors = new ArrayList<>();
    RSocketClient rSocket =
        new RSocketClient(
            conn, DefaultPayload::create, errors::add, StreamIdSupplier.clientSupplier());
    String errorMsg = "error";

    // After 100 ms, feed an error frame on stream 0 into the connection
    Mono.delay(Duration.ofMillis(100))
        .doOnTerminate(
            () ->
                conn.addToReceivedBuffer(Frame.Error.from(0, new RejectedSetupException(errorMsg))))
        .subscribe();

    StepVerifier.create(rSocket.requestResponse(DefaultPayload.create("test")))
        .expectErrorMatches(
            err -> err instanceof RejectedSetupException && errorMsg.equals(err.getMessage()))
        .verify(Duration.ofSeconds(5));

    assertThat(errors).hasSize(1);
    assertThat(rSocket.isDisposed()).isTrue();
}
Code example origin: scalecube/scalecube-services
@Override
public Mono<Void> close() {
    return connectionProvider
        .disposeLater()
        .doOnTerminate(() -> LOGGER.info("Closed http-client-sdk transport"));
}
Code example origin: scalecube/scalecube-services
@Override
public Mono<Void> close() {
    return Mono.defer(
        () -> {
            // noinspection unchecked
            Mono<RSocket> curr = rSocketMonoUpdater.get(this);
            return (curr == null ? Mono.<Void>empty() : curr.flatMap(this::dispose))
                .doOnTerminate(() -> LOGGER.info("Closed rsocket client sdk transport"));
        });
}
Code example origin: scalecube/scalecube-services
@Override
public Mono<Void> close() {
    return Mono.defer(
        () -> {
            // noinspection unchecked
            Mono<WebsocketSession> curr = websocketMonoUpdater.get(this);
            return (curr == null ? Mono.<Void>empty() : curr.flatMap(WebsocketSession::close))
                .doOnTerminate(() -> LOGGER.info("Closed websocket client sdk transport"));
        });
}
Code example origin: scalecube/scalecube-services
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
    LOGGER.info("Accepted rsocket websocket: {}, connectionSetup: {}", rsocket, setup);

    // Log the disconnect when the socket's close signal terminates
    rsocket
        .onClose()
        .doOnTerminate(() -> LOGGER.info("Client disconnected: {}", rsocket))
        .subscribe(null, th -> LOGGER.error("Exception on closing rsocket: {}", th));

    // Prepare message codec together with headers from metainfo
    HeadersCodec headersCodec = HeadersCodec.getInstance(setup.metadataMimeType());
    ServiceMessageCodec messageCodec = new ServiceMessageCodec(headersCodec);
    return Mono.just(new GatewayRSocket(serviceCall, metrics, messageCodec));
}
Code example origin: scalecube/scalecube-services (the same code also appears under origin io.scalecube/scalecube-services-transport-rsocket)
private Mono<Void> closeConnections() {
    return Mono.defer(
        () ->
            Mono.when(
                    connections
                        .stream()
                        .map(
                            connection -> {
                                connection.dispose();
                                return connection
                                    .onTerminate()
                                    .doOnError(e -> LOGGER.warn("Failed to close connection: " + e))
                                    .onErrorResume(e -> Mono.empty());
                            })
                        .collect(Collectors.toList()))
                // Clear the collection once all connections have terminated
                .doOnTerminate(connections::clear));
}
Code example origin: io.scalecube/scalecube-gateway-rsocket-websocket
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
    LOGGER.info("Accepted rsocket websocket: {}, connectionSetup: {}", rsocket, setup);

    rsocket
        .onClose()
        .doOnTerminate(() -> LOGGER.info("Client disconnected: {}", rsocket))
        .subscribe();

    // Prepare message codec together with headers from metainfo
    HeadersCodec headersCodec = HeadersCodec.getInstance(setup.metadataMimeType());
    ServiceMessageCodec messageCodec = new ServiceMessageCodec(headersCodec);
    return Mono.just(new GatewayRSocket(serviceCall, metrics, messageCodec));
}
Code example origin: io.projectreactor.ipc/reactor-netty
protected void shutdownFromJVM() {
    if (context.isDisposed()) {
        return;
    }
    final String hookDesc = Thread.currentThread().toString();
    context.dispose();
    // Block until the context has closed, logging the outcome, or fail after the lifecycle timeout
    context.onClose()
           .doOnError(e -> LOG.error("Stopped {} on {} with an error {} from JVM hook {}",
                   description, context.address(), e, hookDesc))
           .doOnTerminate(() -> LOG.info("Stopped {} on {} from JVM hook {}",
                   description, context.address(), hookDesc))
           .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description +
                   " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
           .block();
}
Code example origin: io.projectreactor.ipc/reactor-netty
/**
 * Shut down the {@link NettyContext} and wait for its termination, up to the
 * {@link #setLifecycleTimeout(Duration) lifecycle timeout}.
 */
public void shutdown() {
    if (context.isDisposed()) {
        return;
    }
    removeShutdownHook(); //only applies if not called from the hook's thread
    context.dispose();
    context.onClose()
           .doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e))
           .doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address()))
           .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
           .block();
}