我正在管理一个kafka队列,该队列使用跨多台计算机的公共使用者组。现在我还需要显示队列的当前内容。如何仅读取组中那些尚未读取的消息,同时使那些消息再次被组中实际处理这些消息的其他使用者读取。任何帮助都将不胜感激。
unhi4e5o1#
Kafka现在有了 KStream.peek() 方法参见提案“添加kstream peek方法”。从文档中我并不100%清楚这会阻止使用从主题中窥视到的消息,但我看不出您如何以任何安全可靠的方式使用它,除非它这样做。另请参见:在实现同步自动偏移提交时处理使用者重新平衡高级消费者和偷窥信息
KStream.peek()
fcg9iug32#
我认为你可以使用发布-订阅模式。然后,每个使用者都有自己的偏移量,可以为自己使用所有消息。
b5lpy0ml3#
在Kafka看来,“阅读”某个主题的信息和“消费”这些信息是同一回事。在较高的级别上,唯一使“已消费”消息对使用者不可用的是使用者将其读取偏移量设置为超出所讨论消息的值。因此,您可以关闭使用者的自动提交功能,并避免在只想“读取”而不想“消费”的情况下提交偏移量。获取“所有尚未读取的消息”的一个好代理是将最新提交的偏移量与每个分区的高水位线偏移量进行比较。这提供了一个“滞后”的概念,它表示给定的使用者在分区的使用方面落后了多少。这个 fetch_consumer_lag pykafka中的cli函数就是一个很好的例子。
fetch_consumer_lag
dsf9zpds4#
在kafka中,一个分区只能由一个组中的一个使用者使用,也就是说,如果您的主题有10个分区,并且您生成了20个具有相同groupid的使用者,那么只有10个将连接到kafka,其余10个将处于空闲状态。只有在现有消费者中的一个死亡或没有从主题中投票的情况下,Kafka才会确定一个新的消费者。好吧,我不认为你能像我理解的那样在一个消费群体里做你想做的事。显然,您可以创建另一个groupid并根据第一个使用者组收集的信息处理消息。
4条答案
按热度按时间unhi4e5o1#
Kafka现在有了
KStream.peek()
方法参见提案“添加kstream peek方法”。
从文档中我并不100%清楚这会阻止使用从主题中窥视到的消息,但我看不出您如何以任何安全可靠的方式使用它,除非它这样做。
另请参见:
在实现同步自动偏移提交时处理使用者重新平衡
高级消费者和偷窥信息
fcg9iug32#
我认为你可以使用发布-订阅模式。然后,每个使用者都有自己的偏移量,可以为自己使用所有消息。
b5lpy0ml3#
在Kafka看来,“阅读”某个主题的信息和“消费”这些信息是同一回事。在较高的级别上,唯一使“已消费”消息对使用者不可用的是使用者将其读取偏移量设置为超出所讨论消息的值。因此,您可以关闭使用者的自动提交功能,并避免在只想“读取”而不想“消费”的情况下提交偏移量。
获取“所有尚未读取的消息”的一个好代理是将最新提交的偏移量与每个分区的高水位线偏移量进行比较。这提供了一个“滞后”的概念,它表示给定的使用者在分区的使用方面落后了多少。这个
fetch_consumer_lag
pykafka中的cli函数就是一个很好的例子。dsf9zpds4#
在kafka中,一个分区只能由一个组中的一个使用者使用,也就是说,如果您的主题有10个分区,并且您生成了20个具有相同groupid的使用者,那么只有10个将连接到kafka,其余10个将处于空闲状态。只有在现有消费者中的一个死亡或没有从主题中投票的情况下,Kafka才会确定一个新的消费者。
好吧,我不认为你能像我理解的那样在一个消费群体里做你想做的事。显然,您可以创建另一个groupid并根据第一个使用者组收集的信息处理消息。