kafka消费者中的重试逻辑

1sbrub3j  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(835)

我有一个用例,在这个用例中,我使用队列中的某些日志,并使用该日志中的一些信息访问一些第三方api,如果第三方系统没有正确响应,我希望为该特定日志实现一个重试逻辑。
我可以添加一个时间域并将消息重新发布到同一队列中,如果该消息的时间域有效(即小于当前时间),则该消息将再次被使用,否则将再次被推送到队列中。
但是这个逻辑将一次又一次地添加相同的日志,直到重试时间正确为止,队列将不必要地增长。
在kafka中有没有更好的方法来实现重试逻辑?

kgsdhlau

kgsdhlau1#

您可以创建几个重试主题并将失败的任务推送到那里。例如,您可以创建3个不同延迟分钟的主题,并轮换单个失败的任务,直到达到最大尝试限制。
'重试主题' — 5分钟后重试
'重试\u 30m\u主题' — 30分钟后重试
'重试主题' — 1小时后重试
详细信息请参见:https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a

sz81bmfz

sz81bmfz2#

在使用者中,如果抛出异常,则生成另一条尝试次数为1的消息。所以下次当它被消费时,它的属性是attempt no 1。在producer中处理它,如果它尝试的次数超过您的重试次数,则停止生成它。

chhqkbe1

chhqkbe13#

是的,这可能是我也想到的一个直接的解决方案。但是有了这个,我们最终将创建许多主题,因为消息处理可能会再次失败。
我通过将这个用例Map到rabbitmq解决了这个问题。在rabbitmq中,我们有重试交换的概念,如果来自交换的消息处理失败,那么您可以使用ttl将其发送到重试交换。一旦ttl过期,消息将移回主交换机,并准备再次处理。
我可以发布一些示例,解释如何使用rabbitmq实现指数退避消息处理。

相关问题