Usage and code examples of the reactor.core.publisher.Mono.doOnSuccessOrError() method

x33g5p2x, reposted on 2022-01-24 under Other

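This article collects Java code examples of the reactor.core.publisher.Mono.doOnSuccessOrError() method and shows how Mono.doOnSuccessOrError() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. The details of Mono.doOnSuccessOrError() are as follows:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: doOnSuccessOrError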

Introduction to Mono.doOnSuccessOrError

Add behavior triggered when the Mono terminates, either by completing successfully or with an error. The two arguments passed to the callback reflect how the Mono terminated:

  • null, null : completing without data
  • T, null : completing with data
  • null, Throwable : failing with/without data
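
The three argument combinations listed above can be told apart inside the callback with two null checks. The following is a minimal sketch that uses only the JDK: `CompletableFuture.whenComplete` stands in for `doOnSuccessOrError` because it accepts the same `BiConsumer<T, Throwable>` shape. It is not Reactor code, and a `CompletableFuture` has no true "empty" completion, so the first case is simulated with a null value. The class `TerminationCases` and the helper `describe` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical demo class; nothing here is part of Reactor.
class TerminationCases {

    // Classifies a (value, error) pair according to the three cases listed above.
    static <T> String describe(T value, Throwable error) {
        if (error != null) {
            return "failed: " + error.getMessage();   // null, Throwable
        }
        if (value == null) {
            return "completed without data";          // null, null
        }
        return "completed with data: " + value;       // T, null
    }

    // Feeds the three cases through CompletableFuture.whenComplete (a JDK stand-in,
    // assumed here only because its callback has the same two-argument shape).
    static List<String> runAll() {
        List<String> seen = new ArrayList<>();
        BiConsumer<String, Throwable> callback = (v, t) -> seen.add(describe(v, t));

        // Already-completed futures run the callback synchronously on this thread.
        CompletableFuture.<String>completedFuture(null).whenComplete(callback);
        CompletableFuture.completedFuture("foo").whenComplete(callback);

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        failed.whenComplete(callback);
        return seen;
    }

    public static void main(String[] args) {
        runAll().forEach(System.out::println);
    }
}
```

With Reactor on the classpath, the same `describe` helper could presumably be called from inside a `doOnSuccessOrError((v, t) -> ...)` lambda, since that callback receives the same pair of arguments.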

Code examples

Code example source: spring-cloud/spring-cloud-gateway

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  Sample sample = Timer.start(meterRegistry);
  return chain.filter(exchange).doOnSuccessOrError((aVoid, ex) -> {
    endTimerRespectingCommit(exchange, sample);
  });
}

Code example source: reactor/reactor-core

@Test
public void onMonoRejectedDoOnSuccessOrError() {
  Mono<String> mp = Mono.error(new Exception("test"));
  AtomicReference<Throwable> ref = new AtomicReference<>();
  mp.doOnSuccessOrError((s, f) -> ref.set(f))
   .subscribe();
  assertThat(ref.get()).hasMessage("test");
}

Code example source: reactor/reactor-core

@Test
public void onMonoSuccessDoOnSuccessOrError() {
  Mono<String> mp = Mono.just("test");
  AtomicReference<String> ref = new AtomicReference<>();
  mp.doOnSuccessOrError((s, f) -> ref.set(s))
   .subscribe();
  assertThat(ref.get()).isEqualToIgnoringCase("test");
}

Code example source: line/armeria

Mono<Void> handle(ServiceRequestContext ctx, HttpRequest req, CompletableFuture<HttpResponse> future,
                  @Nullable String serverHeader) {
  final ArmeriaServerHttpRequest convertedRequest;
  try {
    convertedRequest = new ArmeriaServerHttpRequest(ctx, req, factoryWrapper);
  } catch (Exception e) {
    logger.warn("{} Invalid request path: {}", ctx, req.path(), e);
    future.complete(HttpResponse.of(HttpStatus.BAD_REQUEST));
    return Mono.empty();
  }

  final ArmeriaServerHttpResponse convertedResponse =
      new ArmeriaServerHttpResponse(ctx, future, factoryWrapper, serverHeader);
  return httpHandler.handle(convertedRequest, convertedResponse)
      .doOnSuccessOrError((unused, cause) -> {
        // cause is non-null only when the handler failed
        if (cause != null) {
          logger.debug("{} Failed to handle a request", ctx, cause);
          convertedResponse.setComplete(cause).subscribe();
        } else {
          convertedResponse.setComplete().subscribe();
        }
      });
}

Code example source: reactor/reactor-core

@Test
public void completeOnNextWithoutCancel() {
  AtomicInteger onCancel = new AtomicInteger();
  AtomicInteger sourceOnCancel = new AtomicInteger();
  AtomicInteger onTerminate = new AtomicInteger();
  AtomicInteger sourceOnTerminate = new AtomicInteger();
  Mono<String> source = Mono.<String>fromDirect(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext("foo");
  })
  .doOnCancel(sourceOnCancel::incrementAndGet)
  .doOnSuccessOrError((v, e) -> sourceOnTerminate.incrementAndGet());
  StepVerifier.withVirtualTime(() -> new MonoDelayElement<>(source,
      2,
      TimeUnit.SECONDS,
      defaultSchedulerForDelay())
      .doOnCancel(onCancel::incrementAndGet)
      .doOnSuccessOrError((v, e) -> onTerminate.incrementAndGet()))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext("foo")
        .verifyComplete();
  assertThat(onTerminate.get()).isEqualTo(1);
  assertThat(sourceOnTerminate.get()).isEqualTo(1);
  assertThat(onCancel.get()).isEqualTo(0);
  assertThat(sourceOnCancel.get()).isEqualTo(0);
}

Code example source: reactor/reactor-core

@Test
public void onSuccessOrErrorForEmpty() {
  LongAdder invoked = new LongAdder();
  AtomicReference<String> value = new AtomicReference<>();
  AtomicReference<Throwable> error = new AtomicReference<>();
  StepVerifier.create(Mono.<String>empty()
      .doOnSuccessOrError((v, t) -> {
        invoked.increment();
        value.set(v);
        error.set(t);
      }))
        .expectComplete()
        .verify();
  assertEquals(1, invoked.intValue());
  assertEquals(null, value.get());
  assertEquals(null, error.get());
}

Code example source: reactor/reactor-core

@Test
public void onSuccessOrErrorCallbackFailureInterruptsOnNext() {
  LongAdder invoked = new LongAdder();
  StepVerifier.create(Mono.just("foo")
              .doOnSuccessOrError((v, t) -> {
                invoked.increment();
                throw new IllegalArgumentException(v);
              }))
        .expectErrorMessage("foo")
        .verify();
  assertEquals(1, invoked.intValue());
}

Code example source: reactor/reactor-core

@Test
public void onSuccessOrErrorForOnError() {
  LongAdder invoked = new LongAdder();
  AtomicReference<String> value = new AtomicReference<>();
  AtomicReference<Throwable> error = new AtomicReference<>();
  IllegalArgumentException err = new IllegalArgumentException("boom");
  StepVerifier.create(Mono.<String>error(err)
              .doOnSuccessOrError((v, t) -> {
                invoked.increment();
                value.set(v);
                error.set(t);
              }))
        .expectErrorMessage("boom")
        .verify();
  assertEquals(1, invoked.intValue());
  assertEquals(null, value.get());
  assertEquals(err, error.get());
}

Code example source: spring-cloud/spring-cloud-gateway

@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getUrl("/echo"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
            .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
            .subscribeWith(output)
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doOnSuccessOrError((aVoid, ex) ->
                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
      })
      .block(Duration.ofMillis(5000));
  assertEquals(input.collectList().block(Duration.ofMillis(5000)),
      output.collectList().block(Duration.ofMillis(5000)));
}

Code example source: spring-cloud/spring-cloud-gateway

@Test
public void echoForHttp() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getHttpUrl("/echoForHttp"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
            .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
            .subscribeWith(output)
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doOnSuccessOrError((aVoid, ex) ->
                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
      })
      .block(Duration.ofMillis(5000));
  assertEquals(input.collectList().block(Duration.ofMillis(5000)),
      output.collectList().block(Duration.ofMillis(5000)));
}

Code example source: spring-cloud/spring-cloud-gateway

public GatewayFilter apply(Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) {
  return (exchange, chain) -> {
    trace("Entering retry-filter");
    // chain.filter returns a Mono<Void>
    Publisher<Void> publisher = chain.filter(exchange)
        //.log("retry-filter", Level.INFO)
        .doOnSuccessOrError((aVoid, throwable) -> {
          int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
          int newIteration = iteration + 1;
          trace("setting new iteration in attr %d", newIteration);
          exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);
        });
    if (retry != null) {
      // retryWhen returns a Mono<Void>
      // retry needs to go before repeat
      publisher = ((Mono<Void>)publisher).retryWhen(retry.withApplicationContext(exchange));
    }
    if (repeat != null) {
      // repeatWhen returns a Flux<Void>
      // so this needs to be last and the variable a Publisher<Void>
      publisher = ((Mono<Void>)publisher).repeatWhen(repeat.withApplicationContext(exchange));
    }
    return Mono.fromDirect(publisher);
  };
}

Code example source: reactor/reactor-core

@Test
public void onSuccessOrErrorFusion() {
  LongAdder invoked = new LongAdder();
  AtomicBoolean completedEmpty = new AtomicBoolean();
  AtomicReference<Throwable> error = new AtomicReference<>();
  Mono<Integer> mono = Flux
      .range(1, 10)
      .reduce((a, b) -> a + b)
      .doOnSuccessOrError((v, t) -> {
        if (v == null && t == null) completedEmpty.set(true);
        if (t != null) error.set(t);
        invoked.increment();
      });
  StepVerifier.create(mono)
        .expectFusion()
        .expectNext(55)
        .expectComplete()
        .verify();
  assertFalse("unexpected empty completion", completedEmpty.get());
  assertEquals(1, invoked.intValue());
  assertEquals("unexpected error", null, error.get());
}

Code example source: reactor/reactor-core

@Test
public void onSuccessOrErrorNormal() {
  LongAdder invoked = new LongAdder();
  AtomicBoolean completedEmpty = new AtomicBoolean();
  AtomicReference<Throwable> error = new AtomicReference<>();
  Mono<Integer> mono = Flux
      .range(1, 10)
      .reduce((a, b) -> a + b)
      .hide()
      .doOnSuccessOrError((v, t) -> {
        if (v == null && t == null) completedEmpty.set(true);
        if (t != null) error.set(t);
        invoked.increment();
      });
  StepVerifier.create(mono)
        .expectFusion(Fuseable.ANY, Fuseable.NONE)
        .expectNext(55)
        .expectComplete()
        .verify();
  assertFalse("unexpected empty completion", completedEmpty.get());
  assertEquals(1, invoked.intValue());
  assertEquals("unexpected error", null, error.get());
}

Code example source: reactor/reactor-core

@Test
public void onSuccessOrErrorFusionConditional() {
  LongAdder invoked = new LongAdder();
  AtomicBoolean completedEmpty = new AtomicBoolean();
  AtomicReference<Throwable> error = new AtomicReference<>();
  Mono<Integer> mono = Flux
      .range(1, 10)
      .reduce((a, b) -> a + b)
      .filter(v -> true)
      .doOnSuccessOrError((v, t) -> {
        if (v == null && t == null) completedEmpty.set(true);
        if (t != null) error.set(t);
        invoked.increment();
      });
  StepVerifier.create(mono)
        .expectFusion()
        .expectNext(55)
        .expectComplete()
        .verify();
  assertFalse("unexpected empty completion", completedEmpty.get());
  assertEquals(1, invoked.intValue());
  assertEquals("unexpected error", null, error.get());
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  Mono<Integer> requestSizeMono = request.getBody().
      reduce(0, (integer, dataBuffer) -> integer +
          dataBuffer.readableByteCount()).
      doOnSuccessOrError((size, throwable) -> {
        assertNull(throwable);
        assertEquals(REQUEST_SIZE, (long) size);
      });
  response.getHeaders().setContentLength(RESPONSE_SIZE);
  return requestSizeMono.then(response.writeWith(multipleChunks()));
}

Code example source: reactor/reactor-core

@Test
public void onSuccessOrErrorNormalConditional() {
  LongAdder invoked = new LongAdder();
  AtomicBoolean completedEmpty = new AtomicBoolean();
  AtomicReference<Throwable> error = new AtomicReference<>();
  Mono<Integer> mono = Flux
      .range(1, 10)
      .reduce((a, b) -> a + b)
      .hide()
      .filter(v -> true)
      .doOnSuccessOrError((v, t) -> {
        if (v == null && t == null) completedEmpty.set(true);
        if (t != null) error.set(t);
        invoked.increment();
      });
  StepVerifier.create(mono)
        .expectFusion(Fuseable.ANY, Fuseable.NONE)
        .expectNext(55)
        .expectComplete()
        .verify();
  assertFalse("unexpected empty completion", completedEmpty.get());
  assertEquals(1, invoked.intValue());
  assertEquals("unexpected error", null, error.get());
}

Code example source: mulesoft/mule

}, sink::error);
})
  .doOnSuccessOrError((value, e) -> {
   try {
    after(context, value, executedInterceptors);

Code example source: scalecube/scalecube-services

/**
 * Method to send normal response.
 *
 * @param response response
 * @return mono void
 */
public Mono<Void> send(GatewayMessage response) {
 return Mono.defer(
   () ->
     outbound
       .sendObject(Mono.just(response).map(codec::encode).map(TextWebSocketFrame::new))
       .then()
       .doOnSuccessOrError((avoid, th) -> logSend(response, th)));
}

Code example source: org.mule.runtime/mule-core

@Override
public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
  return from(publisher).concatMap(event -> Mono.just(event)
      .doOnNext(pushSubFlowFlowStackElement())
      .transform(s -> super.apply(s))
      .doOnSuccessOrError((result, throwable) -> popSubFlowFlowStackElement().accept(event)));
}

Code example source: com.vmware.card-connectors/core-test

@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
  int phase = phaser.getPhase();
  phaser.register();
  return reactorClientHttpConnector.connect(method, uri, requestCallback)
      .doOnSuccessOrError((response, throwable) -> phaser.arriveAndDeregister())
      .delayUntil(response -> awaitAdvance(phase));
}
