Kafka: How to solve 'java.lang.IllegalStateException: No thread-bound request found: [...]'?

Posted by 9wbgstp7 on 2023-10-15 in Apache

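Brief description

I am developing a microservice (MS-1) that, at some point, has to fetch information from another microservice (MS-2) through a REST operation.
MS-1 has a public endpoint and also listens to Kafka messages from a third microservice (MS-3). Later on, the endpoint will be removed and MS-1 will only listen to Kafka messages from MS-2.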

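Problem

When MS-1 is called through the endpoint, everything works: it performs the required REST operation, calling MS-2 and getting the correct response.
However, when the same request reaches MS-1 as a Kafka message, that is, when MS-1 consumes a message sent by MS-3 and then needs to call MS-2 over REST, it throws the following exception and MS-1 cannot call MS-2.

Exception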

java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.web.context.support.WebApplicationContextUtils.currentRequestAttributes(WebApplicationContextUtils.java:313) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.web.context.support.WebApplicationContextUtils$SessionObjectFactory.getObject(WebApplicationContextUtils.java:370) ~[spring-web-6.0.10.jar:6.0.10]   
at org.springframework.web.context.support.WebApplicationContextUtils$SessionObjectFactory.getObject(WebApplicationContextUtils.java:365) ~[spring-web-6.0.10.jar:6.0.10]   
at org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler.invoke(AutowireUtils.java:283) ~[spring-beans-6.0.10.jar:6.0.10]    
at jdk.proxy2/jdk.proxy2.$Proxy190.getAttribute(Unknown Source) ~[na:na]    
at [PROTECTED PATH].getHeaders(RestProxyService.java:65) ~[classes/:na]     
at [PROTECTED PATH].restExchange(RestProxyService.java:431) ~[classes/:na]  
at [PROTECTED PATH].callRemoteApi(RestProxyService.java:346) ~[classes/:na]     
at [PROTECTED PATH].callRestApi(RestProxyService.java:426) ~[classes/:na]   
at [PROTECTED PATH].getDetail(MyProxy.java:57) ~[classes/:na]   
at [PROTECTED PATH].entriesToReport(EntityWithDataService.java:75) ~[classes/:na]
at [PROTECTED PATH].getReportObjects(SupportedReportCodes.java:69) ~[classes/:na]
at [PROTECTED PATH].generate(ReportService.java:72) ~[classes/:na]  
at [PROTECTED PATH].receive(KafkaConsumer.java:25) ~[classes/:na]   
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]  
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]    
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]    
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]  
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.0.10.jar:6.0.10]  
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.0.10.jar:6.0.10]    
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.0.8.jar:3.0.8]     
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.0.8.jar:3.0.8]   
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.0.8.jar:3.0.8]    
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.0.8.jar:3.0.8]    
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2924) ~[spring-kafka-3.0.8.jar:3.0.8]     
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2904) ~[spring-kafka-3.0.8.jar:3.0.8]   
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$58(KafkaMessageListenerContainer.java:2822) ~[spring-kafka-3.0.8.jar:3.0.8]  
at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544) ~[micrometer-observation-1.11.1.jar:1.11.1]     
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.1.jar:1.11.1]   
at io.micrometer.observation.Observation.observe(Observation.java:544) ~[micrometer-observation-1.11.1.jar:1.11.1]  
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2820) ~[spring-kafka-3.0.8.jar:3.0.8]    
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2672) ~[spring-kafka-3.0.8.jar:3.0.8]   
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2558) ~[spring-kafka-3.0.8.jar:3.0.8]  
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2200) ~[spring-kafka-3.0.8.jar:3.0.8]    
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1555) ~[spring-kafka-3.0.8.jar:3.0.8]   
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1519) ~[spring-kafka-3.0.8.jar:3.0.8]     
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1394) ~[spring-kafka-3.0.8.jar:3.0.8]   
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]  
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

I have already changed some configuration to try to solve it. Currently, my application.properties and classes look like this:

CONSUMER (listener) -> MS-1

  • application.properties
kafka.security.enabled=false
spring.kafka.bootstrap-servers=[PROTECTED]     
#spring.kafka.producer.topic=report     
spring.kafka.consumer.group-id=reports     
spring.kafka.consumer.topic=report     
spring.kafka.consumer.auto-offset-reset=earliest     
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer     
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer     
spring.kafka.consumer.properties.spring.json.trusted.packages=*     
spring.kafka.consumer.properties.spring.json.type.mapping=log:[PROTECTED PATH].ReportMessageObject
  • class KafkaConsumer
@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private MyService myService;
    
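    // Each entry of "properties" is a "key=value" string; the braces belong to the array, not inside the string.
    @KafkaListener(topics = "${spring.kafka.consumer.topic}", errorHandler = "kafkaEventErrorHandler", properties = {"spring.json.value.default.type=[PROTECTED PATH].ReportMessageObject"})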
    public void receive(@Payload @Valid ConsumerRecord<String, ReportMessageObject> payload) {
        try {
            log.debug("received payload={}", payload.toString());
            myService.generate(payload.value().getCompanyId(), payload.value().getBody());
        } catch (Exception e) {
            log.error("Exception occurred while consuming ExternalInterface message {}", e.getMessage(), e);
        }
    }

}
  • class KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaAddress;
    
    @Value(value = "${spring.kafka.consumer.group-id: null}")
    private String kafkaGroup;
    
    @Value("${kafka.configuration.security.enabled:false}")
    private boolean kafkaSecurityEnabled;
    
    @Value("${spring.kafka.properties.security.protocol:null}")
    private String kafkaSecurityProtocol;
    
    @Value("${spring.kafka.properties.sasl.mechanism:null}")
    private String kafkaSecurityMechanism;
    
    @Value("${spring.kafka.properties.sasl.jaas.config:null}")
    private String kafkaSecurityJaasConfig;
    
    @Bean
    public ConsumerFactory<String, ReportMessageObject> responseConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getKafkaConsumerConfigProperties(kafkaGroup), new StringDeserializer(), new JsonDeserializer<>(ReportMessageObject.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ReportMessageObject> responseKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ReportMessageObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(responseConsumerFactory());
        return factory;
    }
    
    private Map<String, Object> getKafkaConsumerConfigProperties(String groupId) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
        if(groupId != null && !groupId.isEmpty()) {
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        }
        
        if (kafkaSecurityEnabled) {
            configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            configProps.put(SaslConfigs.SASL_MECHANISM, kafkaSecurityMechanism);
            configProps.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSecurityJaasConfig);
        }
        
        return configProps;
    }

}

PRODUCER -> MS-3

  • application.properties
kafka.security.enabled=false     
spring.kafka.bootstrap-servers=[PROTECTED PATH]     
spring.kafka.producer.topic=report
  • class KafkaProducer
@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, ReportMessageObject> kafkaTemplate;
    @Autowired
    private SmartKafkaHeader smartKafkaHeader;
    
    private String property1;
    private String property2;
    private String property3;
    private String property4;
    
    public void addHeaders(String property1, String property2, String property3, String property4) {
        this.property1= property1;
        this.property2= property2;
        this.property3= property3;
        this.property4= property4;
    }
    
    public void send(String topic, ReportMessageObject reportMessageObject) {
    
        ProducerRecord<String, ReportMessageObject> message = new ProducerRecord<>(topic, reportMessageObject);
    
        smartKafkaHeader.addHeadersToRecord(message, property1, property2, property3, property4);
    
        kafkaTemplate.send(message);
        kafkaTemplate.flush();
    
        log.info("sending payload='{}' to topic='{}'", reportMessageObject, topic);
    }
    
}
  • class KafkaProducerConfig
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaAddress;
    
    @Value(value = "${spring.kafka.producer.group-id: null}")
    private String kafkaGroup;
    
    @Value("${kafka.configuration.security.enabled:false}")
    private boolean kafkaSecurityEnabled;
    
    @Value("${spring.kafka.properties.security.protocol:null}")
    private String kafkaSecurityProtocol;
    
    @Value("${spring.kafka.properties.sasl.mechanism:null}")
    private String kafkaSecurityMechanism;
    
    @Value("${spring.kafka.properties.sasl.jaas.config:null}")
    private String kafkaSecurityJaasConfig;
    
    @Bean
    public ProducerFactory<String, ReportMessageObject> producerFactorySmartMessaging() {
        return new DefaultKafkaProducerFactory<>(getKafkaProducerConfigProperties(kafkaGroup));
    }
    
    @Bean
    public KafkaTemplate<String, ReportMessageObject> kafkaMessaging() {
        return new KafkaTemplate<>(producerFactorySmartMessaging());
    }
    
    private Map<String, Object> getKafkaProducerConfigProperties(String groupId){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
        if(groupId != null && !groupId.isEmpty()) {
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        }
    
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(JsonSerializer.TYPE_MAPPINGS, "log:[PROTECTED PATH].ReportMessageObject");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        
        if (kafkaSecurityEnabled) {
            configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            configProps.put(SaslConfigs.SASL_MECHANISM, kafkaSecurityMechanism);
            configProps.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSecurityJaasConfig);
        }
    
        return configProps;
    }

}

niwlg2el #1

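The error is pretty clear:
java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
When the message comes from Kafka, there is no web request context; that is a concept that only exists for web applications.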
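To make this concrete: in the stack trace, RestProxyService.getHeaders calls getAttribute on a proxy that Spring resolves through RequestContextHolder, which keeps the current request in a thread-bound slot. The plain-Java sketch below only imitates that mechanism; ThreadBoundContext and ThreadBoundDemo are hypothetical stand-ins, not Spring classes, and the second thread plays the role of the Kafka listener thread, for which nothing was ever bound.

```java
// Hypothetical stand-in for a thread-bound holder such as Spring's RequestContextHolder.
final class ThreadBoundContext {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // In a web application, a filter or the DispatcherServlet does this per HTTP request.
    static void bind(String requestId) { CURRENT.set(requestId); }

    static void clear() { CURRENT.remove(); }

    // Fails when nothing is bound to the calling thread, like the exception in the question.
    static String current() {
        String value = CURRENT.get();
        if (value == null) {
            throw new IllegalStateException("No thread-bound request found");
        }
        return value;
    }
}

final class ThreadBoundDemo {
    // Performs the lookup on a separate thread, the way a listener container thread would.
    static String lookupOnOtherThread() {
        String[] outcome = new String[1];
        Thread listenerThread = new Thread(() -> {
            try {
                outcome[0] = "found: " + ThreadBoundContext.current();
            } catch (IllegalStateException e) {
                outcome[0] = "failed: " + e.getMessage();
            }
        });
        listenerThread.start();
        try {
            listenerThread.join(); // join() makes the write to outcome[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        ThreadBoundContext.bind("req-1");
        System.out.println(ThreadBoundContext.current()); // prints "req-1"
        System.out.println(lookupOnOtherThread());        // prints "failed: No thread-bound request found"
        ThreadBoundContext.clear();
    }
}
```

The same lookup succeeds on the thread where the value was bound and fails on the other one, which matches the endpoint call working and the Kafka-triggered call failing.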
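You can't use the same code in both places.
You need to propagate any such information via record headers to make it available to the downstream consumer (and access it from there instead of trying to access the web context).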
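A minimal sketch of that idea, under stated assumptions: CallContext, the header name x-caller-id, and the callerId value are all hypothetical, since the question does not show what RestProxyService.getHeaders reads from the web context. A Map<String, byte[]> stands in for Kafka record headers so the example runs without the Kafka client; the comments name the corresponding client calls.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical carrier for the values the REST call to MS-2 needs.
final class CallContext {
    // Hypothetical header name; use whatever MS-3 and MS-1 agree on.
    static final String CALLER_HEADER = "x-caller-id";

    private final String callerId;

    CallContext(String callerId) {
        this.callerId = callerId;
    }

    String callerId() {
        return callerId;
    }

    // Producer side (MS-3): Kafka header values are bytes.
    // With the Kafka client this would be record.headers().add(CALLER_HEADER, bytes).
    Map<String, byte[]> toRecordHeaders() {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put(CALLER_HEADER, callerId.getBytes(StandardCharsets.UTF_8));
        return headers;
    }

    // Consumer side (MS-1): rebuild the context from the record, not from the web context.
    // With the Kafka client the value would come from record.headers().lastHeader(CALLER_HEADER),
    // which returns null when the header is absent.
    static CallContext fromRecordHeaders(Map<String, byte[]> headers) {
        byte[] raw = headers.get(CALLER_HEADER);
        if (raw == null) {
            throw new IllegalArgumentException("missing record header: " + CALLER_HEADER);
        }
        return new CallContext(new String(raw, StandardCharsets.UTF_8));
    }
}
```

With something like this, the listener would build the context from the incoming record and pass it explicitly down to the code that calls MS-2, so that the REST helper takes the values as a parameter instead of reading them from a session-bound proxy. The HTTP endpoint, while it still exists, could build the same object from the web request and share the rest of the code path.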
