Usage and code examples of the reactor.core.publisher.Mono.handle() method

Reposted by x33g5p2x on 2022-01-24, category: Other

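This article collects code examples of the reactor.core.publisher.Mono.handle() method in Java and shows how Mono.handle() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as useful references. Details of the Mono.handle() method are as follows:
Package: reactor.core.publisher
Class name: Mono
Method name: handle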

Introduction to Mono.handle

Handles the item emitted by this Mono by calling a BiConsumer with the output sink for each onNext. The handler may call SynchronousSink#next(Object) at most once, and may call SynchronousSink#error(Throwable) or SynchronousSink#complete() at most once. If the handler calls none of them, the resulting Mono completes empty, as the filterNullMapResult test below shows.
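The contract above effectively lets handle() act as a combined map, filter, and error step. The following is a minimal sketch of that idea, assuming reactor-core is on the classpath; the class name HandleSketch and the helper parseEven are hypothetical names introduced only for illustration and do not come from any of the projects quoted in this article.

```java
import reactor.core.publisher.Mono;

public class HandleSketch {

    // Hypothetical helper: parses a string and keeps only even numbers.
    static Mono<Integer> parseEven(String raw) {
        return Mono.just(raw)
            .<Integer>handle((text, sink) -> {
                int value;
                try {
                    value = Integer.parseInt(text.trim());
                } catch (NumberFormatException e) {
                    // Error path: at most one error(...) call, then stop.
                    sink.error(new IllegalArgumentException("not a number: " + text, e));
                    return;
                }
                if (value % 2 == 0) {
                    // Map path: at most one next(...) call per input item.
                    sink.next(value);
                }
                // Filter path: calling nothing makes the Mono complete empty.
            });
    }

    public static void main(String[] args) {
        System.out.println(parseEven("42").blockOptional());          // Optional[42]
        System.out.println(parseEven("7").blockOptional());           // Optional.empty
        System.out.println(parseEven("abc").onErrorReturn(-1).block()); // -1
    }
}
```

The explicit type witness (`.<Integer>handle(...)`) is used here because the compiler cannot always infer the sink's element type from the lambda body alone; several of the quoted examples below do the same.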

Code examples

Code example source: spring-projects/spring-security

@Override
public Mono<OAuth2AuthorizationRequest> removeAuthorizationRequest(
    ServerWebExchange exchange) {
  String state = getStateParameter(exchange);
  if (state == null) {
    return Mono.empty();
  }
  return exchange.getSession()
    .map(WebSession::getAttributes)
    .handle((sessionAttrs, sink) -> {
      Map<String, OAuth2AuthorizationRequest> stateToAuthzRequest = sessionAttrsMapStateToAuthorizationRequest(sessionAttrs);
      if (stateToAuthzRequest == null) {
        sink.complete();
        return;
      }
      OAuth2AuthorizationRequest removedValue = stateToAuthzRequest.remove(state);
      if (stateToAuthzRequest.isEmpty()) {
        sessionAttrs.remove(this.sessionAttributeName);
      }
      if (removedValue == null) {
        sink.complete();
      } else {
        sink.next(removedValue);
      }
    });
}

Code example source: spring-projects/spring-data-redis

@SuppressWarnings("unchecked")
AsyncConnect(LettuceConnectionProvider connectionProvider, Class<T> connectionType) {
  Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null!");
  Assert.notNull(connectionType, "Connection type must not be null!");
  this.connectionProvider = connectionProvider;
  Mono<StatefulConnection> defer = Mono
      .defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType)));
  this.connectionPublisher = defer.doOnNext(it -> {
    if (isClosing(this.state.get())) {
      it.closeAsync();
    } else {
      connection = it;
    }
  }) //
      .cache() //
      .handle((connection, sink) -> {
        if (isClosing(this.state.get())) {
          sink.error(new IllegalStateException("Unable to connect. Connection is closed!"));
        } else {
          sink.next((T) connection);
        }
      });
}

Code example source: reactor/reactor-core

@Test
public void normal() {
  Mono.just(1)
    .handle((v, s) -> s.next(v * 2))
    .subscribeWith(AssertSubscriber.create())
    .assertContainValues(singleton(2))
    .assertNoError()
    .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void filterNullMapResult() {
  Mono.just(1)
    .handle((v, s) -> { /*ignore*/ })
    .subscribeWith(AssertSubscriber.create())
    .assertValueCount(0)
    .assertNoError()
    .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void normalHide() {
  Mono.just(1)
    .hide()
    .handle((v, s) -> s.next(v * 2))
    .subscribeWith(AssertSubscriber.create())
    .assertContainValues(singleton(2))
    .assertNoError()
    .assertComplete();
}

Code example source: spring-projects/spring-data-mongodb

.handle((it, sink) -> {

Code example source: reactor/reactor-core

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
  Mono<Tuple2<String, Optional<Object>>> dataAndContext =
      data.zipWith(Mono.subscriberContext()
               .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID)));
  return dataAndContext
      .<String>handle((dac, sink) -> {
        if (dac.getT2().isPresent()) {
          sink.next("PUT <" + dac.getT1() + "> sent to " + url + " with header X-Correlation-ID = " + dac.getT2().get());
        }
        else {
          sink.next("PUT <" + dac.getT1() + "> sent to " + url);
        }
        sink.complete();
      })
      .map(msg -> Tuples.of(200, msg));
}

Code example source: reactor/reactor-core

@Test
public void flatMapDelayErrors() throws Exception {
  AtomicInteger onNextSignals = new AtomicInteger();
  StepVerifier.create(Flux.range(0, 5).hide()
              .flatMapDelayError(i -> Mono.just(i)
                       .doOnNext(e -> onNextSignals.incrementAndGet())
                       .handle((s1, sink) -> {
                         if (s1 == 1 || s1 == 3) {
                           sink.error(new RuntimeException("Error: " + s1));
                         }
                         else {
                           sink.next(s1);
                         }
                       })
                       .subscribeOn(Schedulers.parallel()),
                  4, 4)
              .retry(1))
        .recordWith(ArrayList::new)
        .expectNextCount(3)
        .consumeRecordedWith(c ->  {
          assertThat(c).containsExactlyInAnyOrder(0, 2, 4);
          c.clear();
        })
        .expectNextCount(3)
        .consumeRecordedWith(c ->  assertThat(c).containsExactlyInAnyOrder(0, 2, 4))
        .verifyError();
  assertThat(onNextSignals.get()).isEqualTo(10);
}

Code example source: reactor/reactor-core

@Test
public void contextGetMono() throws InterruptedException {
  StepVerifier.create(Mono.just(1)
              .log()
              .handle((d, c) -> c.next(c.currentContext().get("test") + "" + d))
              .handle((d, c) -> c.next(c.currentContext().get("test2") + "" + d))
              .subscriberContext(ctx -> ctx.put("test2", "bar"))
              .subscriberContext(ctx -> ctx.put("test", "foo"))
              .log())
        .expectNext("barfoo1")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void contextGetHideMono() throws InterruptedException {
  StepVerifier.create(Mono.just(1)
              .hide()
              .log()
              .handle((d, c) -> c.next(c.currentContext().get("test") + "" + d))
              .handle((d, c) -> c.next(c.currentContext().get("test2") + "" + d))
              .subscriberContext(ctx -> ctx.put("test", "foo"))
              .subscriberContext(ctx -> ctx.put("test2", "bar"))
              .log())
        .expectNext("barfoo1")
        .verifyComplete();
}

Code example source: reactor/reactor-core

.autoConnect()
.flatMap(i -> Mono.just(i)
         .doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
        (s1, sink) -> {
          if (s1 == 1) {

Code example source: reactor/reactor-core

.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
(s1, sink) -> {
  if (s1 == 1) {

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @param handler
 * @return
 * @see reactor.core.publisher.Mono#handle(java.util.function.BiConsumer)
 */
public final <R> Mono<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler) {
  return boxed.handle(handler);
}

Code example source: apache/servicemix-bundles (the code is identical to the spring-projects/spring-security example above)

Code example source: org.mule.runtime/mule-core

private Publisher<CoreEvent> applyInternal(final Exception exception, CoreEvent event) {
 return just(event)
   .map(beforeRouting(exception))
   .flatMapMany(route(exception)).last()
   .map(afterRouting(exception))
   .doOnError(MessagingException.class, onRoutingError())
   .<CoreEvent>handle((result, sink) -> {
    if (exception instanceof MessagingException) {
     if (((MessagingException) exception).handled()) {
      sink.next(result);
     } else {
      ((MessagingException) exception).setProcessedEvent(result);
      sink.error(exception);
     }
    } else {
     sink.error(exception);
    }
   })
   .doOnSuccessOrError((result, throwable) -> fireEndNotification(event, result, throwable));
}
