- 使用案例**
有一个上游系统通过Kafka生成数据批次(一个批次平均有1千万条记录)。
起始偏移量和预期记录计数通过一个主题(控件),而数据本身通过另一个主题(数据)。
在生产批处理时,可能会周期性地重复某些记录,从而导致预期记录与控制主题上发送的记录不匹配。
每条记录都有唯一的ID。
- 问题**
由于重复的记录没有包含在预期的记录计数中,我真的不知道什么时候停止处理数据,而且,我希望在反序列化之前跳过那些重复的记录,以加快处理速度。
- 建议的解决方案**
我建议在Kafka头中发送记录id,这样在消费者端,我将在hashset中保存特定批次的id,并检查记录是重复的还是新的。
压缩日志不是一个选项,因为消费者在生产者停止生产记录之前就开始消费记录,所以无论如何都有可能出现重复的记录。
- 问题**
我听说在每个记录的头中发送id是一种反模式,但我想了解它是否真的是这样(以及为什么)。
先谢谢你
2条答案
按热度按时间kuuvgm7e1#
在hashset中保留特定批次的ID
散列集不是分布式的,如果使用者进程崩溃,需要重新启动,散列集将不会被维护。
因此,您需要一个外部的持久缓存,如Redis、Hazelcast等。
头可以有任何你想要的值,但是它们本身仍然需要反序列化,它们当然不是反模式。
我的同事使用Cassandra或MongoDB与Elasticsearch的组合来防止处理重复的ID,这不仅可能发生在生产者身上,也可能发生在缺少偏移提交的消费者重新平衡中。他们确实使用了头部来跟踪这些信息,但显然,这会增加消费者的延迟,因此您需要增加或降低
max.poll.interval.ms
。u7up0aaq2#
据我所知,您有大量的数据需要一起处理,这是一个经典的批处理-批处理实现。您为什么要使用Kafka呢?将数据拆分为多个消息会创建一个事件依赖项,这实际上是一个反模式,解决方案非常痛苦。您是否考虑过使用另一种模式,例如,在一个bucket中创建一个文件中的数据,并通过Kafka消息发送Bucket URL。这样,您可以消除重复数据(但只有在所有记录都写入bucket后,您才能星星使用)。很多时候,它仍然更快。
如果这没有用,并且你仍然需要/喜欢你的模式,你需要的是建立一个幂等性,这是一个很好的解释:https://www.lydtechconsulting.com/blog-kafka-idempotent-consumer.html如您所见,需要进行更多处理。
关于头,是的,你可以,它确实是构建标准(你可以遵循Cloud Events标准),但我不认为你会赢得很多时间,通常消息是不重复的。如果你要构建一个过滤器并反对大多数消息,这个用例是很好的,但这不是你的情况。