reactor.core.publisher.Mono.doOnTerminate()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(11.7k)|赞(0)|评价(0)|浏览(519)

本文整理了Java中reactor.core.publisher.Mono.doOnTerminate()方法的一些代码示例,展示了Mono.doOnTerminate()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Mono.doOnTerminate()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:doOnTerminate

Mono.doOnTerminate介绍

[英]Add behavior triggered when the Mono terminates, either by completing successfully or with an error.
[中]添加Mono终止时触发的行为(通过成功完成或出现错误)。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Verifies that a {@code doOnTerminate} callback runs exactly once when a
 * Mono holding a value is subscribed to.
 */
@Test
public void onMonoSuccessDoOnTerminate() {
  AtomicInteger terminateCalls = new AtomicInteger();
  Mono<String> source = Mono.just("test");

  // Subscribe with no consumers; only the side-effect counter is observed.
  source.doOnTerminate(() -> terminateCalls.incrementAndGet())
        .subscribe();

  assertThat(terminateCalls.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Verifies that a {@code doOnTerminate} callback runs exactly once when the
 * subscribed Mono terminates with an error.
 */
@Test
public void onMonoRejectedDoOnTerminate() {
  AtomicInteger terminateCalls = new AtomicInteger();
  Mono<String> failing = Mono.error(new Exception("test"));

  // No error consumer is supplied; the test only inspects the counter.
  failing.doOnTerminate(() -> terminateCalls.incrementAndGet())
         .subscribe();

  assertThat(terminateCalls.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Smoke test for {@code Mono.delay} on a scheduler created by
 * {@code Schedulers.newElastic}: over 20 rounds, the time between the
 * subscribe callback and the terminate callback of a 100ms delay must be at
 * least 100ms and strictly less than 200ms. Each round disposes its scheduler.
 */
@Test
public void smokeTestDelay() {
  for (int round = 0; round < 20; round++) {
    Scheduler scheduler = Schedulers.newElastic("test");
    AtomicLong subscribedAt = new AtomicLong();
    AtomicLong terminatedAt = new AtomicLong();
    try {
      // Timestamps are captured by the side-effect operators themselves.
      Mono<Long> timed = Mono.delay(Duration.ofMillis(100), scheduler)
          .doOnSubscribe(subscription -> subscribedAt.set(System.nanoTime()))
          .doOnTerminate(() -> terminatedAt.set(System.nanoTime()));

      StepVerifier.create(timed)
          .expectSubscription()
          .expectNext(0L)
          .verifyComplete();

      long endNanos = terminatedAt.get();
      long startNanos = subscribedAt.get();
      long elapsedNanos = endNanos - startNanos;
      assertThat(TimeUnit.NANOSECONDS.toMillis(elapsedNanos))
          .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", round, elapsedNanos, startNanos, endNanos)
          .isGreaterThanOrEqualTo(100L)
          .isLessThan(200L);
    }
    finally {
      scheduler.dispose();
    }
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Smoke test for {@code Mono.delay} on a scheduler created by
 * {@code Schedulers.newParallel}: over 20 rounds, the time between the
 * subscribe callback and the terminate callback of a 100ms delay must be at
 * least 100ms and strictly less than 200ms. Each round disposes its scheduler.
 */
@Test
public void smokeTestDelay() {
  for (int round = 0; round < 20; round++) {
    Scheduler scheduler = Schedulers.newParallel("test");
    AtomicLong subscribedAt = new AtomicLong();
    AtomicLong terminatedAt = new AtomicLong();
    try {
      // Timestamps are captured by the side-effect operators themselves.
      Mono<Long> timed = Mono.delay(Duration.ofMillis(100), scheduler)
          .doOnSubscribe(subscription -> subscribedAt.set(System.nanoTime()))
          .doOnTerminate(() -> terminatedAt.set(System.nanoTime()));

      StepVerifier.create(timed)
          .expectSubscription()
          .expectNext(0L)
          .verifyComplete();

      long endNanos = terminatedAt.get();
      long startNanos = subscribedAt.get();
      long elapsedNanos = endNanos - startNanos;
      assertThat(TimeUnit.NANOSECONDS.toMillis(elapsedNanos))
          .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", round, elapsedNanos, startNanos, endNanos)
          .isGreaterThanOrEqualTo(100L)
          .isLessThan(200L);
    }
    finally {
      scheduler.dispose();
    }
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Smoke test for {@code Mono.delay} on a scheduler wrapping a
 * {@code Executors.newScheduledThreadPool(1)} executor: over 20 rounds, the
 * time between the subscribe callback and the terminate callback of a 100ms
 * delay must be at least 100ms and strictly less than 200ms. Signals are
 * logged via {@code log()}, and each round disposes its scheduler.
 */
@Test
public void smokeTestDelay() {
  for (int round = 0; round < 20; round++) {
    Scheduler scheduler = Schedulers.fromExecutorService(Executors.newScheduledThreadPool(1));
    AtomicLong subscribedAt = new AtomicLong();
    AtomicLong terminatedAt = new AtomicLong();
    try {
      // Timestamps are captured by the side-effect operators themselves.
      Mono<Long> timed = Mono.delay(Duration.ofMillis(100), scheduler)
          .log()
          .doOnSubscribe(subscription -> subscribedAt.set(System.nanoTime()))
          .doOnTerminate(() -> terminatedAt.set(System.nanoTime()));

      StepVerifier.create(timed)
          .expectSubscription()
          .expectNext(0L)
          .verifyComplete();

      long endNanos = terminatedAt.get();
      long startNanos = subscribedAt.get();
      long elapsedNanos = endNanos - startNanos;
      assertThat(TimeUnit.NANOSECONDS.toMillis(elapsedNanos))
          .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", round, elapsedNanos, startNanos, endNanos)
          .isGreaterThanOrEqualTo(100L)
          .isLessThan(200L);
    }
    finally {
      scheduler.dispose();
    }
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Smoke test for {@code Mono.delay} on a scheduler created by
 * {@code Schedulers.newSingle}: over 20 rounds, the time between the
 * subscribe callback and the terminate callback of a 100ms delay must be at
 * least 100ms and strictly less than 200ms. Signals are logged via
 * {@code log()}, and each round disposes its scheduler.
 */
@Test
public void smokeTestDelay() {
  for (int round = 0; round < 20; round++) {
    Scheduler scheduler = Schedulers.newSingle("test");
    AtomicLong subscribedAt = new AtomicLong();
    AtomicLong terminatedAt = new AtomicLong();
    try {
      // Timestamps are captured by the side-effect operators themselves.
      Mono<Long> timed = Mono.delay(Duration.ofMillis(100), scheduler)
          .log()
          .doOnSubscribe(subscription -> subscribedAt.set(System.nanoTime()))
          .doOnTerminate(() -> terminatedAt.set(System.nanoTime()));

      StepVerifier.create(timed)
          .expectSubscription()
          .expectNext(0L)
          .verifyComplete();

      long endNanos = terminatedAt.get();
      long startNanos = subscribedAt.get();
      long elapsedNanos = endNanos - startNanos;
      assertThat(TimeUnit.NANOSECONDS.toMillis(elapsedNanos))
          .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", round, elapsedNanos, startNanos, endNanos)
          .isGreaterThanOrEqualTo(100L)
          .isLessThan(200L);
    }
    finally {
      scheduler.dispose();
    }
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Converts a hidden Mono to a Flux and back to a Mono, then checks that:
 * the value "foo" is produced, the next/success/terminate callbacks ran,
 * the cancel callback did not, and the round-trip yields the very same
 * instance as the original Mono.
 */
@Test
public void monoFromFluxThatIsItselfFromMono() {
  AtomicBoolean sawNext = new AtomicBoolean();
  AtomicBoolean sawSuccess = new AtomicBoolean();
  AtomicBoolean sawCancel = new AtomicBoolean();
  AtomicBoolean sawTerminate = new AtomicBoolean();

  Mono<String> withCallback = Mono.just("foo")
      .doOnNext(value -> sawNext.set(true));

  Mono<String> original = withCallback
      .doOnCancel(() -> sawCancel.set(true))
      .doOnSuccess(value -> sawSuccess.set(true))
      .doOnTerminate(() -> sawTerminate.set(true))
      .hide();

  // Preconditions: neither Mono may be one of the callable/fuseable types.
  assertThat(withCallback)
      .as("withCallback is not Callable")
      .isNotInstanceOf(Fuseable.ScalarCallable.class)
      .isNotInstanceOf(Callable.class);
  assertThat(original)
      .as("original is not callable Mono")
      .isNotInstanceOf(Fuseable.class)
      .isNotInstanceOf(Fuseable.ScalarCallable.class)
      .isNotInstanceOf(Callable.class);

  Flux<String> asFlux = Flux.from(original);
  Mono<String> backToMono = Mono.from(asFlux);

  assertThat(backToMono.block()).isEqualTo("foo");
  assertThat(sawNext).as("emitted").isTrue();
  assertThat(sawSuccess).as("succeeded").isTrue();
  assertThat(sawCancel).as("cancelled").isFalse();
  assertThat(sawTerminate).as("terminated").isTrue();
  assertThat(backToMono).as("conversions negated").isSameAs(original);
}

代码示例来源:origin: reactor/reactor-core

.log("resultStream")
                         .collectList()
                         .doOnTerminate(doneSemaphore::release)
                         .toProcessor();
listPromise.subscribe();

代码示例来源:origin: spring-projects/spring-integration

/**
 * Builds the reactive pipeline that handles one exchange: extracts the request
 * body (falling back to the query parameters when the body is empty), wraps it
 * in a {@code RequestEntity}, builds a message from it, and then either sends
 * the message and populates the response from the reply, or just sends it and
 * sets the status code, depending on {@code expectReply}.
 * <p>{@code activeCount} is incremented on subscription and decremented when
 * the pipeline terminates.
 * NOTE(review): {@code doOnTerminate} runs on completion or error only; if the
 * subscription is cancelled the counter is presumably not decremented —
 * confirm whether {@code doFinally} is wanted here.
 *
 * @param exchange the current server exchange
 * @return a Mono that completes when handling is done
 */
private Mono<Void> doHandle(ServerWebExchange exchange) {
  return extractRequestBody(exchange)
      .doOnSubscribe(subscription -> this.activeCount.incrementAndGet())
      .cast(Object.class)
      .switchIfEmpty(Mono.just(exchange.getRequest().getQueryParams()))
      .map(payload ->
          new RequestEntity<>(payload, exchange.getRequest().getHeaders(),
              exchange.getRequest().getMethod(), exchange.getRequest().getURI()))
      .flatMap(requestEntity -> buildMessage(requestEntity, exchange))
      .flatMap(tuple -> {
        if (!this.expectReply) {
          // One-way: send the message, then only set the status code.
          send(tuple.getT1());
          return setStatusCode(exchange, tuple.getT2());
        }
        // Request/reply: wait for the reply and write it to the response.
        return sendAndReceiveMessageReactive(tuple.getT1())
            .flatMap(reply -> populateResponse(exchange, reply));
      })
      .doOnTerminate(this.activeCount::decrementAndGet);
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * Registers a termination callback by delegating to the wrapped {@code boxed} Mono.
 *
 * @param onTerminate callback passed unchanged to {@code boxed.doOnTerminate};
 *        presumably invoked with the emitted value and the error, either of
 *        which may be absent — verify against the Reactor version in use
 * @return the Mono produced by {@code boxed.doOnTerminate(onTerminate)}
 * @see reactor.core.publisher.Mono#doOnTerminate(java.util.function.BiConsumer)
 */
public final Mono<T> doOnTerminate(BiConsumer<? super T, Throwable> onTerminate) {
  final Mono<T> decorated = boxed.doOnTerminate(onTerminate);
  return decorated;
}
/**

代码示例来源:origin: rsocket/rsocket-java

/**
 * Feeds an error frame on stream id 0 into the test connection after a 100ms
 * delay and checks that a pending request-response fails with the matching
 * {@code RejectedSetupException}, that exactly one error was reported to the
 * error consumer, and that the client ends up disposed.
 */
@Test
void requesterStreamsTerminatedOnZeroErrorFrame() {
 String errorMsg = "error";
 List<Throwable> reportedErrors = new ArrayList<>();
 TestDuplexConnection connection = new TestDuplexConnection();
 RSocketClient client =
   new RSocketClient(
     connection, DefaultPayload::create, reportedErrors::add, StreamIdSupplier.clientSupplier());

 // The frame is built lazily, only once the delay has terminated.
 Runnable injectSetupError =
   () ->
     connection.addToReceivedBuffer(
       Frame.Error.from(0, new RejectedSetupException(errorMsg)));
 Mono.delay(Duration.ofMillis(100))
   .doOnTerminate(injectSetupError)
   .subscribe();

 StepVerifier.create(client.requestResponse(DefaultPayload.create("test")))
   .expectErrorMatches(
     failure ->
       failure instanceof RejectedSetupException && errorMsg.equals(failure.getMessage()))
   .verify(Duration.ofSeconds(5));

 assertThat(reportedErrors).hasSize(1);
 assertThat(client.isDisposed()).isTrue();
}

代码示例来源:origin: scalecube/scalecube-services

/**
 * Closes this transport by disposing the connection provider.
 *
 * @return the Mono from {@code connectionProvider.disposeLater()}, with an
 *         info log emitted when it terminates
 */
@Override
public Mono<Void> close() {
 Mono<Void> disposal = connectionProvider.disposeLater();
 return disposal.doOnTerminate(() -> LOGGER.info("Closed http-client-sdk transport"));
}

代码示例来源:origin: scalecube/scalecube-services

/**
 * Closes this transport. At subscription time the current rsocket Mono is read
 * through {@code rSocketMonoUpdater}; when one is present it is disposed via
 * {@code dispose}, otherwise nothing is done. An info log is emitted when the
 * resulting Mono terminates.
 *
 * @return a deferred Mono representing the close operation
 */
@Override
public Mono<Void> close() {
 return Mono.defer(
   () -> {
    // noinspection unchecked
    Mono<RSocket> current = rSocketMonoUpdater.get(this);
    Mono<Void> closing;
    if (current == null) {
     // Nothing was ever connected, so there is nothing to dispose.
     closing = Mono.<Void>empty();
    } else {
     closing = current.flatMap(this::dispose);
    }
    return closing.doOnTerminate(() -> LOGGER.info("Closed rsocket client sdk transport"));
   });
}

代码示例来源:origin: scalecube/scalecube-services

/**
 * Closes this transport. At subscription time the current websocket session
 * Mono is read through {@code websocketMonoUpdater}; when one is present the
 * session is closed, otherwise nothing is done. An info log is emitted when
 * the resulting Mono terminates.
 *
 * @return a deferred Mono representing the close operation
 */
@Override
public Mono<Void> close() {
 return Mono.defer(
   () -> {
    // noinspection unchecked
    Mono<WebsocketSession> current = websocketMonoUpdater.get(this);
    Mono<Void> closing;
    if (current == null) {
     // No session was ever established, so there is nothing to close.
     closing = Mono.<Void>empty();
    } else {
     closing = current.flatMap(WebsocketSession::close);
    }
    return closing.doOnTerminate(() -> LOGGER.info("Closed websocket client sdk transport"));
   });
}

代码示例来源:origin: scalecube/scalecube-services

/**
 * Accepts an incoming rsocket connection and answers with a new
 * {@code GatewayRSocket} bound to a message codec chosen from the setup
 * payload's metadata mime type.
 * <p>As a side effect, subscribes to the rsocket's {@code onClose()} signal to
 * log the disconnect and any error raised while closing.
 *
 * @param setup connection setup payload; its metadata mime type selects the headers codec
 * @param rsocket the rsocket of the connecting client
 * @return a Mono emitting the gateway rsocket for this connection
 */
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
 LOGGER.info("Accepted rsocket websocket: {}, connectionSetup: {}", rsocket, setup);
 rsocket
   .onClose()
   .doOnTerminate(() -> LOGGER.info("Client disconnected: {}", rsocket))
   // Pass the rsocket for the "{}" placeholder and keep the throwable as the
   // trailing argument. Presumably LOGGER is SLF4J-style, where a lone trailing
   // Throwable is consumed as the exception and would leave "{}" unfilled.
   .subscribe(null, th -> LOGGER.error("Exception on closing rsocket: {}", rsocket, th));
 // Prepare message codec together with headers from metainfo
 HeadersCodec headersCodec = HeadersCodec.getInstance(setup.metadataMimeType());
 ServiceMessageCodec messageCodec = new ServiceMessageCodec(headersCodec);
 return Mono.just(new GatewayRSocket(serviceCall, metrics, messageCodec));
}

代码示例来源:origin: scalecube/scalecube-services

/**
 * Disposes every connection in {@code connections} and waits for all of them
 * to terminate. A failure reported by a connection's {@code onTerminate()} is
 * logged as a warning and then suppressed, so it does not fail the aggregate.
 * The {@code connections} collection is cleared once the aggregate terminates.
 * All of this is deferred until subscription.
 *
 * @return a Mono that completes when all connections have terminated
 */
private Mono<Void> closeConnections() {
 return Mono.defer(
   () -> {
    Mono<Void> allTerminated =
      Mono.when(
        connections
          .stream()
          .map(
            conn -> {
             // Dispose first, then observe that connection's termination.
             conn.dispose();
             return conn
               .onTerminate()
               .doOnError(error -> LOGGER.warn("Failed to close connection: " + error))
               .onErrorResume(error -> Mono.empty());
            })
          .collect(Collectors.toList()));
    return allTerminated.doOnTerminate(connections::clear);
   });
}

代码示例来源:origin: io.scalecube/scalecube-services-transport-rsocket

/**
 * Disposes every connection in {@code connections} and waits for all of them
 * to terminate. A failure reported by a connection's {@code onTerminate()} is
 * logged as a warning and then suppressed, so it does not fail the aggregate.
 * The {@code connections} collection is cleared once the aggregate terminates.
 * All of this is deferred until subscription.
 *
 * @return a Mono that completes when all connections have terminated
 */
private Mono<Void> closeConnections() {
 return Mono.defer(
   () -> {
    Mono<Void> allTerminated =
      Mono.when(
        connections
          .stream()
          .map(
            conn -> {
             // Dispose first, then observe that connection's termination.
             conn.dispose();
             return conn
               .onTerminate()
               .doOnError(error -> LOGGER.warn("Failed to close connection: " + error))
               .onErrorResume(error -> Mono.empty());
            })
          .collect(Collectors.toList()));
    return allTerminated.doOnTerminate(connections::clear);
   });
}

代码示例来源:origin: io.scalecube/scalecube-gateway-rsocket-websocket

/**
 * Accepts an incoming rsocket connection and answers with a new
 * {@code GatewayRSocket} bound to a message codec chosen from the setup
 * payload's metadata mime type.
 * <p>As a side effect, subscribes to the rsocket's {@code onClose()} signal to
 * log the disconnect and any error raised while closing.
 *
 * @param setup connection setup payload; its metadata mime type selects the headers codec
 * @param rsocket the rsocket of the connecting client
 * @return a Mono emitting the gateway rsocket for this connection
 */
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
 LOGGER.info("Accepted rsocket websocket: {}, connectionSetup: {}", rsocket, setup);
 rsocket
   .onClose()
   .doOnTerminate(() -> LOGGER.info("Client disconnected: {}", rsocket))
   // Supply an error consumer so a failure of onClose() is logged here instead
   // of reaching Reactor's handling for subscribers without an error callback.
   .subscribe(null, th -> LOGGER.error("Exception on closing rsocket: {}", rsocket, th));
 // Prepare message codec together with headers from metainfo
 HeadersCodec headersCodec = HeadersCodec.getInstance(setup.metadataMimeType());
 ServiceMessageCodec messageCodec = new ServiceMessageCodec(headersCodec);
 return Mono.just(new GatewayRSocket(serviceCall, metrics, messageCodec));
}

代码示例来源:origin: io.projectreactor.ipc/reactor-netty

/**
 * Shuts the context down from a JVM hook: does nothing when the context is
 * already disposed; otherwise disposes it and blocks until it closes, failing
 * with a {@code TimeoutException} if that takes longer than
 * {@code lifecycleTimeout}. The description of the calling thread is captured
 * up front and included in the log messages.
 */
protected void shutdownFromJVM() {
  if (!context.isDisposed()) {
    final String hookDesc = Thread.currentThread().toString();

    context.dispose();

    // Logging is attached before the timeout operator is applied.
    Mono<Void> closed = context.onClose()
        .doOnError(e -> LOG.error("Stopped {} on {} with an error {} from JVM hook {}",
            description, context.address(), e, hookDesc))
        .doOnTerminate(() -> LOG.info("Stopped {} on {} from JVM hook {}",
            description, context.address(), hookDesc));

    Mono<Void> timeoutFailure = Mono.error(new TimeoutException(description +
        " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms"));

    closed.timeout(lifecycleTimeout, timeoutFailure)
        .block();
  }
}
}

代码示例来源:origin: io.projectreactor.ipc/reactor-netty

/**
 * Shut down the {@link NettyContext} and wait for its termination, up to the
 * {@link #setLifecycleTimeout(Duration) lifecycle timeout}. Returns immediately
 * when the context is already disposed; otherwise blocks the calling thread
 * and fails with a {@code TimeoutException} once the timeout elapses.
 */
public void shutdown() {
  if (!context.isDisposed()) {
    removeShutdownHook(); //only applies if not called from the hook's thread
    context.dispose();

    // Logging is attached before the timeout operator is applied.
    Mono<Void> closed = context.onClose()
        .doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e))
        .doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address()));

    Mono<Void> timeoutFailure = Mono.error(
        new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms"));

    closed.timeout(lifecycleTimeout, timeoutFailure)
        .block();
  }
}

相关文章

Mono类方法