本文整理了Java中reactor.core.publisher.Mono.doAfterTerminate()
方法的一些代码示例,展示了Mono.doAfterTerminate()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Mono.doAfterTerminate()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:doAfterTerminate
[英]Add behavior (side-effect) triggered after the Mono terminates, either by completing downstream successfully or with an error.
[中]添加在Mono终止后触发的行为(副作用),无论Mono是成功完成还是以错误终止。
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes to a {@code Mono.using} pipeline built with the three-argument overload and
 * checks the cleanup counter both inside the {@code doAfterTerminate} callback and after
 * the subscription call has returned.
 */
@Test
public void normalEager() {
    AtomicInteger released = new AtomicInteger();
    AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

    Mono<Integer> source = Mono.using(() -> 1, resource -> Mono.just(1), released::set)
            // The callback asserts that the counter still holds its initial value.
            .doAfterTerminate(() -> Assert.assertEquals(0, released.get()));
    source.subscribe(subscriber);

    subscriber.assertValues(1)
            .assertComplete()
            .assertNoError();

    // released::set receives the resource value, which the supplier above fixes at 1.
    Assert.assertEquals(1, released.get());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Same scenario as the three-argument {@code Mono.using} test, but passing {@code false}
 * as the fourth argument; checks the cleanup counter inside the {@code doAfterTerminate}
 * callback and again after the subscription call has returned.
 */
@Test
public void normal() {
    AtomicInteger released = new AtomicInteger();
    AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

    Mono<Integer> source = Mono.using(() -> 1, resource -> Mono.just(1), released::set, false)
            // The callback asserts that the counter still holds its initial value.
            .doAfterTerminate(() -> Assert.assertEquals(0, released.get()));
    source.subscribe(subscriber);

    subscriber.assertValues(1)
            .assertComplete()
            .assertNoError();

    // released::set receives the resource value, which the supplier above fixes at 1.
    Assert.assertEquals(1, released.get());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Verifies that a {@code doAfterTerminate} callback attached to an empty Mono has been
 * invoked exactly once by the time the completion has been verified.
 */
@Test
public void afterTerminateForEmpty() {
    LongAdder calls = new LongAdder();
    Mono<String> source = Mono.<String>empty()
            .doAfterTerminate(calls::increment);

    StepVerifier.create(source)
            .expectComplete()
            .verify();

    assertEquals(1, calls.intValue());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Attaches a {@code doAfterTerminate} callback that increments a counter and then throws
 * {@code IllegalArgumentException("boom")}. Anything thrown while creating or verifying the
 * StepVerifier is unwrapped and must be that exception; the callback must have run once.
 * Note that the checks in the catch block only execute if something is actually thrown.
 */
@Test
public void afterTerminateCallbackFailureInterruptsOnNextAndThrows() {
    LongAdder calls = new LongAdder();
    try {
        StepVerifier.create(Mono.just("foo")
                                .doAfterTerminate(() -> {
                                    calls.increment();
                                    throw new IllegalArgumentException("boom");
                                }))
                .expectNext("bar") // irrelevant
                .expectErrorMessage("baz") // irrelevant
                .verify();
    }
    catch (Throwable thrown) {
        Throwable cause = Exceptions.unwrap(thrown);
        assertEquals(IllegalArgumentException.class, cause.getClass());
        assertEquals("boom", cause.getMessage());
    }
    assertEquals(1, calls.intValue());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Verifies that a {@code doAfterTerminate} callback attached to a Mono that errors with
 * message "boom" has been invoked exactly once after the error has been verified.
 */
@Test
public void afterTerminateForOnError() {
    IllegalArgumentException failure = new IllegalArgumentException("boom");
    LongAdder calls = new LongAdder();
    Mono<String> source = Mono.<String>error(failure)
            .doAfterTerminate(calls::increment);

    StepVerifier.create(source)
            .expectErrorMessage("boom")
            .verify();

    assertEquals(1, calls.intValue());
}
代码示例来源:origin: rsocket/rsocket-java
/**
 * Returns the delegate's {@code onClose()} Mono decorated so that {@code close.increment()}
 * is called after that Mono terminates.
 */
@Override
public Mono<Void> onClose() {
    Mono<Void> delegateClosed = delegate.onClose();
    return delegateClosed.doAfterTerminate(close::increment);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Sums the range 1..10, passes the result through an always-true filter and attaches a
 * {@code doAfterTerminate} callback; expects fusion, the value 55, completion, and exactly
 * one callback invocation.
 */
@Test
public void onAfterTerminateFusionConditional() {
    LongAdder calls = new LongAdder();

    StepVerifier.create(Flux.range(1, 10)
                            .reduce(Integer::sum)
                            .filter(value -> true)
                            .doAfterTerminate(calls::increment))
            .expectFusion()
            .expectNext(55)
            .expectComplete()
            .verify();

    assertEquals(1, calls.intValue());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Sums the range 1..10 and attaches a {@code doAfterTerminate} callback; the logged Mono is
 * expected to negotiate fusion, emit 55 and complete, with the callback invoked exactly once.
 */
@Test
public void onAfterTerminateFusion() {
    LongAdder calls = new LongAdder();
    Mono<Integer> total = Flux.range(1, 10)
            .reduce(Integer::sum)
            .doAfterTerminate(calls::increment);

    StepVerifier.create(total.log())
            .expectFusion()
            .expectNext(55)
            .expectComplete()
            .verify();

    assertEquals(1, calls.intValue());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Sums the range 1..10, applies {@code hide()} and an always-true filter, then attaches a
 * {@code doAfterTerminate} callback; requests {@code Fuseable.ANY} and expects
 * {@code Fuseable.NONE} to be negotiated, followed by the value 55, completion, and exactly
 * one callback invocation.
 */
@Test
public void onAfterTerminateNormalConditional() {
    LongAdder calls = new LongAdder();
    Mono<Integer> total = Flux.range(1, 10)
            .reduce(Integer::sum)
            .hide()
            .filter(value -> true)
            .doAfterTerminate(calls::increment);

    StepVerifier.create(total)
            .expectFusion(Fuseable.ANY, Fuseable.NONE)
            .expectNext(55)
            .expectComplete()
            .verify();

    assertEquals(1, calls.intValue());
}
代码示例来源:origin: mulesoft/mule
/**
 * Picks the executor for a failed event and runs it, invoking {@code rollback} after the
 * resulting publisher terminates.
 * <p>
 * When the event's error type equals {@code flowBackPressueErrorType}, the overload is logged,
 * the back-pressure executor is used with an empty parameter map, and the back-pressure action
 * is added to the context. Otherwise the error executor is used with the supplied parameters.
 *
 * @param exception the failure whose event is processed
 * @param parameters parameters forwarded to the error executor (ignored on back-pressure)
 * @return the executor's publisher, decorated with the rollback callback
 */
@Override
public Publisher<Void> onFailure(MessagingException exception, Map<String, Object> parameters) {
    final CoreEvent event = exception.getEvent();
    final boolean backPressure = event.getError()
        .map(error -> flowBackPressueErrorType.equals(error.getErrorType()))
        .orElse(false);

    final SourceCallbackExecutor executor;
    final Map<String, Object> effectiveParameters;
    if (!backPressure) {
        executor = onErrorExecutor;
        effectiveParameters = parameters;
    } else {
        // The error is known to be present here: backPressure is only true when it was mapped.
        LOGGER.info("FLOW OVERLOAD - {}.", event.getError().get().getCause().getMessage());
        executor = onBackPressureExecutor;
        effectiveParameters = emptyMap();
        context.addVariable(BACK_PRESSURE_ACTION_CONTEXT_PARAM, backPressureAction);
    }

    final Publisher<Void> outcome =
        from(executor.execute(event, effectiveParameters, context)).doAfterTerminate(this::rollback);
    return outcome;
}
代码示例来源:origin: mulesoft/mule
/**
 * {@inheritDoc}
 * <p>
 * This implementation creates a processor for the requested operation, transforms the current
 * event with it, maps the resulting event's message to a {@code Result}, unwraps errors, and
 * passes the processor to {@code disposeProcessor} after the pipeline terminates.
 */
@Override
public <T, A> CompletableFuture<Result<T, A>> executeAsync(String extension, String operation, OperationParameters parameters) {
    final OperationMessageProcessor processor = createProcessor(extension, operation, parameters);
    final CompletableFuture<Result<T, A>> future = just(getEvent())
        .transform(processor)
        .map(event -> Result.<T, A>builder(event.getMessage()).build())
        .onErrorMap(Exceptions::unwrap)
        .doAfterTerminate(() -> disposeProcessor(processor))
        .toFuture();
    return future;
}
代码示例来源:origin: mulesoft/mule
/**
 * Runs the terminate executor for the event carried by {@code result} and calls
 * {@code context.releaseConnection()} after that execution terminates.
 *
 * @param result either the resulting event (right) or the exception holding the event (left)
 */
@Override
public void onTerminate(Either<MessagingException, CoreEvent> result) throws Exception {
    final CoreEvent event;
    if (result.isRight()) {
        event = result.getRight();
    } else {
        // On the left side the event is taken from the exception.
        event = result.getLeft().getEvent();
    }

    from(onTerminateExecutor.execute(event, emptyMap(), context))
        .doAfterTerminate(() -> context.releaseConnection())
        .subscribe();
}
代码示例来源:origin: mulesoft/mule
.doOnSuccess(v -> onReconnectionSuccessful())
.doOnError(this::onReconnectionFailed)
.doAfterTerminate(() -> reconnecting.set(false))
.subscribe();
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
 * Forwards to {@code doAfterTerminate} on the wrapped {@code boxed} instance.
 *
 * @param afterTerminate callback handed unchanged to the wrapped instance
 * @return the value returned by the wrapped instance's {@code doAfterTerminate}
 * @see reactor.core.publisher.Mono#doAfterTerminate(java.util.function.BiConsumer)
 */
public final Mono<T> doAfterTerminate(BiConsumer<? super T, Throwable> afterTerminate) {
    final Mono<T> decorated = boxed.doAfterTerminate(afterTerminate);
    return decorated;
}
/**
代码示例来源:origin: org.mule.runtime/mule-module-extensions-support
/**
 * Picks the executor for a failed event and runs it, invoking {@code rollback} after the
 * resulting publisher terminates.
 * <p>
 * When the event's error type equals {@code flowBackPressueErrorType}, the overload is logged,
 * the back-pressure executor is used with an empty parameter map, and the back-pressure action
 * is added to the context. Otherwise the error executor is used with the supplied parameters.
 *
 * @param exception the failure whose event is processed
 * @param parameters parameters forwarded to the error executor (ignored on back-pressure)
 * @return the executor's publisher, decorated with the rollback callback
 */
@Override
public Publisher<Void> onFailure(MessagingException exception, Map<String, Object> parameters) {
    final CoreEvent event = exception.getEvent();
    final boolean backPressure = event.getError()
        .map(error -> flowBackPressueErrorType.equals(error.getErrorType()))
        .orElse(false);

    final SourceCallbackExecutor executor;
    final Map<String, Object> effectiveParameters;
    if (!backPressure) {
        executor = onErrorExecutor;
        effectiveParameters = parameters;
    } else {
        // The error is known to be present here: backPressure is only true when it was mapped.
        LOGGER.info("FLOW OVERLOAD - {}.", event.getError().get().getCause().getMessage());
        executor = onBackPressureExecutor;
        effectiveParameters = emptyMap();
        context.addVariable(BACK_PRESSURE_ACTION_CONTEXT_PARAM, backPressureAction);
    }

    final Publisher<Void> outcome =
        from(executor.execute(event, effectiveParameters, context)).doAfterTerminate(this::rollback);
    return outcome;
}
代码示例来源:origin: org.mule.runtime/mule-module-extensions-support
/**
 * Runs the terminate executor for the event carried by {@code result} and calls
 * {@code context.releaseConnection()} after that execution terminates.
 *
 * @param result either the resulting event (right) or the exception holding the event (left)
 */
@Override
public void onTerminate(Either<MessagingException, CoreEvent> result) throws Exception {
    final CoreEvent event;
    if (!result.isRight()) {
        // No right value: take the event from the exception on the left side.
        event = result.getLeft().getEvent();
    } else {
        event = result.getRight();
    }

    from(onTerminateExecutor.execute(event, emptyMap(), context))
        .doAfterTerminate(() -> context.releaseConnection())
        .subscribe();
}
代码示例来源:origin: org.mule.runtime/mule-module-extensions-support
/**
 * {@inheritDoc}
 * <p>
 * This implementation creates a processor for the requested operation, runs it through
 * {@code process}, maps the resulting event's message to a {@code Result}, unwraps errors, and
 * passes the processor to {@code disposeProcessor} after the pipeline terminates.
 */
@Override
public <T, A> CompletableFuture<Result<T, A>> executeAsync(String extension, String operation, OperationParameters parameters) {
    final OperationMessageProcessor processor = createProcessor(extension, operation, parameters);
    return process(processor)
        .map(event -> Result.<T, A>builder(event.getMessage()).build())
        .onErrorMap(Exceptions::unwrap)
        .doAfterTerminate(() -> disposeProcessor(processor))
        .toFuture();
}
代码示例来源:origin: spring-projects/sts4
})
.doOnNext(x -> log.info("Got {} completions", x.getItems().size()))
.doAfterTerminate(() -> log.info("Completion handling terminated!"))
代码示例来源:origin: org.mule.runtime/mule-module-extensions-support
.doOnSuccess(v -> onReconnectionSuccessful())
.doOnError(this::onReconnectionFailed)
.doAfterTerminate(() -> reconnecting.set(false))
.subscribe();
代码示例来源:origin: org.mule.runtime/mule-core
.doOnError(onFailure(flowConstruct, messageSource, phaseResultNotifier, terminateConsumer))
.doAfterTerminate(() -> responseCompletion.complete(null))
.subscribe();
} catch (Exception e) {
内容来源于网络,如有侵权,请联系作者删除!