java—是否可以使用kafka流访问消息头?

monwx1rj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(284)

在kafka 0.11中,向记录(producerrecord和consumerrecord)添加了头文件,在处理带有kafka流的主题时,是否可以获得这些头文件?调用方法时,例如 mapKStream 它提供了 key 以及 value 但我看不到 headers . 如果我们能 map 超过 ConsumerRecord s。
前任。

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ...

像这样的方法会奏效:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...
vvppvyoh

vvppvyoh1#

从streams api的2.0版开始,记录头就可以访问(详见kip-244。)
您可以通过处理器api(即,通过 transform() , transformValues() ,或 process() ),通过给定的“上下文”对象(参见。https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-处理器上下文)。
在2.0之前,上下文只公开主题、分区、偏移量和时间戳,而不公开在那些旧版本中实际上由流在读取时丢弃的头。
不过,在dsl级别元数据是不可用的。然而,扩展dsl的工作也在进行中:https://cwiki.apache.org/confluence/display/kafka/kip-159%3a+introducing+rich+functions+to+streams

相关问题