Usage and code examples for reactor.core.publisher.Mono.doAfterTerminate()

Reposted by x33g5p2x on 2022-01-24, filed under Other

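This article collects code examples for the Java method reactor.core.publisher.Mono.doAfterTerminate() and shows how it is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and should serve as a useful reference. The details of Mono.doAfterTerminate() are as follows:
Package: reactor.core.publisher
Class name: Mono
Method name: doAfterTerminate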

Introduction to Mono.doAfterTerminate

Adds behavior (a side effect) that is triggered after the Mono terminates, either by completing downstream successfully or with an error.
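
The ordering implied by "after the Mono terminates" can be illustrated with a small model built only on the JDK's java.util.concurrent.Flow interfaces. This is a simplified sketch, not Reactor's implementation: the class AfterTerminateSketch and its helpers afterTerminate, recording, and run are hypothetical names introduced here for illustration. It assumes only that the terminal signal is forwarded downstream first and the side effect runs afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class AfterTerminateSketch {

    // Hypothetical helper (not a Reactor API): wraps a subscriber so that the
    // given Runnable runs only after the terminal signal was forwarded downstream.
    static <T> Flow.Subscriber<T> afterTerminate(Flow.Subscriber<? super T> downstream,
                                                 Runnable afterTerminate) {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) {
                downstream.onError(t);   // forward the error first
                afterTerminate.run();    // then run the side effect
            }
            @Override public void onComplete() {
                downstream.onComplete(); // forward completion first
                afterTerminate.run();    // then run the side effect
            }
        };
    }

    // Hypothetical helper: a subscriber that records every signal it receives.
    static Flow.Subscriber<String> recording(List<String> log) {
        return new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { log.add("onNext:" + item); }
            @Override public void onError(Throwable t) { log.add("onError:" + t.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
    }

    // Drives the wrapped subscriber by hand: emits the error if one is given,
    // otherwise the optional value followed by completion. Returns the signal log.
    static List<String> run(String value, RuntimeException error) {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<String> subscriber =
                afterTerminate(recording(log), () -> log.add("afterTerminate"));
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* no-op in this sketch */ }
            @Override public void cancel() { /* no-op in this sketch */ }
        });
        if (error != null) {
            subscriber.onError(error);
        } else {
            if (value != null) {
                subscriber.onNext(value);
            }
            subscriber.onComplete();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("foo", null));  // [onNext:foo, onComplete, afterTerminate]
        System.out.println(run(null, null));   // [onComplete, afterTerminate]
        System.out.println(run(null, new IllegalArgumentException("boom"))); // [onError:boom, afterTerminate]
    }
}
```

In all three cases (value, empty, error) the side effect runs exactly once and only after the downstream subscriber has seen the terminal signal, which is the behavior the tests below check with counters on real Mono instances.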

Code examples

Code example source: reactor/reactor-core

@Test
public void normalEager() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger cleanup = new AtomicInteger();
  Mono.using(() -> 1, r -> Mono.just(1), cleanup::set)
    .doAfterTerminate(() ->  Assert.assertEquals(0, cleanup.get()))
    .subscribe(ts);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
  Assert.assertEquals(1, cleanup.get());
}

Code example source: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger cleanup = new AtomicInteger();
  Mono.using(() -> 1, r -> Mono.just(1), cleanup::set, false)
    .doAfterTerminate(() ->  Assert.assertEquals(0, cleanup.get()))
    .subscribe(ts);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
  Assert.assertEquals(1, cleanup.get());
}

Code example source: reactor/reactor-core

@Test
public void afterTerminateForEmpty() {
  LongAdder invoked = new LongAdder();
  StepVerifier.create(Mono.<String>empty()
      .doAfterTerminate(() -> {
        invoked.increment();
      }))
        .expectComplete()
        .verify();
  assertEquals(1, invoked.intValue());
}

Code example source: reactor/reactor-core

@Test
public void afterTerminateCallbackFailureInterruptsOnNextAndThrows() {
  LongAdder invoked = new LongAdder();
  try {
    StepVerifier.create(Mono.just("foo")
                .doAfterTerminate(() -> {
                  invoked.increment();
                  throw new IllegalArgumentException("boom");
                }))
          .expectNext("bar") //irrelevant
          .expectErrorMessage("baz") //irrelevant
          .verify();
  }
  catch (Throwable t) {
    Throwable e = Exceptions.unwrap(t);
    assertEquals(IllegalArgumentException.class, e.getClass());
    assertEquals("boom", e.getMessage());
  }
  assertEquals(1, invoked.intValue());
}

Code example source: reactor/reactor-core

@Test
public void afterTerminateForOnError() {
  LongAdder invoked = new LongAdder();
  IllegalArgumentException err = new IllegalArgumentException("boom");
  StepVerifier.create(Mono.<String>error(err)
      .doAfterTerminate(invoked::increment))
        .expectErrorMessage("boom")
        .verify();
  assertEquals(1, invoked.intValue());
}

Code example source: rsocket/rsocket-java

@Override
public Mono<Void> onClose() {
 return delegate.onClose().doAfterTerminate(close::increment);
}

Code example source: reactor/reactor-core

@Test
public void onAfterTerminateFusionConditional() {
  LongAdder invoked = new LongAdder();
  Mono<Integer> mono = Flux
      .range(1, 10)
      .reduce((a, b) -> a + b)
      .filter(v -> true)
      .doAfterTerminate(invoked::increment);
  StepVerifier.create(mono)
        .expectFusion()
        .expectNext(55)
        .expectComplete()
        .verify();
  assertEquals(1, invoked.intValue());
}

Code example source: reactor/reactor-core

@Test
public void onAfterTerminateFusion() {
  LongAdder invoked = new LongAdder();
  Mono<Integer> mono = Flux
      .range(1, 10)
      .reduce((a, b) -> a + b)
      .doAfterTerminate(invoked::increment);
  StepVerifier.create(mono.log())
        .expectFusion()
        .expectNext(55)
        .expectComplete()
        .verify();
  assertEquals(1, invoked.intValue());
}

Code example source: reactor/reactor-core

@Test
public void onAfterTerminateNormalConditional() {
  LongAdder invoked = new LongAdder();
  Mono<Integer> mono = Flux
      .range(1, 10)
      .reduce((a, b) -> a + b)
      .hide()
      .filter(v -> true)
      .doAfterTerminate(invoked::increment);
  StepVerifier.create(mono)
        .expectFusion(Fuseable.ANY, Fuseable.NONE)
        .expectNext(55)
        .expectComplete()
        .verify();
  assertEquals(1, invoked.intValue());
}

Code example source: mulesoft/mule

@Override
public Publisher<Void> onFailure(MessagingException exception, Map<String, Object> parameters) {
 final CoreEvent event = exception.getEvent();
 final boolean isBackPressureError = event.getError()
   .map(e -> flowBackPressueErrorType.equals(e.getErrorType()))
   .orElse(false);
 SourceCallbackExecutor executor;
 if (isBackPressureError) {
  LOGGER.info("FLOW OVERLOAD - {}.", event.getError().get().getCause().getMessage());
  executor = onBackPressureExecutor;
  parameters = emptyMap();
  context.addVariable(BACK_PRESSURE_ACTION_CONTEXT_PARAM, backPressureAction);
 } else {
  executor = onErrorExecutor;
 }
 return from(executor.execute(event, parameters, context)).doAfterTerminate(this::rollback);
}

Code example source: mulesoft/mule

/**
 * {@inheritDoc}
 */
@Override
public <T, A> CompletableFuture<Result<T, A>> executeAsync(String extension, String operation, OperationParameters parameters) {
 OperationMessageProcessor processor = createProcessor(extension, operation, parameters);
 return just(getEvent()).transform(processor)
   .map(event -> Result.<T, A>builder(event.getMessage()).build())
   .onErrorMap(Exceptions::unwrap)
   .doAfterTerminate(() -> disposeProcessor(processor)).toFuture();
}

Code example source: mulesoft/mule

@Override
public void onTerminate(Either<MessagingException, CoreEvent> result) throws Exception {
 CoreEvent event = result.isRight() ? result.getRight() : result.getLeft().getEvent();
 from(onTerminateExecutor.execute(event, emptyMap(), context))
   .doAfterTerminate(() -> context.releaseConnection())
   .subscribe();
}

Code example source: mulesoft/mule

.doOnSuccess(v -> onReconnectionSuccessful())
.doOnError(this::onReconnectionFailed)
.doAfterTerminate(() -> reconnecting.set(false))
.subscribe();

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @param afterTerminate
 * @return
 * @see reactor.core.publisher.Mono#doAfterTerminate(java.util.function.BiConsumer)
 */
public final Mono<T> doAfterTerminate(BiConsumer<? super T, Throwable> afterTerminate) {
  return boxed.doAfterTerminate(afterTerminate);
}

Code example source: org.mule.runtime/mule-module-extensions-support

/**
 * {@inheritDoc}
 */
@Override
public <T, A> CompletableFuture<Result<T, A>> executeAsync(String extension, String operation, OperationParameters parameters) {
 OperationMessageProcessor processor = createProcessor(extension, operation, parameters);
 Mono<Result<T, A>> resultMono = process(processor)
   .map(event -> Result.<T, A>builder(event.getMessage()).build())
   .onErrorMap(Exceptions::unwrap)
   .doAfterTerminate(() -> disposeProcessor(processor));
 return resultMono.toFuture();
}

Code example source: spring-projects/sts4

})
.doOnNext(x -> log.info("Got {} completions", x.getItems().size()))
.doAfterTerminate(() -> log.info("Completion handling terminated!"))

Code example source: org.mule.runtime/mule-core

.doOnError(onFailure(flowConstruct, messageSource, phaseResultNotifier, terminateConsumer))
   .doAfterTerminate(() -> responseCompletion.complete(null))
   .subscribe();
} catch (Exception e) {
