我有一个演示Spring Integration项目,它接收Kafka消息,聚合它们,然后发布它们。我正在尝试将JdbcMessageStore
添加到项目中。问题是它失败了,错误是:
Caused by: java.lang.IllegalArgumentException: Cannot store messages without an ID header
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
at org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]
调试后,我发现它需要此消息中的UUID头id
。但问题是我不能手动设置Kafka头ID -它是禁止的(与timestamp
头相同)-我试图在不同项目的Kafka producer中这样做。
如果我使用名为Big Data Tools
的IDEA插件并从那里发送消息,我可以设置id
头,但它被我的项目作为字节数组接收,并且失败并出错
IllegalArgumentException Incorrect type specified for header 'id'. Expected [UUID] but actual type is [B]
我找不到解决这个问题的方法。我需要以某种方式设置这个id
头,以便能够在数据库中存储消息。先谢谢你了
1条答案
按热度按时间wko9yo5t1#
KafkaMessageDrivenChannelAdapter
有一个选项:在这里,您可以使用以下命令设置
MessagingMessageConverter
:设置为
true
。这样,从
ConsumerRecord
创建的Message
将具有相应的id
和timestamp
头。您也可以简单地使用一个“虚拟”转换器来返回传入的有效负载,框架将创建一个新的
Message
,并在其中生成这些头文件。