本文整理了Java中reactor.core.publisher.Mono.doOnEach()方法的一些代码示例,展示了Mono.doOnEach()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。
Mono.doOnEach()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:doOnEach
[英]Add behavior triggered when the Mono emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring of a Mono. These Signals have a Context associated with them.
[中]添加在Mono发出一个元素、因错误而失败或成功完成时触发的行为。所有这些事件都表示为一个Signal,并传递给副作用回调。请注意,这是一个高级操作符,通常用于监控Mono。这些Signal带有与之关联的Context。
代码示例来源:origin: reactor/reactor-core
/**
 * Expects {@code doOnEach} to throw a {@link NullPointerException} when it is
 * handed a {@code null} consumer.
 */
@Test(expected = NullPointerException.class)
public void testDoOnEachSignalNullConsumer() {
    Mono<Integer> source = Mono.just(1);
    // The call itself is expected to fail; no subscription is made.
    source.doOnEach(null);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that an exception thrown by a {@code doOnEach} consumer placed between
 * {@code map} and {@code flatMapIterable} reaches the subscriber as an error.
 */
@Test
public void shallExecuteSideEffectsCallback() {
    Flux<Integer> result = Mono.just(Arrays.asList(1, 2))
            // Note from the original author: with the following line enabled, everything is fine.
            //.doOnEach(sig -> System.out.println("SIGNAL beforeMap " + sig))// <- if enabled then everything is fine
            .map(value -> value)
            .doOnEach(signal -> {
                throw new RuntimeException("expected");
            })
            .flatMapIterable(Function.identity());

    StepVerifier.create(result)
                .expectError()
                .verify();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Records every signal seen by {@code doOnEach} on an empty {@code Mono} and
 * asserts that exactly one signal, an onComplete, was captured.
 */
@Test
public void testDoOnEachEmpty() {
    List<Signal<Integer>> recorded = new ArrayList<>(4);
    Mono<Integer> source = Mono.<Integer>empty().doOnEach(recorded::add);

    StepVerifier.create(source)
                .expectSubscription()
                .expectComplete()
                .verify();

    assertThat(recorded.size(), is(1));
    Signal<Integer> only = recorded.get(0);
    assertTrue("onComplete expected", only.isOnComplete());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Records every signal seen by {@code doOnEach} on a failing {@code Mono} and
 * asserts that the single captured signal is an onError carrying the original
 * exception message.
 */
@Test
public void testDoOnEachSignalWithError() {
    List<Signal<Integer>> recorded = new ArrayList<>(4);
    Mono<Integer> source = Mono.<Integer>error(new IllegalArgumentException("foo"))
                               .doOnEach(recorded::add);

    StepVerifier.create(source)
                .expectSubscription()
                .expectErrorMessage("foo")
                .verify();

    assertThat(recorded.size(), is(1));
    Signal<Integer> only = recorded.get(0);
    assertTrue("onError expected", only.isOnError());
    assertThat("plain exception expected", only.getThrowable().getMessage(),
            is("foo"));
}
代码示例来源:origin: reactor/reactor-core
/**
 * Records every signal seen by {@code doOnEach} on {@code Mono.just(1)} and
 * asserts that two signals were captured: an onNext holding 1, then an onComplete.
 */
@Test
public void testDoOnEachSignal() {
    List<Signal<Integer>> recorded = new ArrayList<>(4);
    Mono<Integer> source = Mono.just(1).doOnEach(recorded::add);

    StepVerifier.create(source)
                .expectSubscription()
                .expectNext(1)
                .expectComplete()
                .verify();

    assertThat(recorded.size(), is(2));
    Signal<Integer> first = recorded.get(0);
    Signal<Integer> second = recorded.get(1);
    assertThat("onNext", first.get(), is(1));
    assertTrue("onComplete expected", second.isOnComplete());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes to a failing {@code Mono} through {@code doOnEach} and asserts that
 * the consumer saw no onNext, saw the "forced failure" exception as onError, and
 * saw no onComplete.
 */
@Test
public void error() {
    AtomicInteger nextCount = new AtomicInteger();
    AtomicReference<Throwable> seenError = new AtomicReference<>();
    AtomicBoolean completed = new AtomicBoolean();
    AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

    Mono.<Integer>error(new RuntimeException("forced failure"))
        .doOnEach(signal -> {
            // Route each signal kind to its own probe.
            if (signal.isOnNext()) {
                nextCount.incrementAndGet();
            }
            else if (signal.isOnError()) {
                seenError.set(signal.getThrowable());
            }
            else if (signal.isOnComplete()) {
                completed.set(true);
            }
        })
        .subscribe(subscriber);

    assertThat(nextCount.get()).isZero();
    assertThat(seenError.get()).isInstanceOf(RuntimeException.class)
                               .hasMessage("forced failure");
    assertThat(completed.get()).isFalse();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes to {@code Mono.just(1).hide()} through {@code doOnEach} and asserts
 * that the consumer saw one onNext, no onError, and an onComplete.
 */
@Test
public void normal() {
    AtomicInteger nextCount = new AtomicInteger();
    AtomicReference<Throwable> seenError = new AtomicReference<>();
    AtomicBoolean completed = new AtomicBoolean();
    AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

    Mono.just(1)
        .hide()
        .doOnEach(signal -> {
            // Route each signal kind to its own probe.
            if (signal.isOnNext()) {
                nextCount.incrementAndGet();
            }
            else if (signal.isOnError()) {
                seenError.set(signal.getThrowable());
            }
            else if (signal.isOnComplete()) {
                completed.set(true);
            }
        })
        .subscribe(subscriber);

    assertThat(nextCount.get()).isEqualTo(1);
    assertThat(seenError.get()).isNull();
    assertThat(completed.get()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes to an empty {@code Mono} through {@code doOnEach} and asserts that
 * the consumer saw no onNext, no onError, and an onComplete.
 */
@Test
public void empty() {
    AtomicInteger nextCount = new AtomicInteger();
    AtomicReference<Throwable> seenError = new AtomicReference<>();
    AtomicBoolean completed = new AtomicBoolean();
    AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

    Mono.<Integer>empty()
        .doOnEach(signal -> {
            // Route each signal kind to its own probe.
            if (signal.isOnNext()) {
                nextCount.incrementAndGet();
            }
            else if (signal.isOnError()) {
                seenError.set(signal.getThrowable());
            }
            else if (signal.isOnComplete()) {
                completed.set(true);
            }
        })
        .subscribe(subscriber);

    assertThat(nextCount.get()).isZero();
    assertThat(seenError.get()).isNull();
    assertThat(completed.get()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Captures each signal together with its {@code Context} on a {@code Mono} whose
 * {@code map} step divides by zero, then asserts that exactly one signal was seen,
 * that it is an ON_ERROR, and that the subscriber context entry "foo" -> "bar"
 * is visible from the signal.
 */
@Test
public void nextError() {
    // Parameterized as Signal<Integer> rather than the raw Signal type, so the
    // compiler keeps checking the element type without unchecked warnings.
    List<Tuple2<Signal<Integer>, Context>> signalsAndContext = new ArrayList<>();

    Mono.just(0)
        .map(i -> 10 / i) // integer division by zero: the source value is 0
        .doOnEach(s -> signalsAndContext.add(Tuples.of(s, s.getContext())))
        .subscriberContext(Context.of("foo", "bar"))
        .subscribe();

    assertThat(signalsAndContext)
            .hasSize(1)
            .allSatisfy(t2 -> {
                assertThat(t2.getT1())
                        .isNotNull();
                // "baz" is only the fallback; the context is expected to hold "bar".
                assertThat(t2.getT2().getOrDefault("foo", "baz"))
                        .isEqualTo("bar");
            });

    assertThat(signalsAndContext.stream().map(t2 -> t2.getT1().getType()))
            .containsExactly(SignalType.ON_ERROR);
}
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes to {@code Mono.never()} through {@code doOnEach} and asserts that
 * the consumer saw no onNext, no onError, and no onComplete.
 */
@Test
public void never() {
    AtomicInteger nextCount = new AtomicInteger();
    AtomicReference<Throwable> seenError = new AtomicReference<>();
    AtomicBoolean completed = new AtomicBoolean();
    AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

    Mono.<Integer>never()
        .doOnEach(signal -> {
            // Route each signal kind to its own probe.
            if (signal.isOnNext()) {
                nextCount.incrementAndGet();
            }
            else if (signal.isOnError()) {
                seenError.set(signal.getThrowable());
            }
            else if (signal.isOnComplete()) {
                completed.set(true);
            }
        })
        .subscribe(subscriber);

    assertThat(nextCount.get()).isZero();
    assertThat(seenError.get()).isNull();
    assertThat(completed.get()).isFalse();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Captures each signal together with its {@code Context} on
 * {@code Mono.just(1).hide()}, then asserts that two signals were seen
 * (ON_NEXT followed by ON_COMPLETE) and that the subscriber context entry
 * "foo" -> "bar" is visible from both.
 */
@Test
public void nextComplete() {
    // Parameterized as Signal<Integer> rather than the raw Signal type, so the
    // compiler keeps checking the element type without unchecked warnings.
    List<Tuple2<Signal<Integer>, Context>> signalsAndContext = new ArrayList<>();

    Mono.just(1)
        .hide()
        .doOnEach(s -> signalsAndContext.add(Tuples.of(s, s.getContext())))
        .subscriberContext(Context.of("foo", "bar"))
        .subscribe();

    assertThat(signalsAndContext)
            .hasSize(2)
            .allSatisfy(t2 -> {
                assertThat(t2.getT1())
                        .isNotNull();
                // "baz" is only the fallback; the context is expected to hold "bar".
                assertThat(t2.getT2().getOrDefault("foo", "baz"))
                        .isEqualTo("bar");
            });

    assertThat(signalsAndContext.stream().map(t2 -> t2.getT1().getType()))
            .containsExactly(SignalType.ON_NEXT, SignalType.ON_COMPLETE);
}
代码示例来源:origin: reactor/reactor-core
/**
 * A {@code doOnEach} consumer that throws a propagated checked exception on
 * onNext: the verifier expects an error with message "test", and the consumer
 * is expected to have handled onNext exactly once.
 */
@Test
public void consumerError() {
    LongAdder invocations = new LongAdder();
    Throwable failure = new Exception("test");

    Mono<Integer> source = Mono.just(1)
                               .doOnEach(signal -> {
                                   if (signal.isOnNext()) {
                                       invocations.increment();
                                       throw Exceptions.propagate(failure);
                                   }
                               });

    StepVerifier.create(source)
                .expectErrorMessage("test")
                .verify();

    assertThat(invocations.intValue()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
/**
 * A {@code doOnEach} consumer that throws a bubbling exception on onNext: the
 * whole verification is expected to throw a bubbling {@code RuntimeException}
 * whose cause is the original exception, and the consumer is expected to have
 * handled onNext exactly once.
 */
@Test
public void consumerBubbleError() {
    LongAdder invocations = new LongAdder();
    Throwable failure = new Exception("test");

    assertThatThrownBy(() -> {
        // Assembled inside the lambda, as in the original, so that anything thrown
        // during assembly or verification is captured by assertThatThrownBy.
        Mono<Integer> source = Mono.just(1)
                                   .doOnEach(signal -> {
                                       if (signal.isOnNext()) {
                                           invocations.increment();
                                           throw Exceptions.bubble(failure);
                                       }
                                   });
        StepVerifier.create(source)
                    .expectErrorMessage("test")
                    .verify();
    })
            .isInstanceOf(RuntimeException.class)
            .matches(Exceptions::isBubbling, "bubbling")
            .hasCause(failure); //equivalent to unwrap for this case

    assertThat(invocations.intValue()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Forwards every signal seen by {@code doOnEach} to a second subscriber via
 * {@code Signal.accept} and asserts that this subscriber received the value 1
 * and completion while never being subscribed.
 */
@Test
public void testDoOnEachSignalToSubscriber() {
    AssertSubscriber<Integer> probe = AssertSubscriber.create();
    Mono<Integer> source = Mono.just(1)
                               .doOnEach(signal -> signal.accept(probe));

    StepVerifier.create(source)
                .expectSubscription()
                .expectNext(1)
                .expectComplete()
                .verify();

    probe.assertNotSubscribed();
    probe.assertValues(1);
    probe.assertComplete();
}
代码示例来源:origin: org.springframework.boot/spring-boot-actuator
/**
 * Performs the exchange and, for every signal other than onComplete, records
 * the elapsed time since the start timestamp stored in the subscriber context
 * into a timer named {@code metricName}, tagged from the request, the response
 * and the error carried by the signal.
 *
 * @param clientRequest the request passed to the exchange function and the tag provider
 * @param exchangeFunction the function that performs the exchange
 * @return the exchange result, instrumented with the timing side effect
 */
@Override
public Mono<ClientResponse> filter(ClientRequest clientRequest,
        ExchangeFunction exchangeFunction) {
    Mono<ClientResponse> exchange = exchangeFunction.exchange(clientRequest);
    return exchange.doOnEach((signal) -> {
        if (signal.isOnComplete()) {
            // Only non-complete signals are timed.
            return;
        }
        Long startedAt = signal.getContext().get(METRICS_WEBCLIENT_START_TIME);
        ClientResponse response = signal.get();
        Throwable error = signal.getThrowable();
        Iterable<Tag> tags = this.tagProvider.tags(clientRequest, response, error);
        Timer timer = Timer.builder(this.metricName)
                .tags(tags)
                .description("Timer of WebClient operation")
                .register(this.meterRegistry);
        timer.record(System.nanoTime() - startedAt, TimeUnit.NANOSECONDS);
    }).subscriberContext((context) ->
            // Stores the start timestamp that the consumer above reads back.
            context.put(METRICS_WEBCLIENT_START_TIME, System.nanoTime()));
}
代码示例来源:origin: io.projectreactor.ipc/reactor-netty
/**
 * Decorates the handshake so that, on every signal that carries no error, the
 * websocket handler is applied to {@code ops} and its publisher is subscribed
 * with a {@code WebsocketSubscriber}. When the handler is the no-op handler the
 * handshake is returned untouched.
 *
 * @param ops the websocket operations, passed to the handler as both inbound and outbound
 * @param handshake the handshake to decorate
 * @param websocketHandler the handler to apply once the handshake signals without error
 * @return the original or the decorated handshake
 */
private Mono<Void> applyWebsocketHandler(HttpClientWSOperations ops, Mono<Void> handshake,
        BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
    if (websocketHandler == noopHandler()) {
        return handshake;
    }
    return handshake.doOnEach(signal -> {
        if (signal.hasError()) {
            return;
        }
        websocketHandler.apply(ops, ops)
                        .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
    });
}
代码示例来源:origin: reactor/reactor-netty
/**
 * Attempts the websocket upgrade and wires the handler to the handshake result.
 * <p>
 * When {@code markSentHeaders()} returns false an error is logged; in that case,
 * and when {@code rebind(ops)} returns false, a failed {@code Mono} carrying an
 * {@link IllegalStateException} is returned. Otherwise the returned {@code Mono}
 * is built from {@code ops.handshakerResult}, and on every signal without error
 * (and, when {@code protocols} is non-null, only if a subprotocol was selected)
 * the handler's publisher is subscribed with a {@code WebsocketSubscriber}.
 *
 * @param url the url passed to the websocket operations
 * @param protocols the requested subprotocols, may be {@code null}
 * @param maxFramePayloadLength the value passed to the websocket operations
 * @param websocketHandler the handler to apply, must not be {@code null}
 * @return a {@code Mono} built from the handshake result, or a failed {@code Mono}
 * @throws NullPointerException if {@code websocketHandler} is {@code null}
 */
final Mono<Void> withWebsocketSupport(String url,
        @Nullable String protocols,
        int maxFramePayloadLength,
        BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
    Objects.requireNonNull(websocketHandler, "websocketHandler");

    if (!markSentHeaders()) {
        log.error(format(channel(), "Cannot enable websocket if headers have already been sent"));
        return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
    }

    WebsocketServerOperations ops =
            new WebsocketServerOperations(url, protocols, maxFramePayloadLength, this);
    if (!rebind(ops)) {
        return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
    }

    return FutureMono.from(ops.handshakerResult)
                     .doOnEach(signal -> {
                         boolean subprotocolOk = protocols == null || ops.selectedSubprotocol() != null;
                         if (signal.hasError() || !subprotocolOk) {
                             return;
                         }
                         websocketHandler.apply(ops, ops)
                                         .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
                     });
}
代码示例来源:origin: io.projectreactor.netty/reactor-netty
/**
 * Attempts the websocket upgrade and wires the handler to the handshake result.
 * <p>
 * When {@code markSentHeaders()} returns false an error is logged; in that case,
 * and when {@code rebind(ops)} returns false, a failed {@code Mono} carrying an
 * {@link IllegalStateException} is returned. Otherwise the returned {@code Mono}
 * is built from {@code ops.handshakerResult}, and on every signal without error
 * (and, when {@code protocols} is non-null, only if a subprotocol was selected)
 * the handler's publisher is subscribed with a {@code WebsocketSubscriber}.
 *
 * @param url the url passed to the websocket operations
 * @param protocols the requested subprotocols, may be {@code null}
 * @param maxFramePayloadLength the value passed to the websocket operations
 * @param websocketHandler the handler to apply, must not be {@code null}
 * @return a {@code Mono} built from the handshake result, or a failed {@code Mono}
 * @throws NullPointerException if {@code websocketHandler} is {@code null}
 */
final Mono<Void> withWebsocketSupport(String url,
        @Nullable String protocols,
        int maxFramePayloadLength,
        BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
    Objects.requireNonNull(websocketHandler, "websocketHandler");

    if (!markSentHeaders()) {
        log.error(format(channel(), "Cannot enable websocket if headers have already been sent"));
        return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
    }

    WebsocketServerOperations ops =
            new WebsocketServerOperations(url, protocols, maxFramePayloadLength, this);
    if (!rebind(ops)) {
        return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
    }

    return FutureMono.from(ops.handshakerResult)
                     .doOnEach(signal -> {
                         boolean subprotocolOk = protocols == null || ops.selectedSubprotocol() != null;
                         if (signal.hasError() || !subprotocolOk) {
                             return;
                         }
                         websocketHandler.apply(ops, ops)
                                         .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
                     });
}
代码示例来源:origin: io.projectreactor.ipc/reactor-netty
/**
 * Attempts the websocket upgrade and wires the handler to the handshake result.
 * <p>
 * When {@code markSentHeaders()} returns false an error is logged; in that case,
 * and when {@code replace(ops)} returns false, a failed {@code Mono} carrying an
 * {@link IllegalStateException} is returned. Otherwise the returned {@code Mono}
 * is built from {@code ops.handshakerResult}, and on every signal without error
 * the handler's publisher is subscribed with a {@code WebsocketSubscriber}.
 *
 * @param url the url passed to the websocket operations
 * @param protocols the value passed to the websocket operations
 * @param websocketHandler the handler to apply, must not be {@code null}
 * @return a {@code Mono} built from the handshake result, or a failed {@code Mono}
 * @throws NullPointerException if {@code websocketHandler} is {@code null}
 */
final Mono<Void> withWebsocketSupport(String url,
        String protocols,
        BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
    Objects.requireNonNull(websocketHandler, "websocketHandler");

    if (!markSentHeaders()) {
        log.error(format(channel(), "Cannot enable websocket if headers have already been sent"));
        return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
    }

    HttpServerWSOperations ops = new HttpServerWSOperations(url, protocols, this);
    if (!replace(ops)) {
        return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
    }

    return FutureMono.from(ops.handshakerResult)
                     .doOnEach(signal -> {
                         if (signal.hasError()) {
                             return;
                         }
                         websocketHandler.apply(ops, ops)
                                         .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
                     });
}
内容来源于网络,如有侵权,请联系作者删除!