reactor.core.publisher.Mono.onErrorReturn()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(11.0k)|赞(0)|评价(0)|浏览(526)

本文整理了Java中reactor.core.publisher.Mono.onErrorReturn()方法的一些代码示例,展示了Mono.onErrorReturn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.onErrorReturn()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:onErrorReturn

Mono.onErrorReturn介绍

[英]Simply emit a captured fallback value when an error of the specified type is observed on this Mono.
[中]当在此Mono上观察到指定类型的错误时,只需发出捕获的回退值。

代码示例

代码示例来源:origin: spring-projects/spring-data-elasticsearch

/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 *
 * @param request the already prepared {@link GetRequest} ready to be executed.
 * @return a {@link Mono} emitting the result of the operation.
 */
protected Mono<Boolean> doExists(GetRequest request) {
  return Mono.from(execute(client -> client.exists(request))) //
      .onErrorReturn(NoSuchIndexException.class, false);
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<MultiValueResponse<HGetCommand, ByteBuffer>> hMGet(Publisher<HGetCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    Assert.notNull(command.getFields(), "Fields must not be null!");
    Mono<List<KeyValue<ByteBuffer, ByteBuffer>>> result;
    if (command.getFields().size() == 1) {
      ByteBuffer key = command.getFields().iterator().next();
      result = cmd.hget(command.getKey(), key.duplicate()).map(value -> KeyValue.fromNullable(key, value))
          .map(Collections::singletonList).onErrorReturn(Collections.emptyList());
    } else {
      result = cmd.hmget(command.getKey(), command.getFields().stream().toArray(ByteBuffer[]::new)).collectList();
    }
    return result.map(value -> new MultiValueResponse<>(command,
        value.stream().map(keyValue -> keyValue.getValueOrElse(null)).collect(Collectors.toList())));
  }));
}

代码示例来源:origin: reactor/reactor-core

void simpleFlux(){
  Flux.just(1)
    .map(d -> d + 1)
    .doOnNext(d -> {throw new RuntimeException("test");})
    .collectList()
    .onErrorReturn(Collections.singletonList(2))
    .block();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Switch to a fallback {@link Flux} as soon as no item is emitted within the
 * given {@link Duration} from the previous emission (or the subscription for the
 * first item), as measured on the specified {@link Scheduler}.
 * <p>
 * If the given {@link Publisher} is null, signal a {@link TimeoutException} instead.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/timeoutFallbackForFlux.svg" alt="">
 *
 * @param timeout the timeout {@link Duration} between two signals from this {@link Flux}
 * @param fallback the fallback {@link Publisher} to subscribe when a timeout occurs
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return a {@link Flux} that will fallback to a different {@link Publisher} in case of a per-item timeout
 */
public final Flux<T> timeout(Duration timeout,
    @Nullable Publisher<? extends T> fallback,
    Scheduler timer) {
  final Mono<Long> _timer = Mono.delay(timeout, timer).onErrorReturn(0L);
  final Function<T, Publisher<Long>> rest = o -> _timer;
  if(fallback == null) {
    return timeout(_timer, rest, timeout.toMillis() + "ms");
  }
  return timeout(_timer, rest, fallback);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Switch to a fallback {@link Mono} in case an item doesn't arrive before the given period,
 * as measured on the provided {@link Scheduler}.
 *
 * <p> If the given {@link Mono} is null, signal a {@link TimeoutException}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/timeoutFallbackForMono.svg" alt="">
 *
 * @param timeout the timeout before the onNext signal from this {@link Mono}
 * @param fallback the fallback {@link Mono} to subscribe when a timeout occurs
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return an expirable {@link Mono} with a fallback {@link Mono}
 */
public final Mono<T> timeout(Duration timeout, @Nullable Mono<? extends T> fallback,
    Scheduler timer) {
  final Mono<Long> _timer = Mono.delay(timeout, timer).onErrorReturn(0L);
  if(fallback == null) {
    return onAssembly(new MonoTimeout<>(this, _timer, timeout.toMillis() + "ms"));
  }
  return onAssembly(new MonoTimeout<>(this, _timer, fallback));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorFiltered2() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.<Integer>error(new RuntimeException("forced failure"))
      .onErrorReturn(e -> e.getMessage().equals("forced failure"), 2)
      .subscribe(ts);
  ts.assertValues(2)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void otherwiseReturnErrorUnfilter2() {
    MonoProcessor<Integer> mp = MonoProcessor.create();
    StepVerifier.create(Mono.<Integer>error(new TestException())
        .onErrorReturn(RuntimeException.class::isInstance, 1)
        .subscribeWith(mp))
          .then(() -> assertThat(mp.isError()).isTrue())
          .then(() -> assertThat(mp.isSuccess()).isFalse())
          .then(() -> assertThat(mp.isTerminated()).isTrue())
          .verifyError(TestException.class);
  }
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

.onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build());

代码示例来源:origin: reactor/reactor-core

@Test
public void otherwiseReturnErrorUnfilter() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.<Integer>error(new TestException())
      .onErrorReturn(RuntimeException.class, 1)
      .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isTrue())
        .then(() -> assertThat(mp.isSuccess()).isFalse())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .verifyError(TestException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void otherwiseReturnErrorFilter2() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.<Integer>error(new TestException())
      .onErrorReturn(TestException.class::isInstance, 1)
      .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .expectNext(1)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void otherwiseReturnErrorFilter() {
  MonoProcessor<Integer> mp = MonoProcessor.create();
  StepVerifier.create(Mono.<Integer>error(new TestException())
      .onErrorReturn(TestException.class, 1)
      .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .expectNext(1)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorFiltered3() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.<Integer>error(new RuntimeException("forced failure"))
      .onErrorReturn(RuntimeException.class, 2)
      .subscribe(ts);
  ts.assertValues(2)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void error2() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Mono.<Integer>error(new RuntimeException("forced failure")).onErrorReturn(2)
                                .subscribe(ts);
  ts.assertValues(2)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

.map(it -> it.get(String.class))
      .doOnNext(probeContextValue::set)
      .onErrorReturn("fail")
);
Mono<String> contextHandler = probe.mono();

代码示例来源:origin: reactor/reactor-core

@Test
@Parameters(method = "sources01")
public void contextPropagationOnCancel(Flux<String> source) {
  TestResource testResource = new TestResource();
  AtomicReference<Throwable> errorRef = new AtomicReference<>();
  PublisherProbe<String> probe = PublisherProbe.of(
      Mono.subscriberContext()
        .map(it -> it.get(String.class))
        .doOnError(errorRef::set)
        .onErrorReturn("fail")
  );
  Mono<String> cancelHandler = probe.mono();
  Flux.usingWhen(Mono.just(testResource),
      r -> source,
      TestResource::commit,
      TestResource::rollback,
      cancel -> cancelHandler)
    .subscriberContext(Context.of(String.class, "contextual"))
    .take(1)
    .as(StepVerifier::create)
    .expectNextCount(1)
    .verifyComplete();
  testResource.rollbackProbe.assertWasNotSubscribed();
  testResource.commitProbe.assertWasNotSubscribed();
  probe.assertWasSubscribed();
  assertThat(errorRef).hasValue(null);
}

代码示例来源:origin: reactor/reactor-core

@Test
@Parameters(method = "sources01")
public void contextPropagationOnCancelWithNoHandler(Flux<String> source) {
  TestResource testResource = new TestResource();
  AtomicReference<Throwable> errorRef = new AtomicReference<>();
  PublisherProbe<String> probe = PublisherProbe.of(
      Mono.subscriberContext()
        .map(it -> it.get(String.class))
        .doOnError(errorRef::set)
        .onErrorReturn("fail")
  );
  Mono<String> cancelHandler = probe.mono();
  Flux.usingWhen(Mono.just(testResource),
      r -> source,
      commit -> cancelHandler,
      TestResource::rollback,
      null)
    .subscriberContext(Context.of(String.class, "contextual"))
    .take(1)
    .as(StepVerifier::create)
    .expectNextCount(1)
    .verifyComplete();
  testResource.rollbackProbe.assertWasNotSubscribed();
  testResource.commitProbe.assertWasNotSubscribed();
  probe.assertWasSubscribed();
  assertThat(errorRef).hasValue(null);
}

代码示例来源:origin: reactor/reactor-core

.map(it -> it.get(String.class))
      .doOnNext(probeContextValue::set)
      .onErrorReturn("fail")
);
Mono<String> contextHandler = probe.mono();

代码示例来源:origin: spring-projects/spring-restdocs

private List<OperationRequestPart> extractRequestParts(ExchangeResult result) {
  if (!ClassUtils.isPresent(
      "org.synchronoss.cloud.nio.multipart.NioMultipartParserListener",
      getClass().getClassLoader())) {
    return Collections.emptyList();
  }
  return new MultipartHttpMessageReader(new SynchronossPartHttpMessageReader())
      .readMono(ResolvableType.forClass(Part.class),
          new ExchangeResultReactiveHttpInputMessage(result),
          Collections.emptyMap())
      .onErrorReturn(new LinkedMultiValueMap<>()).block().values().stream()
      .flatMap((parts) -> parts.stream().map(this::createOperationRequestPart))
      .collect(Collectors.toList());
}

代码示例来源:origin: rsocket/rsocket-java

.requestResponse(DefaultPayload.create("Hello"))
.map(Payload::getDataUtf8)
.onErrorReturn("error")
.doOnNext(System.out::println)
.block();
.requestResponse(DefaultPayload.create("Hello"))
.map(Payload::getDataUtf8)
.onErrorReturn("error")
.doOnNext(System.out::println)
.block();
.requestResponse(DefaultPayload.create("Hello"))
.map(Payload::getDataUtf8)
.onErrorReturn("error")
.doOnNext(System.out::println)
.block();

代码示例来源:origin: rsocket/rsocket-java

@Test(timeout = 5_000L)
public void testRequestResponseErrors() {
 handler =
   new AbstractRSocket() {
    boolean first = true;
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
     if (first) {
      first = false;
      return Mono.error(new RuntimeException("EX"));
     } else {
      return Mono.just(DefaultPayload.create("SUCCESS"));
     }
    }
   };
 RSocket client = buildClient();
 Payload response1 =
   client
     .requestResponse(DefaultPayload.create("REQUEST", "META"))
     .onErrorReturn(DefaultPayload.create("ERROR"))
     .block();
 Payload response2 =
   client
     .requestResponse(DefaultPayload.create("REQUEST", "META"))
     .onErrorReturn(DefaultPayload.create("ERROR"))
     .block();
 assertEquals("ERROR", response1.getDataUtf8());
 assertEquals("SUCCESS", response2.getDataUtf8());
}

相关文章

Mono类方法