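This article collects code examples for the Java method reactor.core.publisher.Mono.onErrorReturn() and shows how Mono.onErrorReturn() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.onErrorReturn() method are as follows: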
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: onErrorReturn
Description: Simply emit a captured fallback value when an error of the specified type is observed on this Mono.
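The description above covers the type-filtered form; the examples collected below also use an unconditional form and a predicate form. The following is a minimal sketch of all three, assuming reactor-core is on the classpath. The class name OnErrorReturnSketch, its method names, and the fallback values are made up for illustration and do not come from any of the quoted projects.

```java
import reactor.core.publisher.Mono;

public class OnErrorReturnSketch {

    // Unconditional form: any error is replaced by the fallback value.
    static Integer anyError() {
        return Mono.<Integer>error(new IllegalStateException("boom"))
                .onErrorReturn(-1)
                .block();
    }

    // Type-filtered form: the fallback is used only when the error
    // is an instance of the given class.
    static Integer byType() {
        return Mono.<Integer>error(new IllegalArgumentException("bad input"))
                .onErrorReturn(IllegalArgumentException.class, 0)
                .block();
    }

    // Predicate form: the fallback is used only when the predicate
    // returns true for the error.
    static Integer byPredicate() {
        return Mono.<Integer>error(new RuntimeException("forced failure"))
                .onErrorReturn(e -> "forced failure".equals(e.getMessage()), 2)
                .block();
    }

    // When the filter does not match, the original error propagates;
    // block() rethrows an unchecked exception to the caller.
    static boolean nonMatchingPropagates() {
        try {
            Mono.<Integer>error(new IllegalStateException("boom"))
                    .onErrorReturn(IllegalArgumentException.class, 0)
                    .block();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(anyError());              // -1
        System.out.println(byType());                // 0
        System.out.println(byPredicate());           // 2
        System.out.println(nonMatchingPropagates()); // true
    }
}
```

The fallback is a plain value passed in when the chain is assembled, so it is computed before any error occurs; this sketch uses constants to keep that visible.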
Code example source: origin: spring-projects/spring-data-elasticsearch
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 *
 * @param request the already prepared {@link GetRequest} ready to be executed.
 * @return a {@link Mono} emitting the result of the operation.
 */
protected Mono<Boolean> doExists(GetRequest request) {
    return Mono.from(execute(client -> client.exists(request))) //
            .onErrorReturn(NoSuchIndexException.class, false);
}
Code example source: origin: spring-projects/spring-data-redis
@Override
public Flux<MultiValueResponse<HGetCommand, ByteBuffer>> hMGet(Publisher<HGetCommand> commands) {
    return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
        Assert.notNull(command.getKey(), "Key must not be null!");
        Assert.notNull(command.getFields(), "Fields must not be null!");
        Mono<List<KeyValue<ByteBuffer, ByteBuffer>>> result;
        if (command.getFields().size() == 1) {
            ByteBuffer key = command.getFields().iterator().next();
            result = cmd.hget(command.getKey(), key.duplicate()).map(value -> KeyValue.fromNullable(key, value))
                    .map(Collections::singletonList).onErrorReturn(Collections.emptyList());
        } else {
            result = cmd.hmget(command.getKey(), command.getFields().stream().toArray(ByteBuffer[]::new)).collectList();
        }
        return result.map(value -> new MultiValueResponse<>(command,
                value.stream().map(keyValue -> keyValue.getValueOrElse(null)).collect(Collectors.toList())));
    }));
}
Code example source: origin: reactor/reactor-core
void simpleFlux(){
    Flux.just(1)
        .map(d -> d + 1)
        .doOnNext(d -> {throw new RuntimeException("test");})
        .collectList()
        .onErrorReturn(Collections.singletonList(2))
        .block();
}
Code example source: origin: reactor/reactor-core
/**
 * Switch to a fallback {@link Flux} as soon as no item is emitted within the
 * given {@link Duration} from the previous emission (or the subscription for the
 * first item), as measured on the specified {@link Scheduler}.
 * <p>
 * If the given {@link Publisher} is null, signal a {@link TimeoutException} instead.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/timeoutFallbackForFlux.svg" alt="">
 *
 * @param timeout the timeout {@link Duration} between two signals from this {@link Flux}
 * @param fallback the fallback {@link Publisher} to subscribe when a timeout occurs
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return a {@link Flux} that will fallback to a different {@link Publisher} in case of a per-item timeout
 */
public final Flux<T> timeout(Duration timeout,
        @Nullable Publisher<? extends T> fallback,
        Scheduler timer) {
    final Mono<Long> _timer = Mono.delay(timeout, timer).onErrorReturn(0L);
    final Function<T, Publisher<Long>> rest = o -> _timer;
    if(fallback == null) {
        return timeout(_timer, rest, timeout.toMillis() + "ms");
    }
    return timeout(_timer, rest, fallback);
}
Code example source: origin: reactor/reactor-core
/**
 * Switch to a fallback {@link Mono} in case an item doesn't arrive before the given period,
 * as measured on the provided {@link Scheduler}.
 *
 * <p> If the given {@link Mono} is null, signal a {@link TimeoutException}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/timeoutFallbackForMono.svg" alt="">
 *
 * @param timeout the timeout before the onNext signal from this {@link Mono}
 * @param fallback the fallback {@link Mono} to subscribe when a timeout occurs
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return an expirable {@link Mono} with a fallback {@link Mono}
 */
public final Mono<T> timeout(Duration timeout, @Nullable Mono<? extends T> fallback,
        Scheduler timer) {
    final Mono<Long> _timer = Mono.delay(timeout, timer).onErrorReturn(0L);
    if(fallback == null) {
        return onAssembly(new MonoTimeout<>(this, _timer, timeout.toMillis() + "ms"));
    }
    return onAssembly(new MonoTimeout<>(this, _timer, fallback));
}
Code example source: origin: reactor/reactor-core
@Test
public void errorFiltered2() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.<Integer>error(new RuntimeException("forced failure"))
        .onErrorReturn(e -> e.getMessage().equals("forced failure"), 2)
        .subscribe(ts);
    ts.assertValues(2)
      .assertNoError()
      .assertComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void otherwiseReturnErrorUnfilter2() {
    MonoProcessor<Integer> mp = MonoProcessor.create();
    StepVerifier.create(Mono.<Integer>error(new TestException())
            .onErrorReturn(RuntimeException.class::isInstance, 1)
            .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isTrue())
        .then(() -> assertThat(mp.isSuccess()).isFalse())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .verifyError(TestException.class);
}
Code example source: origin: spring-projects/spring-data-elasticsearch
.onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build());
Code example source: origin: reactor/reactor-core
@Test
public void otherwiseReturnErrorUnfilter() {
    MonoProcessor<Integer> mp = MonoProcessor.create();
    StepVerifier.create(Mono.<Integer>error(new TestException())
            .onErrorReturn(RuntimeException.class, 1)
            .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isTrue())
        .then(() -> assertThat(mp.isSuccess()).isFalse())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .verifyError(TestException.class);
}
Code example source: origin: reactor/reactor-core
@Test
public void otherwiseReturnErrorFilter2() {
    MonoProcessor<Integer> mp = MonoProcessor.create();
    StepVerifier.create(Mono.<Integer>error(new TestException())
            .onErrorReturn(TestException.class::isInstance, 1)
            .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .expectNext(1)
        .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void otherwiseReturnErrorFilter() {
    MonoProcessor<Integer> mp = MonoProcessor.create();
    StepVerifier.create(Mono.<Integer>error(new TestException())
            .onErrorReturn(TestException.class, 1)
            .subscribeWith(mp))
        .then(() -> assertThat(mp.isError()).isFalse())
        .then(() -> assertThat(mp.isSuccess()).isTrue())
        .then(() -> assertThat(mp.isTerminated()).isTrue())
        .expectNext(1)
        .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void errorFiltered3() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.<Integer>error(new RuntimeException("forced failure"))
        .onErrorReturn(RuntimeException.class, 2)
        .subscribe(ts);
    ts.assertValues(2)
      .assertNoError()
      .assertComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void error2() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.<Integer>error(new RuntimeException("forced failure")).onErrorReturn(2)
        .subscribe(ts);
    ts.assertValues(2)
      .assertNoError()
      .assertComplete();
}
Code example source: origin: reactor/reactor-core
.map(it -> it.get(String.class))
.doOnNext(probeContextValue::set)
.onErrorReturn("fail")
);
Mono<String> contextHandler = probe.mono();
Code example source: origin: reactor/reactor-core
@Test
@Parameters(method = "sources01")
public void contextPropagationOnCancel(Flux<String> source) {
    TestResource testResource = new TestResource();
    AtomicReference<Throwable> errorRef = new AtomicReference<>();
    PublisherProbe<String> probe = PublisherProbe.of(
            Mono.subscriberContext()
                .map(it -> it.get(String.class))
                .doOnError(errorRef::set)
                .onErrorReturn("fail")
    );
    Mono<String> cancelHandler = probe.mono();
    Flux.usingWhen(Mono.just(testResource),
            r -> source,
            TestResource::commit,
            TestResource::rollback,
            cancel -> cancelHandler)
        .subscriberContext(Context.of(String.class, "contextual"))
        .take(1)
        .as(StepVerifier::create)
        .expectNextCount(1)
        .verifyComplete();
    testResource.rollbackProbe.assertWasNotSubscribed();
    testResource.commitProbe.assertWasNotSubscribed();
    probe.assertWasSubscribed();
    assertThat(errorRef).hasValue(null);
}
Code example source: origin: reactor/reactor-core
@Test
@Parameters(method = "sources01")
public void contextPropagationOnCancelWithNoHandler(Flux<String> source) {
    TestResource testResource = new TestResource();
    AtomicReference<Throwable> errorRef = new AtomicReference<>();
    PublisherProbe<String> probe = PublisherProbe.of(
            Mono.subscriberContext()
                .map(it -> it.get(String.class))
                .doOnError(errorRef::set)
                .onErrorReturn("fail")
    );
    Mono<String> cancelHandler = probe.mono();
    Flux.usingWhen(Mono.just(testResource),
            r -> source,
            commit -> cancelHandler,
            TestResource::rollback,
            null)
        .subscriberContext(Context.of(String.class, "contextual"))
        .take(1)
        .as(StepVerifier::create)
        .expectNextCount(1)
        .verifyComplete();
    testResource.rollbackProbe.assertWasNotSubscribed();
    testResource.commitProbe.assertWasNotSubscribed();
    probe.assertWasSubscribed();
    assertThat(errorRef).hasValue(null);
}
Code example source: origin: spring-projects/spring-restdocs
private List<OperationRequestPart> extractRequestParts(ExchangeResult result) {
    if (!ClassUtils.isPresent(
            "org.synchronoss.cloud.nio.multipart.NioMultipartParserListener",
            getClass().getClassLoader())) {
        return Collections.emptyList();
    }
    return new MultipartHttpMessageReader(new SynchronossPartHttpMessageReader())
            .readMono(ResolvableType.forClass(Part.class),
                    new ExchangeResultReactiveHttpInputMessage(result),
                    Collections.emptyMap())
            .onErrorReturn(new LinkedMultiValueMap<>()).block().values().stream()
            .flatMap((parts) -> parts.stream().map(this::createOperationRequestPart))
            .collect(Collectors.toList());
}
Code example source: origin: rsocket/rsocket-java
.requestResponse(DefaultPayload.create("Hello"))
.map(Payload::getDataUtf8)
.onErrorReturn("error")
.doOnNext(System.out::println)
.block();
Code example source: origin: rsocket/rsocket-java
@Test(timeout = 5_000L)
public void testRequestResponseErrors() {
    handler =
        new AbstractRSocket() {
            boolean first = true;
            @Override
            public Mono<Payload> requestResponse(Payload payload) {
                if (first) {
                    first = false;
                    return Mono.error(new RuntimeException("EX"));
                } else {
                    return Mono.just(DefaultPayload.create("SUCCESS"));
                }
            }
        };
    RSocket client = buildClient();
    Payload response1 =
        client
            .requestResponse(DefaultPayload.create("REQUEST", "META"))
            .onErrorReturn(DefaultPayload.create("ERROR"))
            .block();
    Payload response2 =
        client
            .requestResponse(DefaultPayload.create("REQUEST", "META"))
            .onErrorReturn(DefaultPayload.create("ERROR"))
            .block();
    assertEquals("ERROR", response1.getDataUtf8());
    assertEquals("SUCCESS", response2.getDataUtf8());
}