在Kafka,有没有什么方法可以在几个相关的信息被消费后产生一个信息(无需在应用程序代码处手动控制……)用例是选择一个巨大的文件,将它分成几个块,在一个主题中为每个块发布一条消息,一旦所有这些消息都被使用,就会生成另一条消息,通知另一个主题的结果。我们可以用一个数据库或者redis来控制国家,但我想知道是否有更高层次的方法只利用Kafka的生态系统。
yuvru6vn1#
你可以用 ConsumerGroupCommand 要检查特定使用者组是否已完成处理特定主题中的所有邮件,请执行以下操作: $ kafka-consumer-groups --bootstrap-server broker_host:port --describe --group chunk_consumer 或 $ kafka-run-class kafka.admin.ConsumerGroupCommand ... 每个分区的零延迟将指示消息已被成功使用,以及使用者提交的偏移量。或者,您可以选择订阅 __consumer_offsets 主题和处理来自它的消息,但是使用 ConsumerGroupCommand 似乎是一个更直接的解决方案。
ConsumerGroupCommand
$ kafka-consumer-groups --bootstrap-server broker_host:port --describe --group chunk_consumer
$ kafka-run-class kafka.admin.ConsumerGroupCommand ...
__consumer_offsets
qlckcl4x2#
方法如下:在使用每个区块后,应用程序应该生成状态为(consumered和chunk number)的消息第二个应用程序(kafka streams once)应该聚合结果,当处理包含所有块的消息产生最终消息时,该文件将被处理。
2条答案
按热度按时间yuvru6vn1#
你可以用
ConsumerGroupCommand
要检查特定使用者组是否已完成处理特定主题中的所有邮件,请执行以下操作:$ kafka-consumer-groups --bootstrap-server broker_host:port --describe --group chunk_consumer
或$ kafka-run-class kafka.admin.ConsumerGroupCommand ...
每个分区的零延迟将指示消息已被成功使用,以及使用者提交的偏移量。或者,您可以选择订阅
__consumer_offsets
主题和处理来自它的消息,但是使用ConsumerGroupCommand
似乎是一个更直接的解决方案。qlckcl4x2#
方法如下:
在使用每个区块后,应用程序应该生成状态为(consumered和chunk number)的消息
第二个应用程序(kafka streams once)应该聚合结果,当处理包含所有块的消息产生最终消息时,该文件将被处理。