我正在尝试添加一个拦截器来验证生产者发布到Kafka主题的消息。除了由kafka主题执行的模式验证之外,我还需要做一些验证。我遵循的步骤如下。
我已经编写了一个java类来扩展producerinterceptor接口。
编译类并创建一个jar文件,该文件放在类路径中包含的文件夹中。
将intercetors.classes=classname添加到kafka安装中的producer.properties中。
但当我将消息发布到主题时,不会调用我编写的自定义拦截器类我没有得到任何错误也。消息被发布到主题中)。
我已经提到https://cwiki.apache.org/confluence/display/kafka/kip-42%3a+add+producer+and+consumer+interceptors
请给我一些建议。
2条答案
按热度按时间fiei3ece1#
属性名是interceptor.classes,而不是interceptors.classes
t1qtbnec2#
这个问题已经很老了,所以我想你同时也找到了解决办法。然而,为了以防万一,我发现我的
ProducerInterceptor
类,它根据消息的内容将消息分派到不同的主题,除非我的流已经有了指定的输出,否则不会被调用。我的第一次尝试看起来像这样,因为我认为我不需要指定输出主题。这不起作用:
但事实上:
值得一提的是,没有什么可以发表的
dummy-output-topic
在第二个例子中,使用to
而不是through
似乎也是这样。就我而言,我是在
map
在使用拦截器将记录分派到不同的主题之前更改记录,因此我的代码实际上更像这样:我希望这些例子能帮助任何与我一起工作的人
ProducerInterceptor
他犯了和我一样的错误。