简要说明:
我正在开发一个微服务(MS-1),它应该在某个时候通过REST操作从另一个微服务(MS-2)获取信息。
MS-1有一个公共端点,还可以监听来自第三个微服务(MS-3)的Kafka消息。此外,端点将被删除,MS-1将只监听来自MS-2的Kafka消息。
问题:
当通过端点调用MS-1时,一切都很好,它正确地执行了所需的REST操作- MS-1可以调用并从MS-2获得正确的返回。
但是,当相同的请求通过Kafka消息到达MS-1时,我的意思是,当MS-1监听MS-3发送的消息时,当MS-1需要通过REST请求调用MS-2时,它会引发以下异常- MS-1 CANNOT CALL MS-2。
例外情况:
java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-6.0.10.jar:6.0.10] at org.springframework.web.context.support.WebApplicationContextUtils.currentRequestAttributes(WebApplicationContextUtils.java:313) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.web.context.support.WebApplicationContextUtils$SessionObjectFactory.getObject(WebApplicationContextUtils.java:370) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.web.context.support.WebApplicationContextUtils$SessionObjectFactory.getObject(WebApplicationContextUtils.java:365) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler.invoke(AutowireUtils.java:283) ~[spring-beans-6.0.10.jar:6.0.10]
at jdk.proxy2/jdk.proxy2.$Proxy190.getAttribute(Unknown Source) ~[na:na]
at [PROTECTED PATH].getHeaders(RestProxyService.java:65) ~[classes/:na]
at [PROTECTED PATH].restExchange(RestProxyService.java:431) ~[classes/:na]
at [PROTECTED PATH].callRemoteApi(RestProxyService.java:346) ~[classes/:na]
at [PROTECTED PATH].callRestApi(RestProxyService.java:426) ~[classes/:na]
at [PROTECTED PATH].getDetail(MyProxy.java:57) ~[classes/:na]
at [PROTECTED PATH].entriesToReport(EntityWithDataService.java:75) ~[classes/:na] at [PROTECTED PATH].getReportObjects(SupportedReportCodes.java:69) ~[classes/:na]
at [PROTECTED PATH].generate(ReportService.java:72) ~[classes/:na]
at [PROTECTED PATH].receive(KafkaConsumer.java:25) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.0.10.jar:6.0.10]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.0.10.jar:6.0.10]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2924) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2904) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$58(KafkaMessageListenerContainer.java:2822) ~[spring-kafka-3.0.8.jar:3.0.8]
at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544) ~[micrometer-observation-1.11.1.jar:1.11.1]
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.1.jar:1.11.1]
at io.micrometer.observation.Observation.observe(Observation.java:544) ~[micrometer-observation-1.11.1.jar:1.11.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2820) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2672) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2558) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2200) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1555) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1519) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1394) ~[spring-kafka-3.0.8.jar:3.0.8]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
我已经改变了一些配置来解决它。现在,我的application.properties和类是这样的:
CONSUMER(listener)-> MS-1
- 应用程序.属性 *
kafka.security.enabled=false
spring.kafka.bootstrap-servers=[PROTECTED]
#spring.kafka.producer.topic=report
spring.kafka.consumer.group-id=reports
spring.kafka.consumer.topic=report
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.type.mapping=log:[PROTECTED PATH].ReportMessageObject
- class KafkaConsumer*
@Component
@Slf4j
public class KafkaConsumer {
@Autowired
private MyService myService;
@KafkaListener(topics = "${spring.kafka.consumer.topic}", errorHandler = "kafkaEventErrorHandler", properties = "{spring.json.value.default.type=[PROTECTED PATH].ReportMessageObject}")
public void receive(@Payload @Valid ConsumerRecord<String, ReportMessageObject> payload) {
try {
log.debug("received payload={}", payload.toString());
myService.generate(payload.value().getCompanyId(), payload.value().getBody());
} catch (Exception e) {
log.error("Exception occurred while consuming ExternalInterface message {}", e.getMessage(), e);
}
}
}
- class KafkaConsumerConfig*
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String kafkaAddress;
@Value(value = "${spring.kafka.consumer.group-id: null}")
private String kafkaGroup;
@Value("${kafka.configuration.security.enabled:false}")
private boolean kafkaSecurityEnabled;
@Value("${spring.kafka.properties.security.protocol:null}")
private String kafkaSecurityProtocol;
@Value("${spring.kafka.properties.sasl.mechanism:null}")
private String kafkaSecurityMechanism;
@Value("${spring.kafka.properties.sasl.jaas.config:null}")
private String kafkaSecurityJaasConfig;
@Bean
public ConsumerFactory<String, ReportMessageObject> responseConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(getKafkaConsumerConfigProperties(kafkaGroup), new StringDeserializer(), new JsonDeserializer<>(ReportMessageObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ReportMessageObject> responseKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ReportMessageObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(responseConsumerFactory());
return factory;
}
private Map<String, Object> getKafkaConsumerConfigProperties(String groupId) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
if(groupId != null && !groupId.isEmpty()) {
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
if (kafkaSecurityEnabled) {
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
configProps.put(SaslConfigs.SASL_MECHANISM, kafkaSecurityMechanism);
configProps.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSecurityJaasConfig);
}
return configProps;
}
}
生产商-> MS-3
- 应用程序.属性 *
kafka.security.enabled=false
spring.kafka.bootstrap-servers=[PROTECTED PATH]
spring.kafka.producer.topic=report
- class KafkaProducer*
@Slf4j
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, ReportMessageObject> kafkaTemplate;
@Autowired
private SmartKafkaHeader smartKafkaHeader;
private String property1;
private String property2;
private String property3;
private String property4;
public void addHeaders(String property1, String property2, String property3, String property4) {
this.property1= property1;
this.property2= property2;
this.property3= property3;
this.property4= property4;
}
public void send(String topic, ReportMessageObject reportMessageObject) {
ProducerRecord<String, ReportMessageObject> message = new ProducerRecord<>(topic, reportMessageObject);
smartKafkaHeader.addHeadersToRecord(message, property1, property2, property3, property4);
kafkaTemplate.send(message);
kafkaTemplate.flush();
log.info("sending payload='{}' to topic='{}'", reportMessageObject, topic);
}
}
- class KafkaProducerConfig*
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String kafkaAddress;
@Value(value = "${spring.kafka.producer.group-id: null}")
private String kafkaGroup;
@Value("${kafka.configuration.security.enabled:false}")
private boolean kafkaSecurityEnabled;
@Value("${spring.kafka.properties.security.protocol:null}")
private String kafkaSecurityProtocol;
@Value("${spring.kafka.properties.sasl.mechanism:null}")
private String kafkaSecurityMechanism;
@Value("${spring.kafka.properties.sasl.jaas.config:null}")
private String kafkaSecurityJaasConfig;
@Bean
public ProducerFactory<String, ReportMessageObject> producerFactorySmartMessaging() {
return new DefaultKafkaProducerFactory<>(getKafkaProducerConfigProperties(kafkaGroup));
}
@Bean
public KafkaTemplate<String, ReportMessageObject> kafkaMessaging() {
return new KafkaTemplate<>(producerFactorySmartMessaging());
}
private Map<String, Object> getKafkaProducerConfigProperties(String groupId){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
if(groupId != null && !groupId.isEmpty()) {
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.TYPE_MAPPINGS, "log:[PROTECTED PATH].ReportMessageObject");
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
if (kafkaSecurityEnabled) {
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
configProps.put(SaslConfigs.SASL_MECHANISM, kafkaSecurityMechanism);
configProps.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSecurityJaasConfig);
}
return configProps;
}
}
1条答案
按热度按时间niwlg2el1#
这个错误很明显:
java.lang.IllegalStateException:未找到线程绑定请求:你指的是实际Web请求之外的请求属性,还是在原始接收线程之外处理请求?如果你实际上是在一个web请求中操作,并且仍然收到这个消息,那么你的代码可能是在DispatcherServlet之外运行的:在这种情况下,使用RequestContextListener或RequestContextFilter公开当前请求。
当消息来自Kafka时,没有Web请求上下文,这只是Web应用程序的概念。
你不能在两个地方使用相同的代码。
您需要通过记录头传播任何此类信息,以使其可用于下游消费者(并从那里访问它,而不是试图访问Web上下文)。