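This article collects code examples for the Java method reactor.core.publisher.Mono.log() and shows how Mono.log() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.log() method are as follows: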
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: log
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level#INFO and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "reactor.Mono", followed by a suffix generated from the source operator, e.g. "reactor.Mono.Map".
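To make "observe all Reactive Streams signals" concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than Reactor itself. The names SignalTraceSketch, TracingSubscriber, just and run are hypothetical and introduced only for illustration. This is not how Reactor implements log(); it is an assumed model of an operator that records each signal and forwards it unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SignalTraceSketch {

    /** Hypothetical wrapper: records every signal it sees, then forwards it unchanged. */
    static final class TracingSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private final List<String> trace;

        TracingSubscriber(Flow.Subscriber<? super T> downstream, List<String> trace) {
            this.downstream = downstream;
            this.trace = trace;
        }

        @Override
        public void onSubscribe(Flow.Subscription upstream) {
            trace.add("onSubscribe");
            // Wrap the subscription so request/cancel signals are traced too.
            downstream.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    trace.add("request(" + n + ")");
                    upstream.request(n);
                }

                @Override
                public void cancel() {
                    trace.add("cancel");
                    upstream.cancel();
                }
            });
        }

        @Override
        public void onNext(T item) {
            trace.add("onNext(" + item + ")");
            downstream.onNext(item);
        }

        @Override
        public void onError(Throwable error) {
            trace.add("onError(" + error + ")");
            downstream.onError(error);
        }

        @Override
        public void onComplete() {
            trace.add("onComplete");
            downstream.onComplete();
        }
    }

    /** Hypothetical single-value publisher, loosely playing the role of Mono.just. */
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes through the tracing wrapper and returns the recorded signals. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Flow.Subscriber<String> sink = new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE); // unbounded demand
            }

            @Override
            public void onNext(String item) { }

            @Override
            public void onError(Throwable error) { }

            @Override
            public void onComplete() { }
        };
        just("foo").subscribe(new TracingSubscriber<String>(sink, trace));
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Calling run() records the signals in the order they occur: onSubscribe, request(9223372036854775807), onNext(foo), onComplete. The value itself passes through untouched, which is the property that lets a logging step be dropped into a chain without changing its result.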
Code example source: reactor/reactor-core
/**
 * Observe Reactive Streams signals matching the passed filter {@code options} and
 * trace them using a specific user-provided {@link Logger}, at {@link Level#INFO} level.
 * <p>
 * <img class="marble" src="doc-files/marbles/logForMono.svg" alt="">
 *
 * @param logger the {@link Logger} to use, instead of resolving one through a category.
 *
 * @return a new {@link Mono} that logs signals
 */
public final Mono<T> log(Logger logger) {
    return log(logger, Level.INFO, false);
}
Code example source: reactor/reactor-core
/**
 * Observe all Reactive Streams signals and use {@link Logger} support to handle trace implementation. Default will
 * use {@link Level#INFO} and java.util.logging. If SLF4J is available, it will be used instead.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/logForMono.svg" alt="">
 *
 * @param category to be mapped into logger configuration (e.g. org.springframework.reactor).
 * If category ends with "." like "reactor.", a generated operator
 * suffix will complete, e.g. "reactor.Flux.Map".
 *
 * @return a new {@link Mono}
 */
public final Mono<T> log(@Nullable String category) {
    return log(category, Level.INFO);
}
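The category rule described in the Javadoc above can be read as the following sketch. LogCategorySketch and resolve are hypothetical names, and the logic is an assumption drawn only from the documentation text: a missing category falls back to "reactor.Mono" plus the operator suffix, and a category ending in "." is completed with a generated suffix. Reactor's actual resolution code may differ in details.

```java
final class LogCategorySketch {

    /**
     * Hypothetical helper, not part of Reactor.
     *
     * @param category       the user-supplied category, possibly null
     * @param operatorSuffix a name derived from the source operator, e.g. "Map"
     * @return the logger category this sketch would use for a Mono
     */
    static String resolve(String category, String operatorSuffix) {
        if (category == null) {
            // Documented default: "reactor.Mono" followed by the operator suffix.
            return "reactor.Mono." + operatorSuffix;
        }
        if (category.endsWith(".")) {
            // Assumption: a trailing dot asks for the generated suffix to be appended.
            return category + "Mono." + operatorSuffix;
        }
        // Any other category is used exactly as given.
        return category;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "Map"));
        System.out.println(resolve("reactor.", "Map"));
        System.out.println(resolve("time1", "Map"));
    }
}
```

Under these assumptions, a plain name such as "time1" is kept as the logger name, while both a missing category and "reactor." resolve to "reactor.Mono.Map".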
Code example source: codecentric/spring-boot-admin
protected Mono<Instance> doUpdateStatus(Instance instance) {
    if (!instance.isRegistered()) {
        return Mono.empty();
    }
    log.debug("Update status for {}", instance);
    return instanceWebClient.instance(instance)
            .get()
            .uri(Endpoint.HEALTH)
            .exchange()
            .log(log.getName(), Level.FINEST)
            .flatMap(this::convertStatusInfo)
            .doOnError(ex -> logError(instance, ex))
            .onErrorResume(this::handleError)
            .map(instance::withStatusInfo);
}
Code example source: codecentric/spring-boot-admin
protected Mono<Instance> doUpdateInfo(Instance instance) {
    if (instance.getStatusInfo().isOffline() || instance.getStatusInfo().isUnknown()) {
        return Mono.empty();
    }
    if (!instance.getEndpoints().isPresent(Endpoint.INFO)) {
        return Mono.empty();
    }
    log.debug("Update info for {}", instance);
    return instanceWebClient.instance(instance)
            .get()
            .uri(Endpoint.INFO)
            .exchange()
            .log(log.getName(), Level.FINEST)
            .flatMap(response -> convertInfo(instance, response))
            .onErrorResume(ex -> Mono.just(convertInfo(instance, ex)))
            .map(instance::withInfo);
}
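The two spring-boot-admin examples above pass Level.FINEST, a java.util.logging level. The sketch below uses only java.util.logging to show how a logger's threshold decides whether such a record is emitted at all. JulLevelSketch, CapturingHandler and capture are hypothetical names. It covers only the java.util.logging fallback; when SLF4J is available, the threshold is configured in that logging backend instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class JulLevelSketch {

    /** Hypothetical handler that keeps published records in memory. */
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getLevel() + ":" + record.getMessage());
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    /** Logs one FINEST and one INFO record and returns what passed the threshold. */
    static List<String> capture(Level threshold) {
        Logger logger = Logger.getLogger("sketch.category." + threshold.getName());
        logger.setUseParentHandlers(false); // keep the console quiet
        CapturingHandler handler = new CapturingHandler();
        handler.setLevel(Level.ALL);        // the handler itself filters nothing
        logger.addHandler(handler);
        logger.setLevel(threshold);         // the logger threshold does the filtering

        logger.log(Level.FINEST, "health response");
        logger.log(Level.INFO, "status updated");

        logger.removeHandler(handler);
        return handler.messages;
    }

    public static void main(String[] args) {
        System.out.println(capture(Level.INFO));
        System.out.println(capture(Level.ALL));
    }
}
```

With the threshold at Level.INFO only the INFO record is captured; with Level.ALL both records appear. This suggests why a FINEST-level log() step can stay in production code: it stays silent until the category is explicitly turned up.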
Code example source: reactor/reactor-core
@Test
public void promiseDelays() throws Exception {
    Tuple2<Long, String> h = Mono.delay(Duration.ofMillis(3000))
            .log("time1")
            .map(d -> "Spring wins")
            .or(Mono.delay(Duration.ofMillis(2000)).log("time2").map(d -> "Spring Reactive"))
            .flatMap(t -> Mono.just(t + " world"))
            .elapsed()
            .block();
    assertThat("Alternate mono not seen", h.getT2(), is("Spring Reactive world"));
    System.out.println(h.getT1());
}
Code example source: reactor/reactor-core
@Test
public void monoLogWithGivenLogger() {
    Level level = Level.WARNING;
    Mono<String> source = Mono.just("foo");
    Logger mockLogger = Mockito.mock(Logger.class);
    source.log(mockLogger, level, false, SignalType.ON_NEXT)
            .subscribe();
    verify(mockLogger, only()).warn(anyString(), eq(SignalType.ON_NEXT),
            eq("foo"));
    verifyNoMoreInteractions(mockLogger);
}
Code example source: reactor/reactor-core
private void demonstrateLogError() {
    Loggers.getLogger("logError.default")
            .warn("The following logs should demonstrate similar error output, but respectively at ERROR, DEBUG and TRACE levels");
    Mono<Object> error = Mono.error(new IllegalStateException("boom"));
    error.log("logError.default")
            .subscribe(v -> {}, e -> {});
    error.log("logError.fine", Level.FINE)
            .subscribe(v -> {}, e -> {});
    error.log("logError.finest", Level.FINEST)
            .subscribe(v -> {}, e -> {});
}
Code example source: reactor/reactor-core
@Test
public void hasElementCancel() throws InterruptedException {
    AtomicBoolean cancelled = new AtomicBoolean();
    Mono.just("foo").hide()
            .doOnCancel(() -> cancelled.set(true))
            .log()
            .hasElement()
            .subscribe(v -> {}, e -> {}, () -> {},
                    Subscription::cancel);
    assertThat(cancelled.get()).isTrue();
}
Code example source: spring-cloud/spring-cloud-gateway
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
    // until the WebHandler is run
    return chain.filter(exchange).then(Mono.defer(() -> {
        ClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
        if (clientResponse == null) {
            return Mono.empty();
        }
        log.trace("WebClientWriteResponseFilter start");
        ServerHttpResponse response = exchange.getResponse();
        return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers())).log("webClient response");
    }));
}
Code example source: reactor/reactor-core
@Test
public void raceSubscribeAndCache() {
    AtomicInteger count = new AtomicInteger();
    Mono<Integer> source = Mono.fromCallable(count::getAndIncrement);
    for (int i = 0; i < 500; i++) {
        Mono<Integer> cached;
        if (i == 0) {
            cached = source.log().cache(Duration.ofSeconds(2));
        }
        else {
            cached = source.cache(Duration.ofSeconds(2));
        }
        RaceTestUtils.race(cached::subscribe, cached::subscribe);
    }
    assertThat(count.get()).isEqualTo(500);
}
Code example source: reactor/reactor-core
@Test
public void cancelUpstreamOnceWhenCancelled() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    AtomicLong upstreamCancelCount = new AtomicLong();
    Mono<String> source = Mono.just("foo").log().hide()
            .doOnCancel(() -> upstreamCancelCount.incrementAndGet());
    StepVerifier.withVirtualTime(
            () -> new MonoDelayElement<>(source, 2, TimeUnit.SECONDS, vts),
            () -> vts, Long.MAX_VALUE)
            .expectSubscription()
            .expectNoEvent(Duration.ofSeconds(1))
            .thenCancel()
            .verify();
    vts.advanceTimeBy(Duration.ofHours(1));
    assertThat(upstreamCancelCount.get()).isEqualTo(1);
}
Code example source: reactor/reactor-core
@Test(timeout = 5000L)
public void emptyIsImmediate() {
    Mono<String> source = Mono.<String>empty().log().hide();
    Duration d = StepVerifier.create(new MonoDelayElement<>(source, 10, TimeUnit.SECONDS,
            defaultSchedulerForDelay()).log())
            .expectSubscription()
            .verifyComplete();
    assertThat(d).isLessThan(Duration.ofSeconds(1));
}
Code example source: reactor/reactor-core
@Test
public void normalIsDelayed() {
    Mono<String> source = Mono.just("foo").log().hide();
    StepVerifier.withVirtualTime(() -> new MonoDelayElement<>(source, 2, TimeUnit.SECONDS,
            defaultSchedulerForDelay()).log())
            .expectSubscription()
            .expectNoEvent(Duration.ofSeconds(2))
            .expectNext("foo")
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void contextGetMono() throws InterruptedException {
    StepVerifier.create(Mono.just(1)
            .log()
            .handle((d, c) -> c.next(c.currentContext().get("test") + "" + d))
            .handle((d, c) -> c.next(c.currentContext().get("test2") + "" + d))
            .subscriberContext(ctx -> ctx.put("test2", "bar"))
            .subscriberContext(ctx -> ctx.put("test", "foo"))
            .log())
            .expectNext("barfoo1")
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void test() throws Exception {
    Flux.just("red", "white", "blue")
            .log("source")
            .flatMap(value -> Mono.fromCallable(() -> {
                Thread.sleep(1000);
                return value;
            }).subscribeOn(Schedulers.elastic()))
            .log("merged")
            .collect(Result::new, Result::add)
            .doOnNext(Result::stop)
            .log("accumulated")
            .toFuture()
            .get();
}
Code example source: reactor/reactor-core
@Test
public void contextGetHideMono() throws InterruptedException {
    StepVerifier.create(Mono.just(1)
            .hide()
            .log()
            .handle((d, c) -> c.next(c.currentContext().get("test") + "" + d))
            .handle((d, c) -> c.next(c.currentContext().get("test2") + "" + d))
            .subscriberContext(ctx -> ctx.put("test", "foo"))
            .subscriberContext(ctx -> ctx.put("test2", "bar"))
            .log())
            .expectNext("barfoo1")
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void verifyVirtualTimeNoEventNever() {
    StepVerifier.withVirtualTime(() -> Mono.never()
            .log())
            .expectSubscription()
            .expectNoEvent(Duration.ofDays(10000))
            .thenCancel()
            .verify();
}
Code example source: reactor/reactor-core
@Test
public void callableReturnsNull() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.<Integer>fromCallable(() -> null).log().flux()
            .subscribe(ts);
    ts.assertNoValues()
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void verifyVirtualTimeNoEventNeverError() {
    assertThatExceptionOfType(AssertionError.class)
            .isThrownBy(() -> StepVerifier.withVirtualTime(() -> Mono.never()
                    .log())
                    .expectNoEvent(Duration.ofDays(10000))
                    .thenCancel()
                    .verify())
            .withMessageStartingWith("expectation failed (expected no event: onSubscribe(");
}
Code example source: reactor/reactor-core
@Test
public void onAfterTerminateFusion() {
    LongAdder invoked = new LongAdder();
    Mono<Integer> mono = Flux
            .range(1, 10)
            .reduce((a, b) -> a + b)
            .doAfterTerminate(invoked::increment);
    StepVerifier.create(mono.log())
            .expectFusion()
            .expectNext(55)
            .expectComplete()
            .verify();
    assertEquals(1, invoked.intValue());
}