我们有Kafka流应用程序。制作者在将kafka消息发送到kafka流应用程序之前,正在kafka消息中添加头。
在Kafka流应用程序中,我们正在使用 AbstractProcessor
以及 context.forward(null, Optional.of(event));
将消息转发到另一个主题。
但头球丢了。我希望标题是从输入消息到输出主题的原样。 ProcessorContext
接口。 headers()
方法返回当前输入记录的头,但在我的示例中它是空的,尽管我正在发送带有头的消息。
* Returns the headers of the current input record; could be null if it is not available
* @return the headers
*/
Headers headers();
Kafka流api版本:2.3.1
1条答案
按热度按时间e4yzc0pl1#
context.headers()
应该叫他进来process()
如果使用处理器或transform()
如果使用变压器。