本文整理了Java中reactor.core.publisher.Mono.subscribeWith()
方法的一些代码示例,展示了Mono.subscribeWith()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.subscribeWith()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:subscribeWith
[英]Subscribe the given Subscriber to this Mono and return said Subscriber (eg. a MonoProcessor).
[中]订阅此Mono并返回所述订户(如单处理器)。
代码示例来源:origin: reactor/reactor-core
/**
* Transform this {@link Mono} into a {@link CompletableFuture} completing on onNext or onComplete and failing on
* onError.
*
* <p>
* <img class="marble" src="doc-files/marbles/toFuture.svg" alt="">
*
* @return a {@link CompletableFuture}
*/
public final CompletableFuture<T> toFuture() {
return subscribeWith(new MonoToCompletableFuture<>());
}
代码示例来源:origin: spring-projects/spring-framework
/**
* Invoke the method for the given exchange.
* @param exchange the current exchange
* @param bindingContext the binding context to use
* @param providedArgs optional list of argument values to match by type
* @return a Mono with a {@link HandlerResult}.
* @throws ServerErrorException if method argument resolution or method invocation fails
*/
@Nullable
public HandlerResult invokeForHandlerResult(ServerWebExchange exchange,
BindingContext bindingContext, Object... providedArgs) {
MonoProcessor<HandlerResult> processor = MonoProcessor.create();
this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor);
if (processor.isTerminated()) {
Throwable ex = processor.getError();
if (ex != null) {
throw (ex instanceof ServerErrorException ? (ServerErrorException) ex :
new ServerErrorException("Failed to invoke: " + getShortLogMessage(), getMethod(), ex));
}
return processor.peek();
}
else {
// Should never happen...
throw new IllegalStateException(
"SyncInvocableHandlerMethod should have completed synchronously.");
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void whenMonoError() {
MonoProcessor<Tuple2<Integer, Integer>> mp = MonoProcessor.create();
StepVerifier.create(Mono.zip(Mono.<Integer>error(new Exception("test1")),
Mono.<Integer>error(new Exception("test2")))
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isTrue())
.then(() -> assertThat(mp.isSuccess()).isFalse())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.verifyErrorSatisfies(e -> assertThat(e).hasMessage("test1"));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void firstMonoJust() {
MonoProcessor<Integer> mp = MonoProcessor.create();
StepVerifier.create(Mono.first(Mono.just(1), Mono.just(2))
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isFalse())
.then(() -> assertThat(mp.isSuccess()).isTrue())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void whenDelayJustMono() {
MonoProcessor<Tuple2<Integer, Integer>> mp = MonoProcessor.create();
StepVerifier.create(Mono.zipDelayError(Mono.just(1), Mono.just(2))
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isFalse())
.then(() -> assertThat(mp.isSuccess()).isTrue())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.assertNext(v -> assertThat(v.getT1() == 1 && v.getT2() == 2).isTrue())
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void whenDelayJustMono3() {
MonoProcessor<Tuple3<Integer, Integer, Integer>> mp = MonoProcessor.create();
StepVerifier.create(Mono.zipDelayError(Mono.just(1), Mono.just(2), Mono.just(3))
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isFalse())
.then(() -> assertThat(mp.isSuccess()).isTrue())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.assertNext(v -> assertThat(v.getT1() == 1 && v.getT2() == 2 && v.getT3() == 3).isTrue())
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void whenMonoJust() {
MonoProcessor<Tuple2<Integer, Integer>> mp = MonoProcessor.create();
StepVerifier.create(Mono.zip(Mono.just(1), Mono.just(2))
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isFalse())
.then(() -> assertThat(mp.isSuccess()).isTrue())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.assertNext(v -> assertThat(v.getT1() == 1 && v.getT2() == 2).isTrue())
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
AtomicInteger n = new AtomicInteger();
Mono<Integer> m = Mono.fromSupplier(n::incrementAndGet);
m.subscribeWith(AssertSubscriber.create())
.assertValues(1)
.assertComplete();
m.subscribeWith(AssertSubscriber.create())
.assertValues(2)
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void filterMonoNot() {
MonoProcessor<Integer> mp = MonoProcessor.create();
StepVerifier.create(Mono.just(1).filter(s -> s % 2 == 0).subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isFalse())
.then(() -> assertThat(mp.isSuccess()).isTrue())
.then(() -> assertThat(mp.peek()).isNull())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.verifyComplete();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void pairWiseIterable() {
Mono<Integer> f = Mono.first(Arrays.asList(Mono.just(1), Mono.just(2)))
.or(Mono.just(3));
Assert.assertTrue(f instanceof MonoFirst);
MonoFirst<Integer> s = (MonoFirst<Integer>) f;
Assert.assertTrue(s.array != null);
Assert.assertTrue(s.array.length == 2);
f.subscribeWith(AssertSubscriber.create())
.assertValues(1)
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
Mono.just(1)
.handle((v, s) -> s.next(v * 2))
.subscribeWith(AssertSubscriber.create())
.assertContainValues(singleton(2))
.assertNoError()
.assertComplete();
}
@Test
代码示例来源:origin: reactor/reactor-core
@Test
public void otherwiseReturnErrorUnfilter2() {
MonoProcessor<Integer> mp = MonoProcessor.create();
StepVerifier.create(Mono.<Integer>error(new TestException())
.onErrorReturn(RuntimeException.class::isInstance, 1)
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isTrue())
.then(() -> assertThat(mp.isSuccess()).isFalse())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.verifyError(TestException.class);
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void otherwiseReturnErrorUnfilter() {
MonoProcessor<Integer> mp = MonoProcessor.create();
StepVerifier.create(Mono.<Integer>error(new TestException())
.onErrorReturn(RuntimeException.class, 1)
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isTrue())
.then(() -> assertThat(mp.isSuccess()).isFalse())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.verifyError(TestException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void whenMonoCallable() {
MonoProcessor<Tuple2<Integer, Integer>> mp = MonoProcessor.create();
StepVerifier.create(Mono.zip(Mono.fromCallable(() -> 1),
Mono.fromCallable(() -> 2))
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isFalse())
.then(() -> assertThat(mp.isSuccess()).isTrue())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.assertNext(v -> assertThat(v.getT1() == 1 && v.getT2() == 2).isTrue())
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void callableEvaluatedTheRightTime() {
AtomicInteger count = new AtomicInteger();
Mono<Integer> p = Mono.fromCallable(count::incrementAndGet)
.subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()));
Assert.assertEquals(0, count.get());
p.subscribeWith(AssertSubscriber.create())
.await();
Assert.assertEquals(1, count.get());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void callableEvaluatedTheRightTime() {
AtomicInteger count = new AtomicInteger();
Mono<Integer> p = Mono.fromCallable(count::incrementAndGet).subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()));
Assert.assertEquals(0, count.get());
p.subscribeWith(AssertSubscriber.create()).await();
Assert.assertEquals(1, count.get());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void otherwiseReturnErrorFilter() {
MonoProcessor<Integer> mp = MonoProcessor.create();
StepVerifier.create(Mono.<Integer>error(new TestException())
.onErrorReturn(TestException.class, 1)
.subscribeWith(mp))
.then(() -> assertThat(mp.isError()).isFalse())
.then(() -> assertThat(mp.isSuccess()).isTrue())
.then(() -> assertThat(mp.isTerminated()).isTrue())
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void filterNullMapResult() {
Mono.just(1)
.handle((v, s) -> { /*ignore*/ })
.subscribeWith(AssertSubscriber.create())
.assertValueCount(0)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void pairWise() {
Mono<Void> f = Mono.just(1)
.and(Mono.just("test2"));
Assert.assertTrue(f instanceof MonoWhen);
MonoWhen s = (MonoWhen) f;
Assert.assertTrue(s.sources != null);
Assert.assertTrue(s.sources.length == 2);
f.subscribeWith(AssertSubscriber.create())
.assertComplete()
.assertNoValues();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalHide() {
Mono.just(1)
.hide()
.handle((v, s) -> s.next(v * 2))
.subscribeWith(AssertSubscriber.create())
.assertContainValues(singleton(2))
.assertNoError()
.assertComplete();
}
内容来源于网络,如有侵权,请联系作者删除!