在kafka 0.11中,向记录(producerrecord和consumerrecord)添加了头文件,在处理带有kafka流的主题时,是否可以获得这些头文件?调用方法时,例如 map
在 KStream
它提供了 key
以及 value
但我看不到 headers
. 如果我们能 map
超过 ConsumerRecord
s。
前任。
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像这样的方法会奏效:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
1条答案
按热度按时间vvppvyoh1#
从streams api的2.0版开始,记录头就可以访问(详见kip-244。)
您可以通过处理器api(即,通过
transform()
,transformValues()
,或process()
),通过给定的“上下文”对象(参见。https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-处理器上下文)。在2.0之前,上下文只公开主题、分区、偏移量和时间戳,而不公开在那些旧版本中实际上由流在读取时丢弃的头。
不过,在dsl级别元数据是不可用的。然而,扩展dsl的工作也在进行中:https://cwiki.apache.org/confluence/display/kafka/kip-159%3a+introducing+rich+functions+to+streams