Usage and code examples of the reactor.core.publisher.Mono.compose() method

x33g5p2x, reposted on 2022-01-24 under Other

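This article collects code examples of the Java method reactor.core.publisher.Mono.compose() and shows how it is used in practice. The examples were extracted from selected projects published on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of Mono.compose():
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: compose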

Introduction to Mono.compose

Defer the given transformation to this Mono in order to generate a target Mono type. A transformation will occur for each Subscriber. For instance:

mono.compose(original -> original.log());
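To make "a transformation will occur for each Subscriber" concrete, the following is a minimal sketch in plain Java, with no Reactor dependency. TinyMono and ComposeSketch are hypothetical names invented for this illustration; they only model when the transformer function is called and are not how Reactor implements the operator (which was renamed to transformDeferred in later Reactor releases).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy stand-in for a Mono (hypothetical, not Reactor code):
// the source is re-evaluated on every subscribe() call.
final class TinyMono<T> {
    private final Supplier<T> source;

    TinyMono(Supplier<T> source) { this.source = source; }

    static <T> TinyMono<T> just(T value) { return new TinyMono<>(() -> value); }

    <R> TinyMono<R> map(Function<? super T, ? extends R> mapper) {
        return new TinyMono<>(() -> mapper.apply(source.get()));
    }

    // Eager variant: the transformer runs once, while the chain is assembled.
    <R> TinyMono<R> transform(Function<TinyMono<T>, TinyMono<R>> transformer) {
        return transformer.apply(this);
    }

    // Deferred variant (the behaviour described for compose):
    // the transformer runs again for every subscriber.
    <R> TinyMono<R> compose(Function<TinyMono<T>, TinyMono<R>> transformer) {
        return new TinyMono<>(() -> transformer.apply(this).subscribe());
    }

    T subscribe() { return source.get(); }
}

final class ComposeSketch {
    static List<Integer> subscribeThreeTimes(boolean deferred) {
        // The counter is created inside the transformer, so its lifetime
        // depends on how often the transformer itself is invoked.
        Function<TinyMono<Integer>, TinyMono<Integer>> addCounter = m -> {
            AtomicInteger counter = new AtomicInteger();
            return m.map(v -> v + counter.incrementAndGet());
        };
        TinyMono<Integer> base = TinyMono.just(10);
        TinyMono<Integer> mono = deferred ? base.compose(addCounter) : base.transform(addCounter);

        List<Integer> seen = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            seen.add(mono.subscribe());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(subscribeThreeTimes(true));  // [11, 11, 11]
        System.out.println(subscribeThreeTimes(false)); // [11, 12, 13]
    }
}
```

With the deferred variant every subscription sees a fresh counter, so the result is always 11. With the eager variant the single counter is shared and keeps growing across subscriptions.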

Code examples

Code example source: reactor/reactor-core

@Test
public void perTrackable() {
  // The transformer runs once per subscriber, so each subscription gets a fresh counter.
  Mono<Integer> source = Mono.just(10).compose(f -> {
    AtomicInteger value = new AtomicInteger();
    return f.map(v -> v + value.incrementAndGet());
  });

  for (int i = 0; i < 10; i++) {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();

    source.subscribe(ts);

    // Always 10 + 1: the counter never carries over between subscriptions.
    ts.assertValues(11)
      .assertComplete()
      .assertNoError();
  }
}

Code example source: reactor/reactor-core

@Test
public void composerThrows() {
  // The transformer itself throws; nothing fails until a subscriber arrives.
  Mono<Integer> source = Mono.just(10).compose(f -> {
    throw new RuntimeException("Forced failure");
  });

  for (int i = 0; i < 10; i++) {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();

    source.subscribe(ts);

    // Each subscriber receives the exception as an error signal.
    ts.assertNoValues()
      .assertNotComplete()
      .assertError(RuntimeException.class);
  }
}

Code example source: reactor/reactor-core

@Test
public void composerReturnsNull() {
  // A transformer that returns null is reported to the subscriber as a NullPointerException.
  Mono<Integer> source = Mono.just(10).compose(f -> {
    return null;
  });

  for (int i = 0; i < 10; i++) {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();

    source.subscribe(ts);

    ts.assertNoValues()
      .assertNotComplete()
      .assertError(NullPointerException.class);
  }
}
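The two tests above show that a transformer which throws or returns null does not fail when the chain is built; the failure reaches each subscriber as an error signal. A minimal plain-Java sketch of that routing follows. DeferredErrorSketch, subscribeDeferred and outcome are hypothetical names used only for this illustration, Supplier stands in for a Mono, and the null-check message is an assumption rather than Reactor's actual wording.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class DeferredErrorSketch {

    // Hypothetical helper: applies the transformer at subscribe time and
    // routes any failure to onError instead of throwing to the caller.
    static <T, R> void subscribeDeferred(Supplier<T> source,
                                         Function<Supplier<T>, Supplier<R>> transformer,
                                         Consumer<R> onNext,
                                         Consumer<Throwable> onError) {
        R value;
        try {
            // A null result is treated as an error (assumed message text).
            Supplier<R> transformed = Objects.requireNonNull(
                    transformer.apply(source), "The transformer returned null");
            value = transformed.get();
        } catch (RuntimeException e) {
            onError.accept(e);
            return;
        }
        onNext.accept(value);
    }

    // Subscribes once to a source that emits 10 and describes which signal arrived.
    static String outcome(Function<Supplier<Integer>, Supplier<Integer>> transformer) {
        StringBuilder signal = new StringBuilder();
        subscribeDeferred(() -> 10, transformer,
                v -> signal.append("value:").append(v),
                e -> signal.append("error:").append(e.getClass().getSimpleName()));
        return signal.toString();
    }

    public static void main(String[] args) {
        System.out.println(outcome(s -> () -> s.get() + 1));   // value:11
        System.out.println(outcome(s -> {
            throw new RuntimeException("Forced failure");
        }));                                                   // error:RuntimeException
        System.out.println(outcome(s -> null));                // error:NullPointerException
    }
}
```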

Code example source: org.springframework.boot/spring-boot-actuator

@Override
public Mono<Health> health() {
  return Flux.fromIterable(this.registry.getAll().entrySet())
      .flatMap((entry) -> Mono.zip(Mono.just(entry.getKey()),
          entry.getValue().health().compose(this.timeoutCompose)))
      .collectMap(Tuple2::getT1, Tuple2::getT2)
      .map(this.healthAggregator::aggregate);
}

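Code example source: mulesoft/mule

@Override
public Publisher<Void> execute(CoreEvent event, Map<String, Object> parameters, SourceCallbackContext context) {
  // first.execute(...) is invoked immediately; then.execute(...) is invoked lazily,
  // once per subscriber, and supplies the resulting Publisher.
  return from(first.execute(event, parameters, context)).compose(v -> then.execute(event, parameters, context));
}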

Code example source: org.springframework.boot/spring-boot-actuator

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
  if (this.autoTimeRequests) {
    return chain.filter(exchange).compose((call) -> filter(exchange, call));
  }
  return chain.filter(exchange);
}

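Code example source: com.aol.cyclops/cyclops-reactor

/**
 * Delegates to boxed.compose(transformer).
 *
 * @param transformer function applied to the wrapped Mono for each Subscriber
 * @return the Mono produced by the deferred transformation
 * @see reactor.core.publisher.Mono#compose(java.util.function.Function)
 */
public final <V> Mono<V> compose(Function<? super Mono<T>, ? extends Publisher<V>> transformer) {
  return boxed.compose(transformer);
}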

Code example source: org.cloudfoundry/cloudfoundry-client-spring

protected final <T> Mono<T> doGet(Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
  return doGet(uriTransformer, requestTransformer)
    .compose(deserializedResponse(responseType));
}

Code example source: org.mule.runtime/mule-module-extensions-support

@Override
public Publisher<Void> execute(CoreEvent event, Map<String, Object> parameters, SourceCallbackContext context) {
  // first.execute(...) is invoked immediately; then.execute(...) is invoked lazily,
  // once per subscriber, and supplies the resulting Publisher.
  return from(first.execute(event, parameters, context)).compose(v -> then.execute(event, parameters, context));
}

Code example source: org.cloudfoundry/cloudfoundry-client-spring

protected final <T> Mono<T> doDelete(Object request, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer,
                   Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
  return this.root
    .map(root -> buildUri(root, uriTransformer))
    .then(uri -> this.httpClient.delete(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
      .map(requestTransformer)
      .then(o -> o.send(serializedRequest(o, request))))
      .doOnSubscribe(s -> this.requestLogger.debug("DELETE {}", uri))
      .compose(logResponse(uri)))
    .compose(deserializedResponse(responseType));
}

Code example source: org.cloudfoundry/cloudfoundry-client-spring

protected final <T> Mono<T> doPost(Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, Mono<Void>> requestTransformer) {
  return this.root
    .map(root -> buildUri(root, uriTransformer))
    .then(uri -> this.httpClient.post(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
      .then(requestTransformer))
      .doOnSubscribe(s -> this.requestLogger.debug("POST   {}", uri))
      .compose(logResponse(uri)))
    .compose(deserializedResponse(responseType));
}

Code example source: org.cloudfoundry/cloudfoundry-client-spring

protected final <T> Mono<T> doPut(Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, Mono<Void>> requestTransformer) {
  return this.root
    .map(root -> buildUri(root, uriTransformer))
    .then(uri -> this.httpClient.put(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
      .then(requestTransformer))
      .doOnSubscribe(s -> this.requestLogger.debug("PUT    {}", uri))
      .compose(logResponse(uri)))
    .compose(deserializedResponse(responseType));
}

Code example source: org.cloudfoundry/cloudfoundry-client-spring

protected final <T> Mono<T> doPatch(Object request, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer,
                  Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
  return this.root
    .map(root -> buildUri(root, uriTransformer))
    .then(uri -> this.httpClient.patch(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
      .map(requestTransformer)
      .then(o -> o.send(serializedRequest(o, request))))
      .doOnSubscribe(s -> this.requestLogger.debug("PATCH  {}", uri))
      .compose(logResponse(uri)))
    .compose(deserializedResponse(responseType));
}

Code example source: org.cloudfoundry/cloudfoundry-client-spring

protected final <T> Mono<T> doPut(Object request, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer,
                 Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
  return this.root
    .map(root -> buildUri(root, uriTransformer))
    .then(uri -> this.httpClient.put(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
      .map(requestTransformer)
      .then(o -> o.send(serializedRequest(o, request))))
      .doOnSubscribe(s -> this.requestLogger.debug("PUT    {}", uri))
      .compose(logResponse(uri)))
    .compose(deserializedResponse(responseType));
}

Code example source: org.cloudfoundry/cloudfoundry-client-spring

protected final Mono<HttpClientResponse> doGet(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
  return this.root
    .map(root -> buildUri(root, uriTransformer))
    .then(uri -> this.httpClient.get(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
      .map(requestTransformer)
      .then(HttpClientRequest::sendHeaders))
      .doOnSubscribe(s -> this.requestLogger.debug("GET    {}", uri))
      .compose(logResponse(uri)));
}

Code example source: org.cloudfoundry/cloudfoundry-client-spring

protected final Mono<HttpClientResponse> doWs(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientRequest, HttpClientRequest> requestTransformer) {
  return this.root
    .map(root -> buildUri(root, uriTransformer))
    .then(uri -> this.httpClient.get(uri, outbound -> this.authorizationProvider.addAuthorization(outbound)
      .map(requestTransformer)
      .then(HttpClientRequest::upgradeToTextWebsocket))
      .doOnSubscribe(s -> this.requestLogger.debug("WS     {}", uri))
      .compose(logResponse(uri)));
}

Code example source: org.springframework.metrics/spring-metrics

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
  return chain.filter(exchange).compose(f -> {
    // Captured when the transformer runs, i.e. once per subscriber.
    long start = System.nanoTime();
    return f.doOnSuccess(done ->
            registry.timer(metricName, tagConfigurer.httpRequestTags(exchange, null))
                .record(System.nanoTime() - start, TimeUnit.NANOSECONDS)
        )
        .doOnError(t ->
            registry.timer(metricName, tagConfigurer.httpRequestTags(exchange, t))
                .record(System.nanoTime() - start, TimeUnit.NANOSECONDS)
        );
  });
}
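The filter above depends on the start time being read per subscription rather than once at assembly. The sketch below models that idea in plain Java with an injected fake clock so the result is deterministic. TimingSketch, timed and demo are hypothetical names, Supplier stands in for a Mono, and the List of durations stands in for the metrics registry; none of this is the spring-metrics API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class TimingSketch {

    // Hypothetical helper: wraps a task so that the start time is read each time
    // the returned supplier is invoked, not when the wrapper is built.
    static <T> Supplier<T> timed(Supplier<T> task, LongSupplier clock, List<Long> recorded) {
        return () -> {
            long start = clock.getAsLong(); // captured per invocation
            try {
                return task.get();
            } finally {
                // Recorded on success and on failure, like doOnSuccess/doOnError above.
                recorded.add(clock.getAsLong() - start);
            }
        };
    }

    static List<Long> demo() {
        long[] now = {0};                   // fake clock, advanced by hand
        LongSupplier clock = () -> now[0];
        List<Long> recorded = new ArrayList<>();

        // The task "takes" 5 time units each time it runs.
        Supplier<String> call = timed(() -> { now[0] += 5; return "ok"; }, clock, recorded);

        now[0] = 100;   // time passes between assembly and the first invocation
        call.get();
        now[0] = 1000;  // and again before the second invocation
        call.get();
        return recorded;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [5, 5]
    }
}
```

Had the start time been read once while the wrapper was built (at clock value 0), the recorded durations would have been 105 and 1005 instead of 5 and 5, which is the skew the deferred transformation avoids.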
