本文整理了Java中reactor.core.publisher.Mono.handle()
方法的一些代码示例,展示了Mono.handle()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.handle()
方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:handle
[英]Handle the items emitted by this Mono by calling a biconsumer with the output sink for each onNext. At most one SynchronousSink#next(Object)call must be performed and/or 0 or 1 SynchronousSink#error(Throwable) or SynchronousSink#complete().
[中]通过使用每个onNext的输出接收器调用biconsumer来处理此Mono发出的项。最多必须执行一个SynchronousSink#next(Object)调用和/或0或1个SynchronousSink#error(Throwable)或SynchronousSink#complete()。
代码示例来源:origin: spring-projects/spring-security
@Override
public Mono<OAuth2AuthorizationRequest> removeAuthorizationRequest(
ServerWebExchange exchange) {
String state = getStateParameter(exchange);
if (state == null) {
return Mono.empty();
}
return exchange.getSession()
.map(WebSession::getAttributes)
.handle((sessionAttrs, sink) -> {
Map<String, OAuth2AuthorizationRequest> stateToAuthzRequest = sessionAttrsMapStateToAuthorizationRequest(sessionAttrs);
if (stateToAuthzRequest == null) {
sink.complete();
return;
}
OAuth2AuthorizationRequest removedValue = stateToAuthzRequest.remove(state);
if (stateToAuthzRequest.isEmpty()) {
sessionAttrs.remove(this.sessionAttributeName);
}
if (removedValue == null) {
sink.complete();
} else {
sink.next(removedValue);
}
});
}
代码示例来源:origin: spring-projects/spring-data-redis
@SuppressWarnings("unchecked")
AsyncConnect(LettuceConnectionProvider connectionProvider, Class<T> connectionType) {
Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null!");
Assert.notNull(connectionType, "Connection type must not be null!");
this.connectionProvider = connectionProvider;
Mono<StatefulConnection> defer = Mono
.defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType)));
this.connectionPublisher = defer.doOnNext(it -> {
if (isClosing(this.state.get())) {
it.closeAsync();
} else {
connection = it;
}
}) //
.cache() //
.handle((connection, sink) -> {
if (isClosing(this.state.get())) {
sink.error(new IllegalStateException("Unable to connect. Connection is closed!"));
} else {
sink.next((T) connection);
}
});
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
Mono.just(1)
.handle((v, s) -> s.next(v * 2))
.subscribeWith(AssertSubscriber.create())
.assertContainValues(singleton(2))
.assertNoError()
.assertComplete();
}
@Test
代码示例来源:origin: reactor/reactor-core
@Test
public void filterNullMapResult() {
Mono.just(1)
.handle((v, s) -> { /*ignore*/ })
.subscribeWith(AssertSubscriber.create())
.assertValueCount(0)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalHide() {
Mono.just(1)
.hide()
.handle((v, s) -> s.next(v * 2))
.subscribeWith(AssertSubscriber.create())
.assertContainValues(singleton(2))
.assertNoError()
.assertComplete();
}
代码示例来源:origin: spring-projects/spring-data-mongodb
.handle((it, sink) -> {
代码示例来源:origin: reactor/reactor-core
Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
Mono<Tuple2<String, Optional<Object>>> dataAndContext =
data.zipWith(Mono.subscriberContext()
.map(c -> c.getOrEmpty(HTTP_CORRELATION_ID)));
return dataAndContext
.<String>handle((dac, sink) -> {
if (dac.getT2().isPresent()) {
sink.next("PUT <" + dac.getT1() + "> sent to " + url + " with header X-Correlation-ID = " + dac.getT2().get());
}
else {
sink.next("PUT <" + dac.getT1() + "> sent to " + url);
}
sink.complete();
})
.map(msg -> Tuples.of(200, msg));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void flatMapDelayErrors() throws Exception {
AtomicInteger onNextSignals = new AtomicInteger();
StepVerifier.create(Flux.range(0, 5).hide()
.flatMapDelayError(i -> Mono.just(i)
.doOnNext(e -> onNextSignals.incrementAndGet())
.handle((s1, sink) -> {
if (s1 == 1 || s1 == 3) {
sink.error(new RuntimeException("Error: " + s1));
}
else {
sink.next(s1);
}
})
.subscribeOn(Schedulers.parallel()),
4, 4)
.retry(1))
.recordWith(ArrayList::new)
.expectNextCount(3)
.consumeRecordedWith(c -> {
assertThat(c).containsExactlyInAnyOrder(0, 2, 4);
c.clear();
})
.expectNextCount(3)
.consumeRecordedWith(c -> assertThat(c).containsExactlyInAnyOrder(0, 2, 4))
.verifyError();
assertThat(onNextSignals.get()).isEqualTo(10);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void contextGetMono() throws InterruptedException {
StepVerifier.create(Mono.just(1)
.log()
.handle((d, c) -> c.next(c.currentContext().get("test") + "" + d))
.handle((d, c) -> c.next(c.currentContext().get("test2") + "" + d))
.subscriberContext(ctx -> ctx.put("test2", "bar"))
.subscriberContext(ctx -> ctx.put("test", "foo"))
.log())
.expectNext("barfoo1")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void contextGetHideMono() throws InterruptedException {
StepVerifier.create(Mono.just(1)
.hide()
.log()
.handle((d, c) -> c.next(c.currentContext().get("test") + "" + d))
.handle((d, c) -> c.next(c.currentContext().get("test2") + "" + d))
.subscriberContext(ctx -> ctx.put("test", "foo"))
.subscriberContext(ctx -> ctx.put("test2", "bar"))
.log())
.expectNext("barfoo1")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
.autoConnect()
.flatMap(i -> Mono.just(i)
.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
(s1, sink) -> {
if (s1 == 1) {
代码示例来源:origin: reactor/reactor-core
.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
(s1, sink) -> {
if (s1 == 1) {
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param handler
* @return
* @see reactor.core.publisher.Mono#handle(java.util.function.BiConsumer)
*/
public final <R> Mono<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler) {
return boxed.handle(handler);
}
/**
代码示例来源:origin: apache/servicemix-bundles
@Override
public Mono<OAuth2AuthorizationRequest> removeAuthorizationRequest(
ServerWebExchange exchange) {
String state = getStateParameter(exchange);
if (state == null) {
return Mono.empty();
}
return exchange.getSession()
.map(WebSession::getAttributes)
.handle((sessionAttrs, sink) -> {
Map<String, OAuth2AuthorizationRequest> stateToAuthzRequest = sessionAttrsMapStateToAuthorizationRequest(sessionAttrs);
if (stateToAuthzRequest == null) {
sink.complete();
return;
}
OAuth2AuthorizationRequest removedValue = stateToAuthzRequest.remove(state);
if (stateToAuthzRequest.isEmpty()) {
sessionAttrs.remove(this.sessionAttributeName);
}
if (removedValue == null) {
sink.complete();
} else {
sink.next(removedValue);
}
});
}
代码示例来源:origin: org.mule.runtime/mule-core
private Publisher<CoreEvent> applyInternal(final Exception exception, CoreEvent event) {
return just(event)
.map(beforeRouting(exception))
.flatMapMany(route(exception)).last()
.map(afterRouting(exception))
.doOnError(MessagingException.class, onRoutingError())
.<CoreEvent>handle((result, sink) -> {
if (exception instanceof MessagingException) {
if (((MessagingException) exception).handled()) {
sink.next(result);
} else {
((MessagingException) exception).setProcessedEvent(result);
sink.error(exception);
}
} else {
sink.error(exception);
}
})
.doOnSuccessOrError((result, throwable) -> fireEndNotification(event, result, throwable));
}
内容来源于网络,如有侵权,请联系作者删除!