Usage and code examples of the reactor.core.publisher.Mono.doOnEach() method

x33g5p2x, reposted on 2022-01-24 under "Other"

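This article collects code examples of the Java method reactor.core.publisher.Mono.doOnEach() and shows how Mono.doOnEach() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.doOnEach() method are as follows:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: doOnEach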

Introduction to Mono.doOnEach

Adds behavior that is triggered when the Mono emits an item, fails with an error, or completes successfully. All of these events are represented as a Signal that is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring a Mono. Each Signal has a Context associated with it.
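
The description above can be illustrated with a minimal sketch that records which signal types the callback sees for a Mono that emits a value, one that is empty, and one that fails. It assumes reactor-core is on the classpath; the class name DoOnEachSketch and the helper observedSignals are hypothetical names introduced only for this illustration, and the hide() call simply mirrors the reactor-core tests quoted in this article.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

// Hypothetical helper class, not part of Reactor.
public class DoOnEachSketch {

    // Subscribes to the given Mono and returns, in order, the types of the
    // signals that the doOnEach callback received.
    public static List<SignalType> observedSignals(Mono<Integer> source) {
        List<SignalType> seen = new ArrayList<>();
        source.doOnEach(signal -> seen.add(signal.getType()))
              // Supply both a value and an error consumer so that a failing
              // Mono does not end up in the "error callback not implemented" path.
              .subscribe(value -> { }, error -> { });
        // The sources used here emit synchronously, so the list is complete
        // by the time subscribe() returns.
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observedSignals(Mono.just(1).hide()));
        System.out.println(observedSignals(Mono.<Integer>empty()));
        System.out.println(observedSignals(
                Mono.<Integer>error(new IllegalStateException("boom"))));
    }
}
```

In line with the reactor-core tests in this article, a Mono that emits a value should produce ON_NEXT followed by ON_COMPLETE, an empty Mono only ON_COMPLETE, and a failing Mono only ON_ERROR.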

Code examples

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void testDoOnEachSignalNullConsumer() {
  Mono.just(1).doOnEach(null);
}

Code example source: reactor/reactor-core

@Test
public void shallExecuteSideEffectsCallback() {
  Flux<Integer> result = Mono.just(Arrays.asList(1, 2))
                //.doOnEach(sig -> System.out.println("SIGNAL beforeMap " + sig))// <- if enabled then everything is fine
                .map(x -> x)
                .doOnEach(sig -> {throw new RuntimeException("expected");})
                .flatMapIterable(Function.identity());
  StepVerifier.create(result).expectError().verify();
}

Code example source: reactor/reactor-core

@Test
public void testDoOnEachEmpty() {
  List<Signal<Integer>> signals = new ArrayList<>(4);
  Mono<Integer> mono = Mono.<Integer>empty()
               .doOnEach(signals::add);
  StepVerifier.create(mono)
        .expectSubscription()
        .expectComplete()
        .verify();
  assertThat(signals.size(), is(1));
  assertTrue("onComplete expected", signals.get(0).isOnComplete());
}

Code example source: reactor/reactor-core

@Test
public void testDoOnEachSignalWithError() {
  List<Signal<Integer>> signals = new ArrayList<>(4);
  Mono<Integer> mono = Mono.<Integer>error(new IllegalArgumentException("foo"))
      .doOnEach(signals::add);
  StepVerifier.create(mono)
        .expectSubscription()
        .expectErrorMessage("foo")
        .verify();
  assertThat(signals.size(), is(1));
  assertTrue("onError expected", signals.get(0).isOnError());
  assertThat("plain exception expected", signals.get(0).getThrowable().getMessage(),
      is("foo"));
}

Code example source: reactor/reactor-core

@Test
public void testDoOnEachSignal() {
  List<Signal<Integer>> signals = new ArrayList<>(4);
  Mono<Integer> mono = Mono.just(1)
               .doOnEach(signals::add);
  StepVerifier.create(mono)
        .expectSubscription()
        .expectNext(1)
        .expectComplete()
        .verify();
  assertThat(signals.size(), is(2));
  assertThat("onNext", signals.get(0).get(), is(1));
  assertTrue("onComplete expected", signals.get(1).isOnComplete());
}

Code example source: reactor/reactor-core

@Test
public void error() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger onNext = new AtomicInteger();
  AtomicReference<Throwable> onError = new AtomicReference<>();
  AtomicBoolean onComplete = new AtomicBoolean();
  Mono.<Integer>error(new RuntimeException("forced failure"))
      .doOnEach(s -> {
        if (s.isOnNext()) {
          onNext.incrementAndGet();
        }
        else if (s.isOnError()) {
          onError.set(s.getThrowable());
        }
        else if (s.isOnComplete()) {
          onComplete.set(true);
        }
      })
      .subscribe(ts);
  assertThat(onNext.get()).isZero();
  assertThat(onError.get()).isInstanceOf(RuntimeException.class)
               .hasMessage("forced failure");
  assertThat(onComplete.get()).isFalse();
}

Code example source: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger onNext = new AtomicInteger();
  AtomicReference<Throwable> onError = new AtomicReference<>();
  AtomicBoolean onComplete = new AtomicBoolean();
  Mono.just(1)
    .hide()
    .doOnEach(s -> {
      if (s.isOnNext()) {
        onNext.incrementAndGet();
      }
      else if (s.isOnError()) {
        onError.set(s.getThrowable());
      }
      else if (s.isOnComplete()) {
        onComplete.set(true);
      }
    })
    .subscribe(ts);
  assertThat(onNext.get()).isEqualTo(1);
  assertThat(onError.get()).isNull();
  assertThat(onComplete.get()).isTrue();
}

Code example source: reactor/reactor-core

@Test
public void empty() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger onNext = new AtomicInteger();
  AtomicReference<Throwable> onError = new AtomicReference<>();
  AtomicBoolean onComplete = new AtomicBoolean();
  Mono.<Integer>empty()
      .doOnEach(s -> {
        if (s.isOnNext()) {
          onNext.incrementAndGet();
        }
        else if (s.isOnError()) {
          onError.set(s.getThrowable());
        }
        else if (s.isOnComplete()) {
          onComplete.set(true);
        }
      })
      .subscribe(ts);
  assertThat(onNext.get()).isZero();
  assertThat(onError.get()).isNull();
  assertThat(onComplete.get()).isTrue();
}

Code example source: reactor/reactor-core

@Test
public void nextError() {
  List<Tuple2<Signal, Context>> signalsAndContext = new ArrayList<>();
  Mono.just(0)
    .map(i -> 10 / i)
    .doOnEach(s -> signalsAndContext.add(Tuples.of(s, s.getContext())))
    .subscriberContext(Context.of("foo", "bar"))
    .subscribe();

  assertThat(signalsAndContext)
      .hasSize(1)
      .allSatisfy(t2 -> {
        assertThat(t2.getT1())
            .isNotNull();
        assertThat(t2.getT2().getOrDefault("foo", "baz"))
            .isEqualTo("bar");
      });

  assertThat(signalsAndContext.stream().map(t2 -> t2.getT1().getType()))
      .containsExactly(SignalType.ON_ERROR);
}

Code example source: reactor/reactor-core

@Test
public void never() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger onNext = new AtomicInteger();
  AtomicReference<Throwable> onError = new AtomicReference<>();
  AtomicBoolean onComplete = new AtomicBoolean();
  Mono.<Integer>never()
      .doOnEach(s -> {
        if (s.isOnNext()) {
          onNext.incrementAndGet();
        }
        else if (s.isOnError()) {
          onError.set(s.getThrowable());
        }
        else if (s.isOnComplete()) {
          onComplete.set(true);
        }
      })
      .subscribe(ts);
  assertThat(onNext.get()).isZero();
  assertThat(onError.get()).isNull();
  assertThat(onComplete.get()).isFalse();
}

Code example source: reactor/reactor-core

@Test
public void nextComplete() {
  List<Tuple2<Signal, Context>> signalsAndContext = new ArrayList<>();
  Mono.just(1)
    .hide()
    .doOnEach(s -> signalsAndContext.add(Tuples.of(s, s.getContext())))
    .subscriberContext(Context.of("foo", "bar"))
    .subscribe();
  assertThat(signalsAndContext)
      .hasSize(2)
      .allSatisfy(t2 -> {
        assertThat(t2.getT1())
            .isNotNull();
        assertThat(t2.getT2().getOrDefault("foo", "baz"))
            .isEqualTo("bar");
      });
  assertThat(signalsAndContext.stream().map(t2 -> t2.getT1().getType()))
      .containsExactly(SignalType.ON_NEXT, SignalType.ON_COMPLETE);
}

Code example source: reactor/reactor-core

@Test
public void consumerError() {
  LongAdder state = new LongAdder();
  Throwable err = new Exception("test");
  StepVerifier.create(
      Mono.just(1)
        .doOnEach(s -> {
          if (s.isOnNext()) {
            state.increment();
            throw Exceptions.propagate(err);
          }
        }))
        .expectErrorMessage("test")
        .verify();
  assertThat(state.intValue()).isEqualTo(1);
}

Code example source: reactor/reactor-core

@Test
public void consumerBubbleError() {
  LongAdder state = new LongAdder();
  Throwable err = new Exception("test");
  assertThatThrownBy(() ->
      StepVerifier.create(
          Mono.just(1)
            .doOnEach(s -> {
              if (s.isOnNext()) {
                state.increment();
                throw Exceptions.bubble(err);
              }
            }))
            .expectErrorMessage("test")
            .verify())
      .isInstanceOf(RuntimeException.class)
      .matches(Exceptions::isBubbling, "bubbling")
      .hasCause(err); //equivalent to unwrap for this case
  assertThat(state.intValue()).isEqualTo(1);
}

Code example source: reactor/reactor-core

@Test
public void testDoOnEachSignalToSubscriber() {
  AssertSubscriber<Integer> peekSubscriber = AssertSubscriber.create();
  Mono<Integer> mono = Mono.just(1)
               .doOnEach(s -> s.accept(peekSubscriber));
  StepVerifier.create(mono)
        .expectSubscription()
        .expectNext(1)
        .expectComplete()
        .verify();
  peekSubscriber.assertNotSubscribed();
  peekSubscriber.assertValues(1);
  peekSubscriber.assertComplete();
}

Code example source: org.springframework.boot/spring-boot-actuator

@Override
public Mono<ClientResponse> filter(ClientRequest clientRequest,
    ExchangeFunction exchangeFunction) {
  return exchangeFunction.exchange(clientRequest).doOnEach((signal) -> {
    if (!signal.isOnComplete()) {
      Long startTime = signal.getContext().get(METRICS_WEBCLIENT_START_TIME);
      ClientResponse clientResponse = signal.get();
      Throwable throwable = signal.getThrowable();
      Iterable<Tag> tags = this.tagProvider.tags(clientRequest, clientResponse,
          throwable);
      Timer.builder(this.metricName).tags(tags)
          .description("Timer of WebClient operation")
          .register(this.meterRegistry)
          .record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
    }
  }).subscriberContext((context) -> context.put(METRICS_WEBCLIENT_START_TIME,
      System.nanoTime()));
}

Code example source: io.projectreactor.ipc/reactor-netty

private Mono<Void> applyWebsocketHandler(HttpClientWSOperations ops, Mono<Void> handshake,
    BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
  if (websocketHandler != noopHandler()) {
    handshake =
        handshake.doOnEach(signal -> {
          if(!signal.hasError()) {
            websocketHandler.apply(ops, ops)
                    .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
          }
        });
  }
  return handshake;
}

Code example source: reactor/reactor-netty

final Mono<Void> withWebsocketSupport(String url,
    @Nullable String protocols,
    int maxFramePayloadLength,
    BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
  Objects.requireNonNull(websocketHandler, "websocketHandler");
  if (markSentHeaders()) {
    WebsocketServerOperations
        ops = new WebsocketServerOperations(url, protocols, maxFramePayloadLength, this);
    if (rebind(ops)) {
      return FutureMono.from(ops.handshakerResult)
               .doOnEach(signal -> {
                 if(!signal.hasError() && (protocols == null || ops.selectedSubprotocol() != null)) {
                  websocketHandler.apply(ops, ops)
                          .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
                }
               });
    }
  }
  else {
    log.error(format(channel(), "Cannot enable websocket if headers have already been sent"));
  }
  return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
}

Code example source: io.projectreactor.netty/reactor-netty

final Mono<Void> withWebsocketSupport(String url,
    @Nullable String protocols,
    int maxFramePayloadLength,
    BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
  Objects.requireNonNull(websocketHandler, "websocketHandler");
  if (markSentHeaders()) {
    WebsocketServerOperations
        ops = new WebsocketServerOperations(url, protocols, maxFramePayloadLength, this);
    if (rebind(ops)) {
      return FutureMono.from(ops.handshakerResult)
               .doOnEach(signal -> {
                 if(!signal.hasError() && (protocols == null || ops.selectedSubprotocol() != null)) {
                  websocketHandler.apply(ops, ops)
                          .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
                }
               });
    }
  }
  else {
    log.error(format(channel(), "Cannot enable websocket if headers have already been sent"));
  }
  return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
}

Code example source: io.projectreactor.ipc/reactor-netty

final Mono<Void> withWebsocketSupport(String url,
    String protocols,
    BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
  Objects.requireNonNull(websocketHandler, "websocketHandler");
  if (markSentHeaders()) {
    HttpServerWSOperations ops = new HttpServerWSOperations(url, protocols, this);
    if (replace(ops)) {
      return FutureMono.from(ops.handshakerResult)
               .doOnEach(signal -> {
                 if(!signal.hasError()) {
                  websocketHandler.apply(ops, ops)
                          .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
                }
               });
    }
  }
  else {
    log.error(format(channel(), "Cannot enable websocket if headers have already been sent"));
  }
  return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
}
