本文整理了Java中reactor.core.publisher.Mono.toProcessor()方法的一些代码示例，展示了Mono.toProcessor()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台，是从一些精选项目中提取出来的代码，具有较强的参考意义，能在一定程度上帮助到你。Mono.toProcessor()方法的具体详情如下：
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:toProcessor
[英]Wrap this Mono into a MonoProcessor (turning it hot and allowing to block, cancel, as well as many other operations). Note that the MonoProcessor is subscribed to its parent source if any.
[中]将此Mono包装为MonoProcessor（使其变为热源，并允许阻塞、取消以及许多其他操作）。请注意，MonoProcessor会订阅其父源（如果存在）。
代码示例来源:origin: spring-projects/spring-framework
public MonoToListenableFutureAdapter(Mono<T> mono) {
Assert.notNull(mono, "Mono must not be null");
this.processor = mono
.doOnSuccess(this.registry::success)
.doOnError(this.registry::failure)
.toProcessor();
}
代码示例来源:origin: org.springframework/spring-core
public MonoToListenableFutureAdapter(Mono<T> mono) {
Assert.notNull(mono, "Mono must not be null");
this.processor = mono
.doOnSuccess(this.registry::success)
.doOnError(this.registry::failure)
.toProcessor();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that the {@code doOnCancel} hook of the source has not run after the
 * processor returned by {@code toProcessor()} has been subscribed to.
 */
@Test
public void monoNotCancelledByMonoProcessor() {
    final AtomicLong cancellations = new AtomicLong();
    Mono<String> source = Mono.just("foo")
                              .doOnCancel(cancellations::incrementAndGet);
    MonoProcessor<String> hot = source.toProcessor();

    hot.subscribe();

    // The cancel hook must not have been triggered.
    assertThat(cancellations.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that {@code block()} without a timeout waits for an element delayed by
 * 500 ms: the value must be "foo" and at least 500 ms must have elapsed.
 */
@Test
public void monoProcessorBlockIsUnbounded() {
    // Take the timestamp before anything else so the whole pipeline is measured.
    final long startNanos = System.nanoTime();
    final Duration elementDelay = Duration.ofMillis(500);

    Mono<String> delayed = Mono.just("foo").delayElement(elementDelay);
    String value = delayed.toProcessor().block();

    assertThat(value).isEqualTo("foo");

    Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);
    assertThat(elapsed).isGreaterThanOrEqualTo(elementDelay);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that the processor created by {@code toProcessor()} already holds a
 * non-null {@code subscription} without any explicit subscribe call.
 */
@Test
public void monoToProcessorConnects() {
    TestPublisher<String> publisher = TestPublisher.create();
    Mono<String> source = publisher.mono();

    MonoProcessor<String> connectedProcessor = source.toProcessor();

    assertThat(connectedProcessor.subscription).isNotNull();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that a {@code doOnError} handler registered for
 * {@code RuntimeException} is not invoked when the source fails with a
 * {@code TestException}, while the processor still exposes that error.
 */
@Test
public void onMonoRejectedDoOnErrorClazzNot() {
    AtomicReference<Throwable> captured = new AtomicReference<>();
    Mono<String> failing = Mono.error(new TestException());

    // Handler is restricted to RuntimeException; presumably TestException is not one.
    Mono<String> guarded = failing.doOnError(RuntimeException.class, captured::set);
    MonoProcessor<String> processor = guarded.toProcessor();
    processor.subscribe();

    assertThat(processor.getError()).isInstanceOf(TestException.class);
    assertThat(captured.get()).isNull();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that {@code block(Duration.ZERO)} on a processor whose element is
 * delayed by 500 ms fails with an {@link IllegalStateException} carrying the
 * timeout message, and does so in less than 500 ms.
 */
@Test
public void monoProcessorBlockZeroIsImmediateTimeout() {
    final long startNanos = System.nanoTime();
    final Duration elementDelay = Duration.ofMillis(500);
    Mono<String> delayed = Mono.just("foo").delayElement(elementDelay);

    // The conversion and the blocking read both happen inside the asserted call.
    assertThatExceptionOfType(IllegalStateException.class)
            .isThrownBy(() -> delayed.toProcessor().block(Duration.ZERO))
            .withMessage("Timeout on Mono blocking read");

    Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);
    assertThat(elapsed).isLessThan(elementDelay);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that a processor derived via {@code flatMap} from an already
 * fulfilled {@code MonoProcessor} is terminated, successful and holds the
 * doubled value after being subscribed.
 */
@Test
public void MonoProcessorThenFulfill() {
    MonoProcessor<Integer> upstream = MonoProcessor.create();
    upstream.onNext(1);

    Mono<Integer> doubled = upstream.flatMap(value -> Mono.just(value * 2));
    MonoProcessor<Integer> downstream = doubled.toProcessor();
    downstream.subscribe();

    assertThat(downstream.isTerminated()).isTrue();
    assertThat(downstream.isSuccess()).isTrue();
    assertThat(downstream.peek()).isEqualTo(2);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that {@code block} with a negative timeout (-1 s) on a processor whose
 * element is delayed by 500 ms fails with an {@link IllegalStateException}
 * carrying the timeout message, and does so in less than 500 ms.
 */
@Test
public void monoProcessorBlockNegativeIsImmediateTimeout() {
    final long startNanos = System.nanoTime();
    final Duration elementDelay = Duration.ofMillis(500);
    final Duration negativeTimeout = Duration.ofSeconds(-1);
    Mono<String> delayed = Mono.just("foo").delayElement(elementDelay);

    // The conversion and the blocking read both happen inside the asserted call.
    assertThatExceptionOfType(IllegalStateException.class)
            .isThrownBy(() -> delayed.toProcessor().block(negativeTimeout))
            .withMessage("Timeout on Mono blocking read");

    Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);
    assertThat(elapsed).isLessThan(elementDelay);
}
代码示例来源:origin: reactor/reactor-core
/**
 * A Flux can be forced to emit only values whose key differs from the key of
 * the immediately preceding value. The key used here is "is the value even".
 */
@Test
public void fluxCanBeEnforcedToDispatchValuesWithKeysDistinctFromPredecessors() {
    // given: a sequence in which consecutive values sometimes share the same parity
    Flux<Integer> numbers = Flux.fromIterable(Arrays.asList(2, 4, 3, 5, 2, 5));

    // when: values are filtered by parity key and the result is collected
    Flux<Integer> parityChanges = numbers.distinctUntilChanged(n -> n % 2 == 0);
    MonoProcessor<List<Integer>> tap = parityChanges.collectList()
                                                    .toProcessor();

    // then: only the first value of each run of equal parity remains
    List<Integer> collected = tap.block();
    assertThat(collected).containsExactly(2, 3, 2, 5);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that a processor derived via {@code map} from an already fulfilled
 * {@code MonoProcessor} is terminated, successful and holds the doubled value
 * after being subscribed.
 */
@Test
public void MonoProcessorMapFulfill() {
    MonoProcessor<Integer> upstream = MonoProcessor.create();
    upstream.onNext(1);

    Mono<Integer> doubled = upstream.map(value -> value * 2);
    MonoProcessor<Integer> downstream = doubled.toProcessor();
    downstream.subscribe();

    assertThat(downstream.isTerminated()).isTrue();
    assertThat(downstream.isSuccess()).isTrue();
    assertThat(downstream.peek()).isEqualTo(2);
}
代码示例来源:origin: reactor/reactor-core
/**
 * A Flux can be forced to emit only values whose key has not been seen before.
 * The key used here is the value modulo 3.
 */
@Test
public void fluxCanBeEnforcedToDispatchValuesHavingDistinctKeys() {
    // given: values 1 to 4 where several values share the same key
    Flux<Integer> numbers = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));

    // when: values are filtered by key and the result is collected
    Flux<Integer> uniqueByKey = numbers.distinct(n -> n % 3);
    MonoProcessor<List<Integer>> tap = uniqueByKey.collectList()
                                                  .toProcessor();
    tap.subscribe();

    // then: the collected list contains no value with a repeated key
    List<Integer> collected = tap.block();
    assertThat(collected).containsExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that calling {@code toProcessor()} or {@code subscribe()} on a
 * {@code MonoProcessor} hands back that very same instance.
 */
@Test
public void monoToProcessorReusesInstance() {
    MonoProcessor<String> processor = Mono.just("foo").toProcessor();

    // Same evaluation order as a single chained assertion: toProcessor() first, then subscribe().
    assertThat(processor).isSameAs(processor.toProcessor());
    assertThat(processor).isSameAs(processor.subscribe());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that cancelling a processor built on top of a {@code flatMap} over
 * the test publisher's Mono results in the test publisher being cancelled.
 */
@Test
public void cancel() {
    TestPublisher<String> source = TestPublisher.create();
    Mono<Integer> lengths = source.mono()
                                  .flatMap(text -> Mono.just(text.length()));

    MonoProcessor<Integer> processor = lengths.toProcessor();
    processor.subscribe();
    processor.cancel();

    source.assertCancelled();
}
代码示例来源:origin: reactor/reactor-core
/**
 * A Flux can be forced to emit only values that differ from their immediate
 * predecessor.
 */
@Test
public void fluxCanBeEnforcedToDispatchValuesDistinctFromPredecessors() {
    // given: values 1 to 3 with consecutive duplicates
    Flux<Integer> numbers = Flux.fromIterable(Arrays.asList(1, 1, 2, 2, 3));

    // when: consecutive duplicates are filtered out and the result is collected
    Flux<Integer> withoutRepeats = numbers.distinctUntilChanged();
    MonoProcessor<List<Integer>> tap = withoutRepeats.collectList()
                                                     .toProcessor();
    tap.subscribe();

    // then: each run of equal values is collapsed to a single value
    List<Integer> collected = tap.block();
    assertThat(collected).containsExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that operators can be chained after {@code toProcessor()}: with a
 * 500 ms {@code delayElement} applied to the processor, no event is expected
 * for 500 ms of virtual time, then "foo" followed by completion.
 */
@Test
public void monoToProcessorChain() {
    final Duration delay = Duration.ofMillis(500);

    // The pipeline is assembled inside the supplier handed to withVirtualTime.
    StepVerifier.withVirtualTime(() -> Mono.just("foo").toProcessor().delayElement(delay))
                .expectSubscription()
                .expectNoEvent(delay)
                .expectNext("foo")
                .verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that cancelling a processor built on {@code any(...)} over the test
 * publisher's Flux results in the test publisher being cancelled.
 */
@Test
public void cancel() {
    TestPublisher<String> source = TestPublisher.create();
    Mono<Boolean> anyLongText = source.flux()
                                      .any(text -> text.length() > 100);

    MonoProcessor<Boolean> processor = anyLongText.toProcessor();
    processor.subscribe();
    processor.cancel();

    source.assertCancelled();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that cancelling a processor built on {@code next()} over the test
 * publisher's Flux results in the test publisher being cancelled.
 */
@Test
public void cancel() {
    TestPublisher<String> source = TestPublisher.create();
    Mono<String> firstElement = source.flux().next();

    MonoProcessor<String> processor = firstElement.toProcessor();
    processor.subscribe();
    processor.cancel();

    source.assertCancelled();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that cancelling a processor built on {@code then()} over the test
 * publisher's Flux results in the test publisher being cancelled.
 */
@Test
public void cancel() {
    TestPublisher<String> source = TestPublisher.create();
    Mono<Void> completionOnly = source.flux().then();

    MonoProcessor<Void> processor = completionOnly.toProcessor();
    processor.subscribe();
    processor.cancel();

    source.assertCancelled();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that cancelling a processor built on {@code elementAt(1000)} over the
 * test publisher's Flux results in the test publisher being cancelled.
 */
@Test
public void cancel() {
    TestPublisher<String> source = TestPublisher.create();
    Mono<String> distantElement = source.flux().elementAt(1000);

    MonoProcessor<String> processor = distantElement.toProcessor();
    processor.subscribe();
    processor.cancel();

    source.assertCancelled();
}
内容来源于网络,如有侵权,请联系作者删除!