Java: service discovery does not work when using WebClient with Consul

Posted by qeeaahzv on 2023-06-20 in Java

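We are migrating from Spring Boot 2 to 3 and dropping Netflix Ribbon. We are having trouble looking up services through Consul. If we roll back to Spring Boot 2 + Netflix Ribbon, everything works without issue, so we have ruled out connectivity problems.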

Error log:

RoundRobinLoadBalancer|No servers available for service: cachedavailability-integrations-service
ReactorLoadBalancerExchangeFilterFunction|LoadBalancer does not contain an instance for the service cachedavailability-integrations-service
Communication error with uri: http://cachedavailability-integrations-service/testing org.springframework.web.reactive.function.client.WebClientResponseException$ServiceUnavailable: 503 Service Unavailable from UNKNOWN 
    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:336)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ 503 SERVICE_UNAVAILABLE from GET http://cachedavailability-integrations-service/testing [DefaultWebClient]
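
To make the first log line easier to read: a round-robin chooser can only rotate over the instances that discovery hands it, and it has nothing to return when that list is empty. The following is a minimal plain-Java sketch of that idea, not Spring Cloud's actual RoundRobinLoadBalancer code; the class name and the addresses are hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of round-robin instance selection; not Spring's implementation.
final class RoundRobinSketch {
    private final AtomicInteger position = new AtomicInteger();

    // Returns the next instance in rotation, or empty when discovery found nothing.
    Optional<String> choose(List<String> instances) {
        if (instances.isEmpty()) {
            // The situation behind "No servers available for service".
            return Optional.empty();
        }
        int index = Math.floorMod(position.getAndIncrement(), instances.size());
        return Optional.of(instances.get(index));
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch();
        List<String> found = List.of("10.0.0.1:8080", "10.0.0.2:8080"); // hypothetical addresses
        System.out.println(lb.choose(found));     // first instance
        System.out.println(lb.choose(found));     // second instance
        System.out.println(lb.choose(List.of())); // prints Optional.empty
    }
}
```

Under this model, the 503 in the log points at an empty instance list on the client side rather than at a failing target service.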

Consul UI:

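1. The service "si-manager" is the one trying to discover and reach "cachedavailability".
2. It registers fine, but when using the WebClient bean it does not discover the other registered services.

We have tried many approaches, such as: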

  1. https://docs.spring.io/spring-cloud-consul/docs/current/reference/html/#using-the-discoveryclient
  2. https://www.appsloveworld.com/springboot/100/7/service-discovery-with-spring-webflux-webclient
  3. Configuring spring-cloud loadbalancer without autoconfiguration
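
One way to separate a Consul problem from a Spring problem is to ask Consul directly which instances it considers healthy for the target name. The sketch below uses only the JDK HTTP client against Consul's /v1/health/service/<name> endpoint with the passing filter. The host, port, and service name are taken from the question; the class and method names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class ConsulHealthCheck {
    // Builds the Consul health-endpoint URI for one service, keeping only passing instances.
    static URI healthUri(String host, int port, String serviceName) {
        return URI.create("http://" + host + ":" + port
                + "/v1/health/service/" + serviceName + "?passing=true");
    }

    public static void main(String[] args) throws Exception {
        URI uri = healthUri("localhost", 8500, "cachedavailability-integrations-service");
        HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(uri).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        // An empty JSON array ("[]") means Consul has no passing instance under that name.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If this returns instances while the WebClient still reports none, the problem is more likely in how the load-balancer filter is wired than in the registration.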

Samples

Main class

@org.springframework.cloud.client.discovery.EnableDiscoveryClient
public class MainApplication {...}

WebClient configuration

@Bean(name = "webClientConsulAvailability")
public WebClient webClientConsulAvailability(
  WebClient.Builder webClientBuilder,
  ReactorLoadBalancerExchangeFilterFunction lbFunction,
  ExchangeFilterFunction logFilter
) {
  return webClientBuilder
    .filter(lbFunction)
    .filter(logFilter)
    .build();
}

bootstrap.yml

spring:
  application:
    name: si-manager-service
  profiles:
    active: ${SPRING_PROFILES_ACTIVE:local}
  cloud:
    consul:
      host: localhost
      port: 8500
      enabled: true
      discovery:
        serviceName: ${spring.application.name}
        instanceId: ${spring.application.name}8500
        enabled: true
        # Register as a service in consul.
        register: true
        registerHealthCheck: true

Dependencies

Consul version: v1.15.3

Usage example:

webClientConsulAvailability.get()
      .uri("http://cachedavailability-integrations-service/testing")
      .retrieve()
      .bodyToFlux(MyDTO.class)
      .doOnError(e -> {
        if (isErrorLogLevel(e)) {
          log.error(COMMUNICATION_ERROR_WITH_URI + uri, e);
        } else {
          log.warn(COMMUNICATION_ERROR_WITH_URI + uri, e);
        }
      })
      .onErrorResume(e -> Flux.empty());
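
For context on why the URI above uses a service name where a host would normally go: a client-side load-balancer filter reads the host part as a logical service ID and replaces it with the host and port of a chosen instance before the request is sent. The following is a rough plain-Java sketch of that rewrite, not Spring's code; the class name, method name, and instance address are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ServiceUriRewrite {
    // Swaps the logical service name in the host position for a concrete host and port.
    static URI toInstanceUri(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }

    public static void main(String[] args) {
        URI logical = URI.create("http://cachedavailability-integrations-service/testing");
        // The host part is what gets looked up in the registry.
        System.out.println(logical.getHost());
        // 10.0.0.5:8080 is a made-up instance address for illustration.
        System.out.println(toInstanceUri(logical, "10.0.0.5", 8080));
    }
}
```

When no instance is found, there is nothing to substitute, which matches the "LoadBalancer does not contain an instance for the service" message in the log.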

Answer 1 (yi0zb3m4)

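Fixed with the code below. Compared with the question, the webClientConsul bean injects DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction> instead of ReactorLoadBalancerExchangeFilterFunction.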

package xpto;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.reactive.DeferringLoadBalancerExchangeFilterFunction;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancedExchangeFilterFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.netty.http.client.HttpClient;

import java.util.concurrent.TimeUnit;

@Configuration
@EnableDiscoveryClient
@Slf4j
public class WebclientConfiguration {

  private final ObjectMapper objectMapper;
  // Timeouts in milliseconds; both are injected through the constructor below.
  private final int webClientReadTimeout;
  private final int webClientConnectionTimeout;

  public WebclientConfiguration(ObjectMapper objectMapper,
    @Value("${web.client.read-timeout:25000}") int webClientReadTimeout,
    @Value("${web.client.connection-timeout:3000}") int webClientConnectionTimeout) {
    this.objectMapper = objectMapper;
    this.webClientReadTimeout = webClientReadTimeout;
    this.webClientConnectionTimeout = webClientConnectionTimeout;
  }

  private ClientHttpConnector getClientHttpConnector() {
    return new ReactorClientHttpConnector(
      HttpClient.create().compress(true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientConnectionTimeout)
        .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(webClientReadTimeout, TimeUnit.MILLISECONDS))));
  }

  @Bean
  public DefaultUriBuilderFactory builderFactory() {
    DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory();
    factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.VALUES_ONLY);
    return factory;
  }

  @Bean(name = "webClientConsul")
  public WebClient webClientConsul(
    WebClient.Builder webClientBuilder,
    DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction> exchangeFilterFunction
  ) {
    webClientBuilder.filter(exchangeFilterFunction);
    return buildWebClient(webClientBuilder);
  }

  @Bean(name = "webClientDefault")
  public WebClient webClientDefault(WebClient.Builder webClientBuilder) {
    return buildWebClient(webClientBuilder);
  }

  private WebClient buildWebClient(WebClient.Builder webClientBuilder) {
    ClientHttpConnector connector = getClientHttpConnector();
    return webClientBuilder
      .clientConnector(connector)
      .exchangeStrategies(getExchangeStrategies())
      .build();
  }

  private ExchangeStrategies getExchangeStrategies() {
    return ExchangeStrategies.builder()
      .codecs(clientDefaultCodecsConfigurer -> {
        clientDefaultCodecsConfigurer
          .defaultCodecs()
          .jackson2JsonEncoder(
            new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
        clientDefaultCodecsConfigurer
          .defaultCodecs()
          .jackson2JsonDecoder(
            new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
      }).build();
  }
  
}
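
The answer does not explain why swapping the filter type helps. As the name suggests, the deferring variant wraps the real load-balancer filter and looks it up only when a request is actually made, instead of requiring it when the WebClient bean is wired. The following is a minimal plain-Java sketch of that lazy-delegate pattern under that assumption; it models requests as strings and is not Spring's implementation.

```java
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Simplified model of a deferring wrapper around a filter; not Spring's code.
final class DeferringFilterSketch implements UnaryOperator<String> {
    private final Supplier<UnaryOperator<String>> provider;
    private UnaryOperator<String> delegate; // resolved on first use

    DeferringFilterSketch(Supplier<UnaryOperator<String>> provider) {
        this.provider = provider;
    }

    @Override
    public String apply(String request) {
        if (delegate == null) {
            // The lookup happens at request time, not at wiring time.
            delegate = provider.get();
        }
        return delegate.apply(request);
    }
}
```

With this shape, the wrapper can be created early, while the real filter is resolved only on the first call and reused afterwards.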
