Usage and code examples of the reactor.core.publisher.Mono.subscribeWith() method

x33g5p2x, reposted on 2022-01-24 under Other

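This article collects code examples for the Java method reactor.core.publisher.Mono.subscribeWith() and shows how Mono.subscribeWith() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of the Mono.subscribeWith() method:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: subscribeWith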

Introduction to Mono.subscribeWith

Subscribe the given Subscriber to this Mono and return said Subscriber (e.g. a MonoProcessor).
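The description above boils down to "subscribe, then hand the same subscriber back", which is what lets callers chain assertions or read state from the returned object. The following is a minimal sketch of that pattern using only the JDK's java.util.concurrent.Flow interfaces, so it runs without Reactor on the classpath. The names SubscribeWithSketch, just, subscribeWith, and Recording are hypothetical stand-ins chosen for illustration; they are not Reactor classes and this is not Reactor's source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SubscribeWithSketch {

    /** Hypothetical single-value publisher standing in for Mono.just(value). */
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                // Emit the single value, then signal completion.
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Hypothetical helper: subscribes the given subscriber and returns that same instance. */
    static <T, E extends Flow.Subscriber<? super T>> E subscribeWith(Flow.Publisher<T> source, E subscriber) {
        source.subscribe(subscriber);
        return subscriber;
    }

    /** Subscriber that records the signals it receives so the caller can inspect them. */
    static final class Recording<T> implements Flow.Subscriber<T> {
        final List<T> values = new ArrayList<>();
        boolean completed;
        Throwable error;

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { values.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        // The returned object is the Recording instance passed in, with its concrete type preserved.
        Recording<Integer> rec = subscribeWith(just(1), new Recording<Integer>());
        System.out.println(rec.values + " completed=" + rec.completed); // prints "[1] completed=true"
    }
}
```

Because the generic return type is the subscriber's own type, the caller keeps access to subscriber-specific members (here values and completed) without a cast. The Reactor examples in this article rely on the same property when they chain AssertSubscriber assertions directly after subscribeWith.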

Code examples

Example source: reactor/reactor-core

/**
 * Transform this {@link Mono} into a {@link CompletableFuture} completing on onNext or onComplete and failing on
 * onError.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/toFuture.svg" alt="">
 *
 * @return a {@link CompletableFuture}
 */
public final CompletableFuture<T> toFuture() {
  return subscribeWith(new MonoToCompletableFuture<>());
}
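The Javadoc above describes a subscriber that is itself a future: it completes on onNext or onComplete and fails on onError. As a rough illustration of that idea, here is a sketch built on the JDK's java.util.concurrent.Flow interfaces rather than Reactor. ToFuture, just, error, and subscribeWith are hypothetical names introduced for this sketch, and the class is not a copy of Reactor's MonoToCompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

public class FutureSubscriberSketch {

    /** Hypothetical subscriber that is also the future handed back to the caller. */
    static final class ToFuture<T> extends CompletableFuture<T> implements Flow.Subscriber<T> {
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { complete(item); }
        @Override public void onError(Throwable t) { completeExceptionally(t); }
        // Completes with null for an empty source; has no effect if onNext already completed the future.
        @Override public void onComplete() { complete(null); }
    }

    /** Hypothetical helper: subscribes the given subscriber and returns that same instance. */
    static <T, E extends Flow.Subscriber<? super T>> E subscribeWith(Flow.Publisher<T> source, E subscriber) {
        source.subscribe(subscriber);
        return subscriber;
    }

    /** Hypothetical single-value publisher. */
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Hypothetical publisher that fails immediately after handing out a no-op subscription. */
    static <T> Flow.Publisher<T> error(Throwable failure) {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onError(failure);
        };
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = subscribeWith(just(42), new ToFuture<Integer>());
        System.out.println(ok.join()); // prints "42"

        Flow.Publisher<Integer> failing = error(new IllegalStateException("boom"));
        CompletableFuture<Integer> failed = subscribeWith(failing, new ToFuture<Integer>());
        System.out.println(failed.isCompletedExceptionally()); // prints "true"
    }
}
```

Returning the subscriber is what makes the one-line toFuture() body possible: the object passed to subscribeWith is already the CompletableFuture the caller wants.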

Example source: spring-projects/spring-framework

/**
 * Invoke the method for the given exchange.
 * @param exchange the current exchange
 * @param bindingContext the binding context to use
 * @param providedArgs optional list of argument values to match by type
 * @return a Mono with a {@link HandlerResult}.
 * @throws ServerErrorException if method argument resolution or method invocation fails
 */
@Nullable
public HandlerResult invokeForHandlerResult(ServerWebExchange exchange,
    BindingContext bindingContext, Object... providedArgs) {
  MonoProcessor<HandlerResult> processor = MonoProcessor.create();
  this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor);
  if (processor.isTerminated()) {
    Throwable ex = processor.getError();
    if (ex != null) {
      throw (ex instanceof ServerErrorException ? (ServerErrorException) ex :
          new ServerErrorException("Failed to invoke: " + getShortLogMessage(), getMethod(), ex));
    }
    return processor.peek();
  }
  else {
    // Should never happen...
    throw new IllegalStateException(
        "SyncInvocableHandlerMethod should have completed synchronously.");
  }
}

Example source: reactor/reactor-core

@Test
public void whenMonoError() {
  MonoProcessor<Tuple2<Integer, Integer>> mp = MonoProcessor.create();
  StepVerifier.create(Mono.zip(Mono.<Integer>error(new Exception("test1")),
      Mono.<Integer>error(new Exception("test2")))
              .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isTrue())
        .then(() -> assertThat(mp.isSuccess()).isFalse())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .verifyErrorSatisfies(e -> assertThat(e).hasMessage("test1"));
}

Example source: reactor/reactor-core

@Test
public void firstMonoJust() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.first(Mono.just(1), Mono.just(2))
              .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .expectNext(1)
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void whenDelayJustMono() {
  MonoProcessor<Tuple2<Integer, Integer>> mp = MonoProcessor.create();
  StepVerifier.create(Mono.zipDelayError(Mono.just(1), Mono.just(2))
              .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .assertNext(v -> assertThat(v.getT1() == 1 && v.getT2() == 2).isTrue())
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void whenDelayJustMono3() {
  MonoProcessor<Tuple3<Integer, Integer, Integer>> mp = MonoProcessor.create();
  StepVerifier.create(Mono.zipDelayError(Mono.just(1), Mono.just(2), Mono.just(3))
              .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .assertNext(v -> assertThat(v.getT1() == 1 && v.getT2() == 2 && v.getT3() == 3).isTrue())
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void whenMonoJust() {
  MonoProcessor<Tuple2<Integer, Integer>> mp = MonoProcessor.create();
  StepVerifier.create(Mono.zip(Mono.just(1), Mono.just(2))
              .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .assertNext(v -> assertThat(v.getT1() == 1 && v.getT2() == 2).isTrue())
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void normal() {
  AtomicInteger n = new AtomicInteger();
  Mono<Integer> m = Mono.fromSupplier(n::incrementAndGet);
  m.subscribeWith(AssertSubscriber.create())
      .assertValues(1)
      .assertComplete();
  m.subscribeWith(AssertSubscriber.create())
      .assertValues(2)
      .assertComplete();
}

Example source: reactor/reactor-core

@Test
public void filterMonoNot() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.just(1).filter(s -> s % 2 == 0).subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.peek()).isNull())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void pairWiseIterable() {
  Mono<Integer> f = Mono.first(Arrays.asList(Mono.just(1), Mono.just(2)))
             .or(Mono.just(3));
  Assert.assertTrue(f instanceof MonoFirst);
  MonoFirst<Integer> s = (MonoFirst<Integer>) f;
  Assert.assertTrue(s.array != null);
  Assert.assertTrue(s.array.length == 2);
  f.subscribeWith(AssertSubscriber.create())
   .assertValues(1)
   .assertComplete();
}

Example source: reactor/reactor-core

@Test
public void normal() {
  Mono.just(1)
    .handle((v, s) -> s.next(v * 2))
    .subscribeWith(AssertSubscriber.create())
    .assertContainValues(singleton(2))
    .assertNoError()
    .assertComplete();
}

Example source: reactor/reactor-core

@Test
public void otherwiseReturnErrorUnfilter2() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.<Integer>error(new TestException())
      .onErrorReturn(RuntimeException.class::isInstance, 1)
      .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isTrue())
        .then(() -> assertThat(mp.isSuccess()).isFalse())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .verifyError(TestException.class);
}

Example source: reactor/reactor-core

@Test
public void otherwiseReturnErrorUnfilter() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.<Integer>error(new TestException())
      .onErrorReturn(RuntimeException.class, 1)
      .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isTrue())
        .then(() -> assertThat(mp.isSuccess()).isFalse())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .verifyError(TestException.class);
}

Example source: reactor/reactor-core

@Test
public void whenMonoCallable() {
  MonoProcessor<Tuple2<Integer, Integer>> mp = MonoProcessor.create();
  StepVerifier.create(Mono.zip(Mono.fromCallable(() -> 1),
      Mono.fromCallable(() -> 2))
              .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .assertNext(v -> assertThat(v.getT1() == 1 && v.getT2() == 2).isTrue())
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void callableEvaluatedTheRightTime() {
  AtomicInteger count = new AtomicInteger();
  Mono<Integer> p = Mono.fromCallable(count::incrementAndGet)
             .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()));
  Assert.assertEquals(0, count.get());
  p.subscribeWith(AssertSubscriber.create())
   .await();
  Assert.assertEquals(1, count.get());
}

Example source: reactor/reactor-core

@Test
public void otherwiseReturnErrorFilter() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.<Integer>error(new TestException())
      .onErrorReturn(TestException.class, 1)
      .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .expectNext(1)
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void filterNullMapResult() {
  Mono.just(1)
    .handle((v, s) -> { /*ignore*/ })
    .subscribeWith(AssertSubscriber.create())
    .assertValueCount(0)
    .assertNoError()
    .assertComplete();
}

Example source: reactor/reactor-core

@Test
public void pairWise() {
  Mono<Void> f = Mono.just(1)
            .and(Mono.just("test2"));
  Assert.assertTrue(f instanceof MonoWhen);
  MonoWhen s = (MonoWhen) f;
  Assert.assertTrue(s.sources != null);
  Assert.assertTrue(s.sources.length == 2);
  f.subscribeWith(AssertSubscriber.create())
   .assertComplete()
   .assertNoValues();
}

Example source: reactor/reactor-core

@Test
public void normalHide() {
  Mono.just(1)
    .hide()
    .handle((v, s) -> s.next(v * 2))
    .subscribeWith(AssertSubscriber.create())
    .assertContainValues(singleton(2))
    .assertNoError()
    .assertComplete();
}
