我想知道以原子方式处理关于kafka主题的消息的推荐方法,例如,假设kafka producer正在发布多个密钥为k1、k2、k3的消息,现在我想以原子方式处理它们,并让我的应用程序一起知道这些消息。为了实现这一点,可能需要一些变通方法,例如passid和count以及需要一起处理的所有消息,以便客户机等待,直到它接收到同一组的所有消息。有没有其他推荐的方法来解决kafka的此类问题,即原子化地处理一批消息的能力,以便在密钥之间保持一致性。Kafka消费者提供这样的能力吗?
我想知道以原子方式处理关于kafka主题的消息的推荐方法,例如,假设kafka producer正在发布多个密钥为k1、k2、k3的消息,现在我想以原子方式处理它们,并让我的应用程序一起知道这些消息。为了实现这一点,可能需要一些变通方法,例如passid和count以及需要一起处理的所有消息,以便客户机等待,直到它接收到同一组的所有消息。有没有其他推荐的方法来解决kafka的此类问题,即原子化地处理一批消息的能力,以便在密钥之间保持一致性。Kafka消费者提供这样的能力吗?
1条答案
按热度按时间zy1mlcev1#
生产时要解决的方面
在kafka或任何其他消息代理中,不存在跨越生产者和消费者的原子性。
因此,当生产者发送消息时,必须在消息中包含某种关联id,以便消费者知道哪些消息属于同一组。
但是知道哪些消息属于同一个组对于消费者来说还不够,它还应该知道什么时候应该考虑属于某个特定组的消息已经被完全收集,这样它就可以开始处理一个组了。到目前为止,例如,如果是固定大小的组,则不需要将任何组大小作为消息的一部分发送,否则您需要在生成的消息中添加组大小或其他内容,以向使用者发出组已完成的信号。
您还应该将属于同一组的消息生成到同一分区。
现在你的基本需求得到了满足,剩下的工作你可以选择不同的道路。
例如,您可以使用
camel-kafka
以及aggregator
eip使用这个主题并写入不同的主题,其中每个记录都是整个组消息,然后您就知道您可以原子地使用这个主题