寻找最佳的方法来设计我的Kafka消费者。基本上,我想看看在处理消息的过程中出现任何异常/错误时,避免数据丢失的最佳方法是什么。
我的用例如下。
a) 我使用服务来处理消息的原因是——将来我计划编写一个错误处理器应用程序,该应用程序将在一天结束时运行,它将再次尝试处理失败的消息(不是所有消息,而是由于任何依赖关系(如父级丢失)而失败的消息)。
b) 我想确保有零消息丢失,所以我会保存到一个文件中的消息,以防有任何问题,而保存到数据库的消息。
c) 在生产环境中,可能有多个使用者和服务示例正在运行,因此多个应用程序很有可能试图写入同一个文件。
q-1)写入文件是避免数据丢失的唯一选择吗?
问题2)如果这是唯一的选择,如何确保多个应用程序同时写入同一个文件和读取?请考虑在将来构建错误处理器时,它可能正在读取同一文件中的消息,而另一个应用程序正在尝试写入该文件。
错误处理器—我们的源代码遵循事件驱动机制,有时依赖事件(例如,某个事件的父实体)很可能会延迟几天。所以在这种情况下,我希望我的错误处理器能多次处理相同的消息。
2条答案
按热度按时间5q4ezhmt1#
我以前也遇到过类似的事情。所以,直接进入你的问题:
不一定,你可以用一个新的主题把这些信息发送回Kafka(比如-
error-topic
). 所以,当你的错误处理器准备好了,它就可以监听这个error-topic
当这些信息进来的时候就把它们消费掉。我认为这个问题是针对第一个问题提出的。因此,与其使用一个文件进行读写操作,同时打开多个文件句柄来执行此操作,kafka可能是一个更好的选择,因为它是为此类问题而设计的。
注意:基于我对你的问题领域的有限理解,以下几点只是一些值得思考的东西。所以,你可以选择忽略这个安全。
还有一点值得你考虑的设计
service
组件-您还可以考虑通过将所有错误消息发送回kafka来合并第4点和第5点。这将使您能够以一致的方式处理所有错误消息,而不是将一些消息放在错误数据库中,另一些放在kafka中。编辑:基于错误处理器需求的附加信息,这里是解决方案设计的图示。
为了保持通用性,我特意暂时保留了错误处理器的输出摘要。
我希望这有帮助!
ss2ws0br2#
如果在写入数据库之前不提交已使用的消息,那么在kafka保留消息时,不会丢失任何内容。这样做的代价是,如果使用者确实提交到数据库,但是kafka偏移量提交失败或超时,那么最终会再次使用记录,并且可能会在服务中处理重复的记录。
即使您确实写入了一个文件,也不能保证您的顺序,除非您为每个分区打开一个文件,并确保所有使用者只在一台机器上运行(因为您在那里保存状态,这是不容错的)。重复数据消除仍然需要处理。
此外,您可以查看kafka connect框架,而不是将自己的消费者写入数据库。为了验证消息,可以类似地部署kafka streams应用程序,将输入主题中的坏消息过滤到要发送到db的主题中