本文整理了Java中reactor.core.publisher.Mono.doOnCancel()
方法的一些代码示例,展示了Mono.doOnCancel()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.doOnCancel()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:doOnCancel
[英]Add behavior triggered when the Mono is cancelled.
[中]添加取消Mono时触发的行为。
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<ClientResponse> exchange(ClientRequest clientRequest) {
Assert.notNull(clientRequest, "ClientRequest must not be null");
HttpMethod httpMethod = clientRequest.method();
URI url = clientRequest.url();
String logPrefix = clientRequest.logPrefix();
return this.connector
.connect(httpMethod, url, httpRequest -> clientRequest.writeTo(httpRequest, this.strategies))
.doOnRequest(n -> logRequest(clientRequest))
.doOnCancel(() -> logger.debug(logPrefix + "Cancel signal (to close connection)"))
.map(httpResponse -> {
logResponse(httpResponse, logPrefix);
return new DefaultClientResponse(httpResponse, this.strategies, logPrefix);
});
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoBlockDoesntCancel() {
AtomicLong cancelCount = new AtomicLong();
Mono.just("data")
.doOnCancel(cancelCount::incrementAndGet)
.block();
assertThat(cancelCount.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoNotCancelledByMonoProcessor() {
AtomicLong cancelCounter = new AtomicLong();
MonoProcessor<String> monoProcessor = Mono.just("foo")
.doOnCancel(cancelCounter::incrementAndGet)
.toProcessor();
monoProcessor.subscribe();
assertThat(cancelCounter.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoBlockOptionalDoesntCancel() {
AtomicLong cancelCount = new AtomicLong();
Mono.just("data")
.doOnCancel(cancelCount::incrementAndGet)
.blockOptional();
assertThat(cancelCount.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testCancel() {
AtomicLong cancelCount = new AtomicLong();
Mono.delay(Duration.ofMillis(500))
.doOnCancel(cancelCount::incrementAndGet)
.subscribe(v -> {})
.dispose();
Assertions.assertThat(cancelCount.get()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void triggerSequenceHasSingleValueNotCancelled() {
AtomicBoolean triggerCancelled = new AtomicBoolean();
StepVerifier.create(Mono.just("foo")
.delayUntil(
a -> Mono.just(1)
.doOnCancel(() -> triggerCancelled.set(true))))
.expectNext("foo")
.verifyComplete();
assertThat(triggerCancelled.get()).isFalse();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void innerMonoNotCancelled() {
AtomicInteger cancelCount = new AtomicInteger();
StepVerifier.create(Mono.just(3)
.filterWhen(v -> Mono.just(true)
.doOnCancel(cancelCount::incrementAndGet)))
.expectNext(3)
.verifyComplete();
assertThat(cancelCount.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void hasElementCancel() throws InterruptedException {
AtomicBoolean cancelled = new AtomicBoolean();
Mono.just("foo").hide()
.doOnCancel(() -> cancelled.set(true))
.log()
.hasElement()
.subscribe(v -> {}, e -> {}, () -> {},
Subscription::cancel);
assertThat(cancelled.get()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void emptySources() {
AtomicBoolean cancelled = new AtomicBoolean();
Mono<String> empty1 = Mono.empty();
Mono<String> empty2 = Mono.empty();
Mono<String> empty3 = Mono.<String>empty().delaySubscription(Duration.ofMillis(500))
.doOnCancel(() -> cancelled.set(true));
Duration d = StepVerifier.create(Mono.zip(empty1, empty2, empty3))
.verifyComplete();
assertThat(cancelled).isTrue();
assertThat(d).isLessThan(Duration.ofMillis(500));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoSourceIsNotCancelled() {
AtomicLong cancelCount = new AtomicLong();
StepVerifier.create(Mono.just(1)
.doOnCancel(cancelCount::incrementAndGet)
.hasElement())
.expectNext(true)
.verifyComplete();
assertThat(cancelCount.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void triggerSequenceHasSingleValueNotCancelled() {
AtomicBoolean triggerCancelled = new AtomicBoolean();
StepVerifier.create(Flux.just("foo")
.delayUntil(
a -> Mono.just(1)
.doOnCancel(() -> triggerCancelled.set(true))))
.expectNext("foo")
.verifyComplete();
assertThat(triggerCancelled.get()).isFalse();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void partialCancelDoesntCancelSource() {
AtomicInteger cancelled = new AtomicInteger();
Mono<Object> cached = Mono.never()
.doOnCancel(cancelled::incrementAndGet)
.cache(Duration.ofMillis(200));
Disposable d1 = cached.subscribe();
Disposable d2 = cached.subscribe();
d1.dispose();
assertThat(cancelled.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void innerMonoNotCancelled() {
AtomicInteger cancelCount = new AtomicInteger();
StepVerifier.create(Flux.range(1, 3)
.filterWhen(v -> Mono.just(true)
.doOnCancel(cancelCount::incrementAndGet)))
.expectNext(1, 2, 3)
.verifyComplete();
assertThat(cancelCount.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Override
public Mono<T> mono() {
return Mono.from(delegate)
.doOnSubscribe(sub -> incrementAndGet(SUBSCRIBED))
.doOnCancel(() -> incrementAndGet(CANCELLED))
.doOnRequest(l -> incrementAndGet(REQUESTED));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void delayErrorEmptySources() {
AtomicBoolean cancelled = new AtomicBoolean();
Mono<String> empty1 = Mono.empty();
Mono<String> empty2 = Mono.empty();
Mono<String> empty3 = Mono.<String>empty().delaySubscription(Duration.ofMillis(500))
.doOnCancel(() -> cancelled.set(true));
StepVerifier.create(Mono.zipDelayError(empty1, empty2, empty3))
.expectSubscription()
.expectNoEvent(Duration.ofMillis(400))
.verifyComplete();
assertThat(cancelled).isFalse();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void totalCancelDoesntCancelSource() {
AtomicInteger cancelled = new AtomicInteger();
Mono<Object> cached = Mono.never()
.doOnCancel(cancelled::incrementAndGet)
.cache(Duration.ofMillis(200));
Disposable d1 = cached.subscribe();
Disposable d2 = cached.subscribe();
d1.dispose();
d2.dispose();
assertThat(cancelled.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoResourcePublisherIsNotCancelled() {
AtomicBoolean cancelled = new AtomicBoolean();
AtomicBoolean commitDone = new AtomicBoolean();
AtomicBoolean rollbackDone = new AtomicBoolean();
Mono<String> resourcePublisher = Mono.just("Resource")
.doOnCancel(() -> cancelled.set(true));
Flux<String> test = Flux.usingWhen(resourcePublisher,
Flux::just,
tr -> Mono.fromRunnable(() -> commitDone.set(true)),
tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
StepVerifier.create(test)
.expectNext("Resource")
.expectComplete()
.verifyThenAssertThat()
.hasNotDroppedErrors();
assertThat(commitDone).isTrue();
assertThat(rollbackDone).isFalse();
assertThat(cancelled).as("resource publisher was not cancelled").isFalse();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoResourcePublisherIsNotCancelled() {
AtomicBoolean cancelled = new AtomicBoolean();
AtomicBoolean commitDone = new AtomicBoolean();
AtomicBoolean rollbackDone = new AtomicBoolean();
Mono<String> resourcePublisher = Mono.just("Resource")
.doOnCancel(() -> cancelled.set(true));
Mono<String> test = Mono.usingWhen(resourcePublisher,
Mono::just,
tr -> Mono.fromRunnable(() -> commitDone.set(true)),
tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
StepVerifier.create(test)
.expectNext("Resource")
.expectComplete()
.verifyThenAssertThat()
.hasNotDroppedErrors();
assertThat(commitDone).isTrue();
assertThat(rollbackDone).isFalse();
assertThat(cancelled).as("resource publisher was not cancelled").isFalse();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cancelUpstreamOnceWhenCancelled() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
AtomicLong upstreamCancelCount = new AtomicLong();
Mono<String> source = Mono.just("foo").log().hide()
.doOnCancel(() -> upstreamCancelCount.incrementAndGet());
StepVerifier.withVirtualTime(
() -> new MonoDelayElement<>(source, 2, TimeUnit.SECONDS, vts),
() -> vts, Long.MAX_VALUE)
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1))
.thenCancel()
.verify();
vts.advanceTimeBy(Duration.ofHours(1));
assertThat(upstreamCancelCount.get()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalCancel() {
AtomicBoolean cancelCheck = new AtomicBoolean(false);
StepVerifier.create(Mono.just(1).hide()
.doOnCancel(() -> cancelCheck.set(true))
.doFinally(this))
.expectNoFusionSupport()
.expectNext(1)
.thenCancel()
.verify();
assertEquals("expected doFinally to be invoked exactly once", 1, calls);
assertEquals(SignalType.CANCEL, signalType);
assertTrue("expected tested mono to be cancelled", cancelCheck.get());
}
内容来源于网络,如有侵权,请联系作者删除!