kafka延迟队列实现

siotufzp  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(701)

想要使用高级使用者api实现延迟使用者吗
主旨:
按键生成消息(每个消息包含创建时间戳)这确保每个分区按生成的时间对消息进行排序。
auto.commit.enable=false(将在每个消息进程之后显式提交)
使用消息
检查消息时间戳并检查是否经过了足够的时间
处理消息(此操作永远不会失败)
提交1偏移量

  1. while (it.hasNext()) {
  2. val msg = it.next().message()
  3. //checks timestamp in msg to see delay period exceeded
  4. while (!delayedPeriodPassed(msg)) {
  5. waitSomeTime() //Thread.sleep or something....
  6. }
  7. //certain that the msg was delayed and can now be handled
  8. Try { process(msg) } //the msg process will never fail the consumer
  9. consumer.commitOffsets //commit each msg
  10. }

关于这一实施的一些担忧:
提交每个偏移可能会减慢zk的速度
consumer.commitofsets能否引发异常?如果是,我将使用相同的消息两次(可以用幂等消息求解)
等待很长时间而不提交偏移量的问题,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,处理并提交(zk session timeout?)
zk会话如何在没有提交新偏移量的情况下保持活动(设置一个配置单元zookeeper.session.timeout.ms可以在没有识别的情况下解析死亡消费者)
我还缺什么问题吗?
谢谢!

btqmn9zl

btqmn9zl1#

解决这个问题的一种方法是使用不同的主题,推送所有要延迟的消息。如果所有延迟的消息都应在相同的时间延迟后处理,这将是相当直接的:

  1. while(it.hasNext()) {
  2. val message = it.next().message()
  3. if(shouldBeDelayed(message)) {
  4. val delay = 24 hours
  5. val delayTo = getCurrentTime() + delay
  6. putMessageOnDelayedQueue(message, delay, delayTo)
  7. }
  8. else {
  9. process(message)
  10. }
  11. consumer.commitOffset()
  12. }

现在,所有常规消息都将被尽快处理,而那些需要延迟的消息将被放到另一个主题上。
好在我们知道延迟主题的头部的消息是应该首先处理的消息,因为它的delayto值是最小的。因此,我们可以设置另一个使用者来读取head消息,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。如果没有,则不提交偏移量,而是一直休眠到该时间:

  1. while(it.hasNext()) {
  2. val delayedMessage = it.peek().message()
  3. if(delayedMessage.delayTo < getCurrentTime()) {
  4. val readMessage = it.next().message
  5. process(readMessage.originalMessage)
  6. consumer.commitOffset()
  7. } else {
  8. delayProcessingUntil(delayedMessage.delayTo)
  9. }
  10. }

如果有不同的延迟时间,您可以将主题划分为延迟时间(例如24小时、12小时、6小时)。如果延迟时间比这更动态,它会变得更复杂一些。你可以通过引入两个延迟主题来解决这个问题。从延迟主题中读取所有消息 A 并处理所有 delayTo 价值已经成为过去。在其他人中,你只找到最接近的一个 delayTo 然后把它们放在主题上 B . 睡眠,直到最近的一个应该被处理,并以相反的方式进行,即处理来自主题的消息 B 把不该处理的一次放回主题上 A .
回答您的具体问题(有些问题已在您的问题评论中提及)
提交每个偏移可能会减慢zk的速度
您可以考虑切换到在kafka中存储偏移量(这是0.8.2中提供的功能,请参阅) offsets.storage 消费者配置中的属性)
consumer.commitofsets能否引发异常?如果是,我将使用相同的消息两次(可以用幂等消息求解)
我相信它可以,如果它不能与偏移存储通信的话。如你所说,使用幂等消息解决了这个问题。
等待很长时间而不提交偏移量的问题,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,处理并提交(zk session timeout?)
上述解决方案不会有问题,除非消息本身的处理时间超过会话超时时间。
zk会话如何在没有提交新偏移量的情况下保持活动(设置一个配置单元zookeeper.session.timeout.ms可以在没有识别的情况下解析死亡消费者)
同样,使用上述方法,您不需要设置很长的会话超时。
我还缺什么问题吗?
总是有;)

展开查看全部
olhwl3o2

olhwl3o22#

按计划键控列表或其redis替代方案可能是最好的方法。

d8tt03nd

d8tt03nd3#

我建议你换一条路。
在消费者的主线程中处理等待时间是没有意义的。这将是如何使用队列的反模式。从概念上讲,您需要以尽可能快的速度处理消息,并将队列保持在较低的加载因子。
相反,我将使用一个调度程序,它将为需要延迟的每条消息调度作业。通过这种方式,您可以处理队列并创建将在预定义的时间点触发的异步作业。
使用这种技术的缺点是,它可以感知内存中保存调度作业的jvm的状态。如果该jvm失败,那么您就失去了计划的作业,并且不知道该任务是否被执行。
有一些调度器实现,但是可以配置为在集群环境中运行,从而使您免受jvm崩溃的影响。
看看这个java调度框架:http://www.quartz-scheduler.org/

z0qdvdin

z0qdvdin4#

使用tibco ems或其他jms队列。它们内置了重试延迟。Kafka可能不是你所做的正确的设计选择

相关问题