reactor.core.publisher.Mono.doOnCancel()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(8.5k)|赞(0)|评价(0)|浏览(497)

本文整理了Java中reactor.core.publisher.Mono.doOnCancel()方法的一些代码示例,展示了Mono.doOnCancel()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.doOnCancel()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:doOnCancel

Mono.doOnCancel介绍

[英]Add behavior triggered when the Mono is cancelled.
[中]添加取消Mono时触发的行为。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<ClientResponse> exchange(ClientRequest clientRequest) {
  Assert.notNull(clientRequest, "ClientRequest must not be null");
  HttpMethod httpMethod = clientRequest.method();
  URI url = clientRequest.url();
  String logPrefix = clientRequest.logPrefix();
  return this.connector
      .connect(httpMethod, url, httpRequest -> clientRequest.writeTo(httpRequest, this.strategies))
      .doOnRequest(n -> logRequest(clientRequest))
      .doOnCancel(() -> logger.debug(logPrefix + "Cancel signal (to close connection)"))
      .map(httpResponse -> {
        logResponse(httpResponse, logPrefix);
        return new DefaultClientResponse(httpResponse, this.strategies, logPrefix);
      });
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoBlockDoesntCancel() {
  AtomicLong cancelCount = new AtomicLong();
  Mono.just("data")
    .doOnCancel(cancelCount::incrementAndGet)
    .block();
  assertThat(cancelCount.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoNotCancelledByMonoProcessor() {
  AtomicLong cancelCounter = new AtomicLong();
  MonoProcessor<String> monoProcessor = Mono.just("foo")
                       .doOnCancel(cancelCounter::incrementAndGet)
                       .toProcessor();
  monoProcessor.subscribe();
  assertThat(cancelCounter.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoBlockOptionalDoesntCancel() {
  AtomicLong cancelCount = new AtomicLong();
  Mono.just("data")
    .doOnCancel(cancelCount::incrementAndGet)
    .blockOptional();
  assertThat(cancelCount.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testCancel() {
  AtomicLong cancelCount = new AtomicLong();
  Mono.delay(Duration.ofMillis(500))
    .doOnCancel(cancelCount::incrementAndGet)
    .subscribe(v -> {})
    .dispose();
  Assertions.assertThat(cancelCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerSequenceHasSingleValueNotCancelled() {
  AtomicBoolean triggerCancelled = new AtomicBoolean();
  StepVerifier.create(Mono.just("foo")
              .delayUntil(
                  a -> Mono.just(1)
                       .doOnCancel(() -> triggerCancelled.set(true))))
        .expectNext("foo")
        .verifyComplete();
  assertThat(triggerCancelled.get()).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void innerMonoNotCancelled() {
  AtomicInteger cancelCount = new AtomicInteger();
  StepVerifier.create(Mono.just(3)
              .filterWhen(v -> Mono.just(true)
                         .doOnCancel(cancelCount::incrementAndGet)))
        .expectNext(3)
        .verifyComplete();
  assertThat(cancelCount.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void hasElementCancel() throws InterruptedException {
  AtomicBoolean cancelled = new AtomicBoolean();
  Mono.just("foo").hide()
    .doOnCancel(() -> cancelled.set(true))
    .log()
    .hasElement()
    .subscribe(v -> {}, e -> {}, () -> {},
        Subscription::cancel);
  assertThat(cancelled.get()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptySources() {
  AtomicBoolean cancelled = new AtomicBoolean();
  Mono<String> empty1 = Mono.empty();
  Mono<String> empty2 = Mono.empty();
  Mono<String> empty3 = Mono.<String>empty().delaySubscription(Duration.ofMillis(500))
      .doOnCancel(() -> cancelled.set(true));
  Duration d = StepVerifier.create(Mono.zip(empty1, empty2, empty3))
        .verifyComplete();
  assertThat(cancelled).isTrue();
  assertThat(d).isLessThan(Duration.ofMillis(500));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoSourceIsNotCancelled() {
  AtomicLong cancelCount = new AtomicLong();
  StepVerifier.create(Mono.just(1)
              .doOnCancel(cancelCount::incrementAndGet)
              .hasElement())
        .expectNext(true)
        .verifyComplete();
  assertThat(cancelCount.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerSequenceHasSingleValueNotCancelled() {
  AtomicBoolean triggerCancelled = new AtomicBoolean();
  StepVerifier.create(Flux.just("foo")
              .delayUntil(
                  a -> Mono.just(1)
                       .doOnCancel(() -> triggerCancelled.set(true))))
        .expectNext("foo")
        .verifyComplete();
  assertThat(triggerCancelled.get()).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void partialCancelDoesntCancelSource() {
  AtomicInteger cancelled = new AtomicInteger();
  Mono<Object> cached = Mono.never()
               .doOnCancel(cancelled::incrementAndGet)
               .cache(Duration.ofMillis(200));
  Disposable d1 = cached.subscribe();
  Disposable d2 = cached.subscribe();
  d1.dispose();
  assertThat(cancelled.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void innerMonoNotCancelled() {
  AtomicInteger cancelCount = new AtomicInteger();
  StepVerifier.create(Flux.range(1, 3)
              .filterWhen(v -> Mono.just(true)
                         .doOnCancel(cancelCount::incrementAndGet)))
        .expectNext(1, 2, 3)
        .verifyComplete();
  assertThat(cancelCount.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Override
public Mono<T> mono() {
  return Mono.from(delegate)
        .doOnSubscribe(sub -> incrementAndGet(SUBSCRIBED))
        .doOnCancel(() -> incrementAndGet(CANCELLED))
        .doOnRequest(l -> incrementAndGet(REQUESTED));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void delayErrorEmptySources() {
  AtomicBoolean cancelled = new AtomicBoolean();
  Mono<String> empty1 = Mono.empty();
  Mono<String> empty2 = Mono.empty();
  Mono<String> empty3 = Mono.<String>empty().delaySubscription(Duration.ofMillis(500))
      .doOnCancel(() -> cancelled.set(true));
  StepVerifier.create(Mono.zipDelayError(empty1, empty2, empty3))
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(400))
        .verifyComplete();
  assertThat(cancelled).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void totalCancelDoesntCancelSource() {
  AtomicInteger cancelled = new AtomicInteger();
  Mono<Object> cached = Mono.never()
               .doOnCancel(cancelled::incrementAndGet)
               .cache(Duration.ofMillis(200));
  Disposable d1 = cached.subscribe();
  Disposable d2 = cached.subscribe();
  d1.dispose();
  d2.dispose();
  assertThat(cancelled.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoResourcePublisherIsNotCancelled() {
  AtomicBoolean cancelled = new AtomicBoolean();
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  Mono<String> resourcePublisher = Mono.just("Resource")
                     .doOnCancel(() -> cancelled.set(true));
  Flux<String> test = Flux.usingWhen(resourcePublisher,
      Flux::just,
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .expectNext("Resource")
        .expectComplete()
        .verifyThenAssertThat()
        .hasNotDroppedErrors();
  assertThat(commitDone).isTrue();
  assertThat(rollbackDone).isFalse();
  assertThat(cancelled).as("resource publisher was not cancelled").isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoResourcePublisherIsNotCancelled() {
  AtomicBoolean cancelled = new AtomicBoolean();
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  Mono<String> resourcePublisher = Mono.just("Resource")
                     .doOnCancel(() -> cancelled.set(true));
  Mono<String> test = Mono.usingWhen(resourcePublisher,
      Mono::just,
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .expectNext("Resource")
        .expectComplete()
        .verifyThenAssertThat()
        .hasNotDroppedErrors();
  assertThat(commitDone).isTrue();
  assertThat(rollbackDone).isFalse();
  assertThat(cancelled).as("resource publisher was not cancelled").isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancelUpstreamOnceWhenCancelled() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  AtomicLong upstreamCancelCount = new AtomicLong();
  Mono<String> source = Mono.just("foo").log().hide()
      .doOnCancel(() -> upstreamCancelCount.incrementAndGet());
  StepVerifier.withVirtualTime(
      () -> new MonoDelayElement<>(source, 2, TimeUnit.SECONDS, vts),
      () -> vts, Long.MAX_VALUE)
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(1))
        .thenCancel()
        .verify();
  vts.advanceTimeBy(Duration.ofHours(1));
  assertThat(upstreamCancelCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalCancel() {
  AtomicBoolean cancelCheck = new AtomicBoolean(false);
  StepVerifier.create(Mono.just(1).hide()
              .doOnCancel(() -> cancelCheck.set(true))
              .doFinally(this))
        .expectNoFusionSupport()
        .expectNext(1)
        .thenCancel()
        .verify();
  
  assertEquals("expected doFinally to be invoked exactly once", 1, calls);
  assertEquals(SignalType.CANCEL, signalType);
  assertTrue("expected tested mono to be cancelled", cancelCheck.get());
}

相关文章

Mono类方法