我在kafkalistener中使用springboot2.3.0和springkafka2.5.0,我试图将messageheadersMap到一个自定义类。下面的代码可以工作,但给出了byte[]中的头,然后我必须将其转换为侦听器中的类(并对每个侦听器重复),我希望避免这样做。
@Slf4j
@Component
@KafkaListener(topics = {"${spring.kafka.topics.simple}"}, groupId = "consumerGroup",
containerFactory = "kafkaListenerContainerFactory")
public class RequestConsumer {
@KafkaHandler
public void listen(@Payload CustomerDetails customerDetails, @Header("sec") byte[]
principle, @Headers MessageHeaders messageHeaders) {
log.info("Received a CustomerDetails");
}
@KafkaHandler(isDefault = true)
public void listen(@Payload(required = false) GenericRecord object, @Headers
MessageHeaders messageHeaders) {
log.info("Received an unexpected object");
}
}
当我把代码改成这样的时候:
@KafkaHandler
public void listen(@Payload CustomerDetails customerDetails, @Header("sec") PreAuthenticatedAuthenticationToken principle, @Headers MessageHeaders messageHeaders) {
log.info("Received a CustomerDetails");
}
侦听器将中断此错误:
org.springframework.kafka.listener.listenerexecutionfailedexception:侦听器方法“public void com.example.demo.consumer.requestconsumer.listen(com.example.schemas.customerdetails,org.springframework.security.web.authentication.preauth.preauthenticatedauthenticationtoken,org.springframework.messaging.messageheaders)”引发异常;嵌套异常为org.springframework.core.convert.converternotfoundexception:找不到能够从类型[byte[]]转换为类型[@org.springframework.messaging.handler.annotation.header org.springframework.security.web.authentication.preauthenticatedauthenticationtoken];嵌套异常为org.springframework.core.convert.converternotfoundexception:找不到能够从类型[byte[]]转换为类型[@org.springframework.messaging.handler.annotation.header org.springframework.security.web.authentication.preauth.preauthenticatedauthenticationtoken]
我已经看了一遍了https://docs.spring.io/spring-kafka/reference/html/#headers 讨论从头到消息头的头Map的文档,反之亦然,但没有转换这些头的示例。这个错误似乎表明我可以注册一个转换器,但没有发现,尽管谷歌搜索了很多次,并试图逐步通过SpringKafka的代码。
在此方面的帮助将不胜感激。
干杯,奥斯卡
1条答案
按热度按时间ryevplcw1#
我一直在努力,我找到了一个合适的方法来实现这一点。虽然我没有找到转换头的方法,但是可以编写一个handlermethodargumentresolver,允许我们在侦听器中使用自定义对象。在这段代码中,很容易将头转换为preauthenticatedauthenticationtoken对象,然后我可以直接在侦听器中使用它。
更多详细信息:https://docs.spring.io/spring-kafka/docs/2.5.3.release/reference/html/#adding-kafkalistener的自定义handlermethodargumentresolver
确认有效。
干杯。