我试图访问一些标题,而消费的信息在批处理模式。如果我设置侦听器来处理 Message<?>
我可以手动提取标题
@KafkaListener(topics = "${kafka.topic}")
public void receive (List<Message<?> data, Acknowledgment ack) throws SQLException {
for (int i = 0; i < data.size(); ++) {
Object message = data.get(i).getPayload();
MessageHeaders mh = data.get(i).getHeaders();
Object value = mh.get("test");
我想为我做一些,但当我尝试
@KafkaListener(topics = "${kafka.topic}")
public voic receive (List<string> data,
@Header (KafkaHeaders.OFFSET) List<Integer> offsets,
@Header ("test") List<String> testHeaders,
Acknowledgment ack) throws SQLException {
我明白了 MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list]
但是,此方法对于偏移标头很有效。
这是因为有内置代码来处理标准头,而这种方法不能用于自定义头,还是我遗漏了一些使这种方法起作用的东西?
1条答案
按热度按时间kxe2p93d1#
是的,框架只Map它知道的头文件;它将所有其他Map头放入
如果您想为该头创建一个离散Map,则需要创建一个自定义Map
BatchMessageConverter
-可能是BatchMessagingMessageConverter
.最简单的方法可能是重写这个方法,调用
super.toMessage()
然后添加标题。如果您使用的是springboot,只需将转换器添加为bean,boot就会将其连接进来。
否则,将转换器添加到容器工厂。
编辑
如果消息转换器没有头Map器;所有标题都放在标题中
KafkaHeaders.NATIVE_HEADERS
这是一个List<Headers>
.