reactor.core.publisher.Mono.log()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(592)

本文整理了Java中reactor.core.publisher.Mono.log()方法的一些代码示例,展示了Mono.log()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.log()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:log

Mono.log介绍

[英]Observe all Reactive Streams signals and trace them using Logger support. Default will use Level#INFO and java.util.logging. If SLF4J is available, it will be used instead.

The default log category will be "reactor.Mono", followed by a suffix generated from the source operator, e.g. "reactor.Mono.Map".
[中]观察所有反应流信号,并使用记录器支持进行跟踪。默认设置将使用Level#INFO和java。util。登录中。如果SLF4J可用,将改用它。
默认日志类别将是“reactor.Mono”,后面是源操作符生成的后缀,例如“reactor.Mono.Map”。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Observe Reactive Streams signals matching the passed filter {@code options} and
 * trace them using a specific user-provided {@link Logger}, at {@link Level#INFO} level.
 * <p>
 * <img class="marble" src="doc-files/marbles/logForMono.svg" alt="">
 *
 * @param logger the {@link Logger} to use, instead of resolving one through a category.
 *
 * @return a new {@link Mono} that logs signals
 */
public final Mono<T> log(Logger logger) {
  return log(logger, Level.INFO, false);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Observe all Reactive Streams signals and use {@link Logger} support to handle trace implementation. Default will
 * use {@link Level#INFO} and java.util.logging. If SLF4J is available, it will be used instead.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/logForMono.svg" alt="">
 *
 * @param category to be mapped into logger configuration (e.g. org.springframework
 * .reactor). If category ends with "." like "reactor.", a generated operator
 * suffix will complete, e.g. "reactor.Flux.Map".
 *
 * @return a new {@link Mono}
 */
public final Mono<T> log(@Nullable String category) {
  return log(category, Level.INFO);
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Instance> doUpdateStatus(Instance instance) {
  if (!instance.isRegistered()) {
    return Mono.empty();
  }
  log.debug("Update status for {}", instance);
  return instanceWebClient.instance(instance)
              .get()
              .uri(Endpoint.HEALTH)
              .exchange()
              .log(log.getName(), Level.FINEST)
              .flatMap(this::convertStatusInfo)
              .doOnError(ex -> logError(instance, ex))
              .onErrorResume(this::handleError)
              .map(instance::withStatusInfo);
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Instance> doUpdateInfo(Instance instance) {
  if (instance.getStatusInfo().isOffline() || instance.getStatusInfo().isUnknown()) {
    return Mono.empty();
  }
  if (!instance.getEndpoints().isPresent(Endpoint.INFO)) {
    return Mono.empty();
  }
  log.debug("Update info for {}", instance);
  return instanceWebClient.instance(instance)
              .get()
              .uri(Endpoint.INFO)
              .exchange()
              .log(log.getName(), Level.FINEST)
              .flatMap(response -> convertInfo(instance, response))
              .onErrorResume(ex -> Mono.just(convertInfo(instance, ex)))
              .map(instance::withInfo);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void promiseDelays() throws Exception {
  Tuple2<Long, String> h = Mono.delay(Duration.ofMillis(3000))
                 .log("time1")
                 .map(d -> "Spring wins")
                 .or(Mono.delay(Duration.ofMillis(2000)).log("time2").map(d -> "Spring Reactive"))
                 .flatMap(t -> Mono.just(t+ " world"))
                 .elapsed()
                 .block();
  assertThat("Alternate mono not seen", h.getT2(), is("Spring Reactive world"));
  System.out.println(h.getT1());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoLogWithGivenLogger() {
  Level level = Level.WARNING;
  Mono<String> source = Mono.just("foo");
  Logger mockLogger = Mockito.mock(Logger.class);
  source.log(mockLogger, level, false, SignalType.ON_NEXT)
     .subscribe();
  verify(mockLogger, only()).warn(anyString(), eq(SignalType.ON_NEXT),
      eq("foo"));
  verifyNoMoreInteractions(mockLogger);
}

代码示例来源:origin: reactor/reactor-core

private void demonstrateLogError() {
  Loggers.getLogger("logError.default")
      .warn("The following logs should demonstrate similar error output, but respectively at ERROR, DEBUG and TRACE levels");
  Mono<Object> error = Mono.error(new IllegalStateException("boom"));
  error.log("logError.default")
     .subscribe(v -> {}, e -> {});
  error.log("logError.fine", Level.FINE)
     .subscribe(v -> {}, e -> {});
  error.log("logError.finest", Level.FINEST)
     .subscribe(v -> {}, e -> {});
}

代码示例来源:origin: reactor/reactor-core

@Test
public void hasElementCancel() throws InterruptedException {
  AtomicBoolean cancelled = new AtomicBoolean();
  Mono.just("foo").hide()
    .doOnCancel(() -> cancelled.set(true))
    .log()
    .hasElement()
    .subscribe(v -> {}, e -> {}, () -> {},
        Subscription::cancel);
  assertThat(cancelled.get()).isTrue();
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
  // until the WebHandler is run
  return chain.filter(exchange).then(Mono.defer(() -> {
    ClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
    if (clientResponse == null) {
      return Mono.empty();
    }
    log.trace("WebClientWriteResponseFilter start");
    ServerHttpResponse response = exchange.getResponse();
    return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers())).log("webClient response");
  }));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void raceSubscribeAndCache() {
  AtomicInteger count = new AtomicInteger();
  Mono<Integer> source = Mono.fromCallable(count::getAndIncrement);
  for (int i = 0; i < 500; i++) {
    Mono<Integer> cached;
    if (i == 0) {
      cached = source.log().cache(Duration.ofSeconds(2));
    }
    else {
      cached = source.cache(Duration.ofSeconds(2));
    }
    RaceTestUtils.race(cached::subscribe, cached::subscribe);
  }
  assertThat(count.get()).isEqualTo(500);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancelUpstreamOnceWhenCancelled() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  AtomicLong upstreamCancelCount = new AtomicLong();
  Mono<String> source = Mono.just("foo").log().hide()
      .doOnCancel(() -> upstreamCancelCount.incrementAndGet());
  StepVerifier.withVirtualTime(
      () -> new MonoDelayElement<>(source, 2, TimeUnit.SECONDS, vts),
      () -> vts, Long.MAX_VALUE)
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(1))
        .thenCancel()
        .verify();
  vts.advanceTimeBy(Duration.ofHours(1));
  assertThat(upstreamCancelCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 5000L)
public void emptyIsImmediate() {
  Mono<String> source = Mono.<String>empty().log().hide();
  Duration d = StepVerifier.create(new MonoDelayElement<>(source, 10, TimeUnit.SECONDS,
      defaultSchedulerForDelay()).log())
        .expectSubscription()
        .verifyComplete();
  assertThat(d).isLessThan(Duration.ofSeconds(1));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalIsDelayed() {
  Mono<String> source = Mono.just("foo").log().hide();
  StepVerifier.withVirtualTime(() -> new MonoDelayElement<>(source, 2, TimeUnit.SECONDS,
      defaultSchedulerForDelay()).log())
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext("foo")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void contextGetMono() throws InterruptedException {
  StepVerifier.create(Mono.just(1)
              .log()
              .handle((d, c) -> c.next(c.currentContext().get("test") + "" + d))
              .handle((d, c) -> c.next(c.currentContext().get("test2") + "" + d))
              .subscriberContext(ctx -> ctx.put("test2", "bar"))
              .subscriberContext(ctx -> ctx.put("test", "foo"))
              .log())
        .expectNext("barfoo1")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void test() throws Exception {
  Flux.just("red", "white", "blue")
    .log("source")
    .flatMap(value -> Mono.fromCallable(() -> {
                Thread.sleep(1000);
                return value;
              }).subscribeOn(Schedulers.elastic()))
    .log("merged")
    .collect(Result::new, Result::add)
    .doOnNext(Result::stop)
    .log("accumulated")
    .toFuture()
    .get();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void contextGetHideMono() throws InterruptedException {
  StepVerifier.create(Mono.just(1)
              .hide()
              .log()
              .handle((d, c) -> c.next(c.currentContext().get("test") + "" + d))
              .handle((d, c) -> c.next(c.currentContext().get("test2") + "" + d))
              .subscriberContext(ctx -> ctx.put("test", "foo"))
              .subscriberContext(ctx -> ctx.put("test2", "bar"))
              .log())
        .expectNext("barfoo1")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void verifyVirtualTimeNoEventNever() {
  StepVerifier.withVirtualTime(() -> Mono.never()
                      .log())
        .expectSubscription()
        .expectNoEvent(Duration.ofDays(10000))
        .thenCancel()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void callableReturnsNull() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.<Integer>fromCallable(() -> null).log().flux()
                     .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void verifyVirtualTimeNoEventNeverError() {
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(() -> StepVerifier.withVirtualTime(() -> Mono.never()
                                   .log())
                     .expectNoEvent(Duration.ofDays(10000))
                     .thenCancel()
                     .verify())
      .withMessageStartingWith("expectation failed (expected no event: onSubscribe(");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onAfterTerminateFusion() {
  LongAdder invoked = new LongAdder();
  Mono<Integer> mono = Flux
      .range(1, 10)
      .reduce((a, b) -> a + b)
      .doAfterTerminate(invoked::increment);
  StepVerifier.create(mono.log())
        .expectFusion()
        .expectNext(55)
        .expectComplete()
        .verify();
  assertEquals(1, invoked.intValue());
}

相关文章

Mono类方法