如何在Kafka上添加延迟的工作?据我所知,它不涉及每个消息,但每个主题。我的工作有不同的时间表,我希望他们被消费。比如说一个是在接下来的4小时内,另一个是12月1日,等等。Kafka是否有本土的支持,这或其他第三方的方式来实现相同的?我正在考虑使用redis来代替延迟队列,并在其时间表到达后将作业推送到kafka,但是如果可能的话,我只想使用一个依赖项。
a0zr77ik1#
作为替代,您可以使用rabbitmq,它通过使用消息ttl和死信交换来支持这一点欲了解更多信息,请访问:https://m.alphasights.com/exponential-backoff-with-rabbitmq-78386b9bec81
nfg76nw02#
不幸的是,kafka不能像某些消息队列那样延迟消息的可见性。消息发布后,将立即提供给所有消费者。唯一的小例外是在事务作用域中进行发布,并且使用者启用了读取提交隔离模式。即便如此,延误也将是微乎其微的。kafka将所有处理语义留给消费者自行决定。如果您需要延迟处理,您可能需要使用持久数据存储(例如rdbms或redis)或消费端的另一个队列。您肯定不想用thread.sleep()阻止生产者上的记录消费,因为这会影响您轮询记录的能力,kafka最终会认为您的消费者失败了。
mwngjboj3#
Kafka没有工作的概念。它只是一个愚蠢的高性能消息队列服务。根据您的需求,您可以考虑将作业存储在支持按作业执行时间进行索引的存储器中,就像某些rdbms一样。然后在某些进程中,周期性地提取执行时间在某个小范围内的作业[上次检查时间、当前时间+展望时间间隔],并将它们放入kafka主题中进行最终处理。
zfycwa2u4#
回答有点迟。现在,在最新的kafka版本0.10+中,可以使用每个消息的新时间戳从延迟流进行消费。我现在使用它是为了实现一个连续的聚合数据集,而不依赖于外部依赖。这些记录会通过,并且在第一个事件之后的60分钟内可能会有更新/删除,因此在看到所有更新之前,我不能将其中一个声明为“最终”。因此,为了处理这种情况,我将主题与所有创建/更新/删除一起使用两次,第一次是实时的(或尽可能快),第二次是延迟90分钟,以确保我不会错过任何东西。在实时使用者上,我在本地存储create所需的所有更新。然后,在延迟消费者上,当我收到一个特定的“create”时,我将查找本地存储中的任何更新/删除,更新记录以便它知道它的最终状态,并再次将其生成kafka的最终主题。为了确保磁盘空间不会用完,我还不断地截断本地存储,这样它最多可以保存两个小时的更新/删除。
4条答案
按热度按时间a0zr77ik1#
作为替代,您可以使用rabbitmq,它通过使用消息ttl和死信交换来支持这一点
欲了解更多信息,请访问:
https://m.alphasights.com/exponential-backoff-with-rabbitmq-78386b9bec81
nfg76nw02#
不幸的是,kafka不能像某些消息队列那样延迟消息的可见性。消息发布后,将立即提供给所有消费者。唯一的小例外是在事务作用域中进行发布,并且使用者启用了读取提交隔离模式。即便如此,延误也将是微乎其微的。
kafka将所有处理语义留给消费者自行决定。如果您需要延迟处理,您可能需要使用持久数据存储(例如rdbms或redis)或消费端的另一个队列。您肯定不想用thread.sleep()阻止生产者上的记录消费,因为这会影响您轮询记录的能力,kafka最终会认为您的消费者失败了。
mwngjboj3#
Kafka没有工作的概念。它只是一个愚蠢的高性能消息队列服务。根据您的需求,您可以考虑将作业存储在支持按作业执行时间进行索引的存储器中,就像某些rdbms一样。然后在某些进程中,周期性地提取执行时间在某个小范围内的作业[上次检查时间、当前时间+展望时间间隔],并将它们放入kafka主题中进行最终处理。
zfycwa2u4#
回答有点迟。现在,在最新的kafka版本0.10+中,可以使用每个消息的新时间戳从延迟流进行消费。我现在使用它是为了实现一个连续的聚合数据集,而不依赖于外部依赖。
这些记录会通过,并且在第一个事件之后的60分钟内可能会有更新/删除,因此在看到所有更新之前,我不能将其中一个声明为“最终”。
因此,为了处理这种情况,我将主题与所有创建/更新/删除一起使用两次,第一次是实时的(或尽可能快),第二次是延迟90分钟,以确保我不会错过任何东西。在实时使用者上,我在本地存储create所需的所有更新。然后,在延迟消费者上,当我收到一个特定的“create”时,我将查找本地存储中的任何更新/删除,更新记录以便它知道它的最终状态,并再次将其生成kafka的最终主题。
为了确保磁盘空间不会用完,我还不断地截断本地存储,这样它最多可以保存两个小时的更新/删除。