在批处理模式下kafka头作为列表

6tr1vspr  于 2021-07-15  发布在  Kafka
关注(0)|答案(1)|浏览(500)

我试图访问一些标题,而消费的信息在批处理模式。如果我设置侦听器来处理 Message<?> 我可以手动提取标题

@KafkaListener(topics = "${kafka.topic}")
public void receive (List<Message<?> data, Acknowledgment ack) throws SQLException {
  for (int i = 0; i < data.size(); ++) {
    Object message = data.get(i).getPayload();
    MessageHeaders mh = data.get(i).getHeaders();
    Object value = mh.get("test");

我想为我做一些,但当我尝试

@KafkaListener(topics = "${kafka.topic}")
public voic receive (List<string> data,
  @Header (KafkaHeaders.OFFSET) List<Integer> offsets,
  @Header ("test") List<String> testHeaders,
  Acknowledgment ack) throws SQLException {

我明白了 MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list] 但是,此方法对于偏移标头很有效。
这是因为有内置代码来处理标准头,而这种方法不能用于自定义头,还是我遗漏了一些使这种方法起作用的东西?

kxe2p93d

kxe2p93d1#

是的,框架只Map它知道的头文件;它将所有其他Map头放入

/**
 * The header for a list of Maps of converted native Kafka headers. Used for batch
 * listeners; the map at a particular list position corresponds to the data in the
 * payload list position.
 */
public static final String BATCH_CONVERTED_HEADERS = PREFIX + "batchConvertedHeaders";

如果您想为该头创建一个离散Map,则需要创建一个自定义Map BatchMessageConverter -可能是 BatchMessagingMessageConverter .
最简单的方法可能是重写这个方法,调用 super.toMessage() 然后添加标题。

return MessageBuilder.fromMessage(super.toMessage(...))
    .setHeader("test", ...)
    .build();

如果您使用的是springboot,只需将转换器添加为bean,boot就会将其连接进来。
否则,将转换器添加到容器工厂。
编辑
如果消息转换器没有头Map器;所有标题都放在标题中 KafkaHeaders.NATIVE_HEADERS 这是一个 List<Headers> .

相关问题