我已经启动了一个nifi进程(使用kafka)并将其与一个主题相连接。它正在运行,但我无法(不知道)在哪里可以查看消息?
cwdobuhd1#
您可能需要使用消息 --from-beginning 如果这些消息以前已被使用(因此已提交偏移量)。在getkafka处理器上,有一个属性auto offset reset,它应该设置为与 --from-beginning 在Kafka控制台消费者。
--from-beginning
ve7v8dk22#
Kafka处理器为每条消息运行并生成流文件。只有当您将一个处理器连接到其他组件(如另一个处理器或输出端口)时,您才能可视化正在移动的数据。对于初学者,您可以尝试以下方法:连接 ConsumeKafka 与 LogAttribute 或者其他处理器。停止或禁用 LogAttribute 处理器。现在当你开始 ConsumeKafka ,所有从配置的kafka主题接收的消息将以流文件的形式排队。右键单击流文件排队的关系,然后单击 List Queue 您可以访问队列。单击队列上的任何项目,将出现上下文菜单。点击 View 按钮,您可以看到数据。“查看”kafka消息的整个解释只是为了帮助您调试和开始使用nifi。理想情况下,您将使用其他nifi处理器来解决您的用例。例子您收到来自kafka的消息,并希望将其写入mongodb,因此您可以将流程设置为:注意:有基于记录的处理器,如 ConsumeKafkaRecord 以及 PutMongoRecord 但他们基本上是做同样的事情,更多的增强。既然你是新来的,我建议一个简单的流程。你可以找到关于 Record 在这里尝试一下。
ConsumeKafka
LogAttribute
List Queue
View
ConsumeKafkaRecord
PutMongoRecord
Record
2条答案
按热度按时间cwdobuhd1#
您可能需要使用消息
--from-beginning
如果这些消息以前已被使用(因此已提交偏移量)。在getkafka处理器上,有一个属性auto offset reset,它应该设置为与
--from-beginning
在Kafka控制台消费者。ve7v8dk22#
Kafka处理器为每条消息运行并生成流文件。只有当您将一个处理器连接到其他组件(如另一个处理器或输出端口)时,您才能可视化正在移动的数据。
对于初学者,您可以尝试以下方法:
连接
ConsumeKafka
与LogAttribute
或者其他处理器。停止或禁用
LogAttribute
处理器。现在当你开始
ConsumeKafka
,所有从配置的kafka主题接收的消息将以流文件的形式排队。右键单击流文件排队的关系,然后单击
List Queue
您可以访问队列。单击队列上的任何项目,将出现上下文菜单。点击
View
按钮,您可以看到数据。“查看”kafka消息的整个解释只是为了帮助您调试和开始使用nifi。理想情况下,您将使用其他nifi处理器来解决您的用例。
例子
您收到来自kafka的消息,并希望将其写入mongodb,因此您可以将流程设置为:
注意:有基于记录的处理器,如
ConsumeKafkaRecord
以及PutMongoRecord
但他们基本上是做同样的事情,更多的增强。既然你是新来的,我建议一个简单的流程。你可以找到关于Record
在这里尝试一下。