apache-kafka ApacheNifi ConsumeKafka没有阅读主题

hivapdat  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(137)

我将从Apache Kafka中的某个主题中获取消息,并将它们记录在Oracle数据库中。问题是ConsumeKafka_1_0 1.17.0处理器没有阅读Kafka主题中的消息。
我尝试使用ConsumeKafka_1_0、ConsumeKafka_2_0和ConsumeKafka_2_6的版本;
我将“偏移重置”设置为“最新”或“最早”;
我修改了每个请求的组ID;
我将“荣誉事务”设置为True或False;
我的Kafka服务器在同一个网络上,并且不使用Kerberos或TSL票证的任何凭证。已在默认端口9092上配置,并在另一台服务器上使用Zookeper。
我访问了Broker服务器,其中包含主题,并成功执行了列出主题和消息的命令。
我工作的公司的其他流程使用Kafka没有任何问题。
是否有人遇到过此问题并设法解决了它?

o2rvlv0m

o2rvlv0m1#

我尝试使用ConsumeKafka_1_0、ConsumeKafka_2_0和ConsumeKafka_2_6的版本
取决于您使用的Kafka版本。如果您使用的Kafka broker版本高于2.6,则使用该版本。如果您的版本高于2.0,但低于2.6,则使用2.0......依此类推。
将“偏移重置”设置为“最新”或“最早”
取决于您是否关心现有数据。如果是,请使用earliest。
修改了每个请求上的组ID
这是一个很好的调试步骤,但不是所有时候都需要。如果组实际上是在Kafka群集中创建的,您可以使用kafka-consumer-groups.sh命令进一步调试。
将事务处理设为True或False
取决于您的生成器是否使用事务。如果您将此设置为True,而您的生成器不使用它们,则不确定NiFi将执行什么操作...不过,将此设置保留为True可能是安全的。

包含最新Kafka作品的示例

对于纯文本Kafka协议

  • 安全.协议=明文
  • sasl.mechanism =PLAIN(无法禁用,因此需要用户名+密码)
  • 用户名:某个非空字符串
  • 密码:某个非空字符串

用于简单的Kafka消费者调试

  • 记录读取器:GrokReader
  • 使用Grok表达式进行配置:%{GREEDYDATA:message}

对于内部Nifi FlowFile输出(可以使用任何格式,但JSON适合验证您的步骤是否有效)

  • 记录作者:JSONRecordSetWriter

parse.failure添加新的处理器(或在“关系”选项卡下终止它),并通过单击并从“使用处理器”中拖动来设置success关系。您可以禁用下一个处理器,以便在NiFi中简单地将FlowFiles排队。
然后,启动Consume处理器,并检查队列中的数据。
生产者命令:

echo 'Hello, World' | kcat -P -b localhost:9092 -t foobar

相关问题