java—使用SpringKafka自定义转换ApacheKafka头

wn9m85ua  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(535)

我在kafkalistener中使用springboot2.3.0和springkafka2.5.0,我试图将messageheadersMap到一个自定义类。下面的代码可以工作,但给出了byte[]中的头,然后我必须将其转换为侦听器中的类(并对每个侦听器重复),我希望避免这样做。

@Slf4j
    @Component
    @KafkaListener(topics = {"${spring.kafka.topics.simple}"}, groupId = "consumerGroup", 
    containerFactory = "kafkaListenerContainerFactory")
    public class RequestConsumer {

       @KafkaHandler
       public void listen(@Payload CustomerDetails customerDetails, @Header("sec") byte[] 
       principle, @Headers MessageHeaders messageHeaders) {
           log.info("Received a CustomerDetails");
       }

       @KafkaHandler(isDefault = true)
       public void listen(@Payload(required = false) GenericRecord object, @Headers 
       MessageHeaders messageHeaders) {
           log.info("Received an unexpected object");
       }
    }

当我把代码改成这样的时候:

@KafkaHandler
    public void listen(@Payload CustomerDetails customerDetails, @Header("sec") PreAuthenticatedAuthenticationToken principle, @Headers MessageHeaders messageHeaders) {
        log.info("Received a CustomerDetails");
    }

侦听器将中断此错误:
org.springframework.kafka.listener.listenerexecutionfailedexception:侦听器方法“public void com.example.demo.consumer.requestconsumer.listen(com.example.schemas.customerdetails,org.springframework.security.web.authentication.preauth.preauthenticatedauthenticationtoken,org.springframework.messaging.messageheaders)”引发异常;嵌套异常为org.springframework.core.convert.converternotfoundexception:找不到能够从类型[byte[]]转换为类型[@org.springframework.messaging.handler.annotation.header org.springframework.security.web.authentication.preauthenticatedauthenticationtoken];嵌套异常为org.springframework.core.convert.converternotfoundexception:找不到能够从类型[byte[]]转换为类型[@org.springframework.messaging.handler.annotation.header org.springframework.security.web.authentication.preauth.preauthenticatedauthenticationtoken]
我已经看了一遍了https://docs.spring.io/spring-kafka/reference/html/#headers 讨论从头到消息头的头Map的文档,反之亦然,但没有转换这些头的示例。这个错误似乎表明我可以注册一个转换器,但没有发现,尽管谷歌搜索了很多次,并试图逐步通过SpringKafka的代码。
在此方面的帮助将不胜感激。
干杯,奥斯卡

ryevplcw

ryevplcw1#

我一直在努力,我找到了一个合适的方法来实现这一点。虽然我没有找到转换头的方法,但是可以编写一个handlermethodargumentresolver,允许我们在侦听器中使用自定义对象。在这段代码中,很容易将头转换为preauthenticatedauthenticationtoken对象,然后我可以直接在侦听器中使用它。
更多详细信息:https://docs.spring.io/spring-kafka/docs/2.5.3.release/reference/html/#adding-kafkalistener的自定义handlermethodargumentresolver
确认有效。
干杯。

相关问题