reactor.core.publisher.Mono.toProcessor()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(7.6k)|赞(0)|评价(0)|浏览(610)

本文整理了Java中reactor.core.publisher.Mono.toProcessor()方法的一些代码示例,展示了Mono.toProcessor()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.toProcessor()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:toProcessor

Mono.toProcessor介绍

[英]Wrap this Mono into a MonoProcessor (turning it hot and allowing to block, cancel, as well as many other operations). Note that the MonoProcessoris subscribed to its parent source if any.
[中]

代码示例

代码示例来源:origin: spring-projects/spring-framework

public MonoToListenableFutureAdapter(Mono<T> mono) {
  Assert.notNull(mono, "Mono must not be null");
  this.processor = mono
      .doOnSuccess(this.registry::success)
      .doOnError(this.registry::failure)
      .toProcessor();
}

代码示例来源:origin: org.springframework/spring-core

public MonoToListenableFutureAdapter(Mono<T> mono) {
  Assert.notNull(mono, "Mono must not be null");
  this.processor = mono
      .doOnSuccess(this.registry::success)
      .doOnError(this.registry::failure)
      .toProcessor();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoNotCancelledByMonoProcessor() {
  AtomicLong cancelCounter = new AtomicLong();
  MonoProcessor<String> monoProcessor = Mono.just("foo")
                       .doOnCancel(cancelCounter::incrementAndGet)
                       .toProcessor();
  monoProcessor.subscribe();
  assertThat(cancelCounter.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoProcessorBlockIsUnbounded() {
  long start = System.nanoTime();
  String result = Mono.just("foo")
            .delayElement(Duration.ofMillis(500))
            .toProcessor()
            .block();
  assertThat(result).isEqualTo("foo");
  assertThat(Duration.ofNanos(System.nanoTime() - start))
      .isGreaterThanOrEqualTo(Duration.ofMillis(500));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoToProcessorConnects() {
  TestPublisher<String> tp = TestPublisher.create();
  MonoProcessor<String> connectedProcessor = tp.mono().toProcessor();
  assertThat(connectedProcessor.subscription).isNotNull();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onMonoRejectedDoOnErrorClazzNot() {
  Mono<String> mp = Mono.error(new TestException());
  AtomicReference<Throwable> ref = new AtomicReference<>();
  MonoProcessor<String> processor = mp.doOnError(RuntimeException.class, ref::set)
                    .toProcessor();
  processor.subscribe();
  assertThat(processor.getError()).isInstanceOf(TestException.class);
  assertThat(ref.get()).isNull();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoProcessorBlockZeroIsImmediateTimeout() {
  long start = System.nanoTime();
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> Mono.just("foo")
                 .delayElement(Duration.ofMillis(500))
                 .toProcessor()
                 .block(Duration.ZERO))
      .withMessage("Timeout on Mono blocking read");
  assertThat(Duration.ofNanos(System.nanoTime() - start))
      .isLessThan(Duration.ofMillis(500));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void MonoProcessorThenFulfill() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  mp.onNext(1);
  MonoProcessor<Integer> mp2 = mp.flatMap(s -> Mono.just(s * 2))
                  .toProcessor();
  mp2.subscribe();
  assertThat(mp2.isTerminated()).isTrue();
  assertThat(mp2.isSuccess()).isTrue();
  assertThat(mp2.peek()).isEqualTo(2);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoProcessorBlockNegativeIsImmediateTimeout() {
  long start = System.nanoTime();
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> Mono.just("foo")
                 .delayElement(Duration.ofMillis(500))
                 .toProcessor()
                 .block(Duration.ofSeconds(-1)))
      .withMessage("Timeout on Mono blocking read");
  assertThat(Duration.ofNanos(System.nanoTime() - start))
      .isLessThan(Duration.ofMillis(500));
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxCanBeEnforcedToDispatchValuesWithKeysDistinctFromPredecessors() {
//        "A Flux can be enforced to dispatch values with keys distinct from their immediate predecessors keys"
//        given:"a composable with values 1 to 5 with duplicate keys"
    Flux<Integer> s = Flux.fromIterable(Arrays.asList(2, 4, 3, 5, 2, 5));

//        when:"the values are filtered and result is collected"
    MonoProcessor<List<Integer>> tap = s.distinctUntilChanged(it -> it % 2 == 0)
                      .collectList()
                      .toProcessor();

//        then:"collected must remove duplicates"
    assertThat(tap.block()).containsExactly(2, 3, 2, 5);
  }

代码示例来源:origin: reactor/reactor-core

@Test
public void MonoProcessorMapFulfill() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  mp.onNext(1);
  MonoProcessor<Integer> mp2 = mp.map(s -> s * 2)
                  .toProcessor();
  mp2.subscribe();
  assertThat(mp2.isTerminated()).isTrue();
  assertThat(mp2.isSuccess()).isTrue();
  assertThat(mp2.peek()).isEqualTo(2);
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxCanBeEnforcedToDispatchValuesHavingDistinctKeys() {
//        "A Flux can be enforced to dispatch values having distinct keys"
//        given: "a composable with values 1 to 4 with duplicate keys"
    Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));

//        when: "the values are filtered and result is collected"
    MonoProcessor<List<Integer>> tap = s.distinct(it -> it % 3)
                      .collectList()
                      .toProcessor();
    tap.subscribe();

//        then: "collected should be without duplicates"
    assertThat(tap.block()).containsExactly(1, 2, 3);
  }

代码示例来源:origin: reactor/reactor-core

@Test
public void monoToProcessorReusesInstance() {
  MonoProcessor<String> monoProcessor = Mono.just("foo")
                       .toProcessor();
  assertThat(monoProcessor)
      .isSameAs(monoProcessor.toProcessor())
      .isSameAs(monoProcessor.subscribe());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancel() {
  TestPublisher<String> cancelTester = TestPublisher.create();
  MonoProcessor<Integer> processor = cancelTester.mono()
                          .flatMap(s -> Mono.just(s.length()))
                          .toProcessor();
  processor.subscribe();
  processor.cancel();
  cancelTester.assertCancelled();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxCanBeEnforcedToDispatchValuesDistinctFromPredecessors() {
//        "A Flux can be enforced to dispatch values distinct from their immediate predecessors"
//        given:"a composable with values 1 to 3 with duplicates"
    Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 1, 2, 2, 3));

//        when:"the values are filtered and result is collected"
    MonoProcessor<List<Integer>> tap = s.distinctUntilChanged()
                      .collectList()
                      .toProcessor();
    tap.subscribe();

//        then:"collected must remove duplicates"
    assertThat(tap.block()).containsExactly(1, 2, 3);
  }

代码示例来源:origin: reactor/reactor-core

@Test
public void monoToProcessorChain() {
  StepVerifier.withVirtualTime(() -> Mono.just("foo")
                      .toProcessor()
                      .delayElement(Duration.ofMillis(500)))
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(500))
        .expectNext("foo")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancel() {
  TestPublisher<String> cancelTester = TestPublisher.create();
  MonoProcessor<Boolean> processor = cancelTester.flux()
                          .any(s -> s.length() > 100)
                          .toProcessor();
  processor.subscribe();
  processor.cancel();
  cancelTester.assertCancelled();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancel() {
  TestPublisher<String> cancelTester = TestPublisher.create();
  MonoProcessor<String> processor = cancelTester.flux()
                         .next()
                         .toProcessor();
  processor.subscribe();
  processor.cancel();
  cancelTester.assertCancelled();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancel() {
  TestPublisher<String> cancelTester = TestPublisher.create();
  MonoProcessor<Void> processor = cancelTester.flux()
                        .then()
                        .toProcessor();
  processor.subscribe();
  processor.cancel();
  cancelTester.assertCancelled();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancel() {
  TestPublisher<String> cancelTester = TestPublisher.create();
  MonoProcessor<String> processor = cancelTester.flux()
                         .elementAt(1000)
                         .toProcessor();
  processor.subscribe();
  processor.cancel();
  cancelTester.assertCancelled();
}

相关文章

Mono类方法