我正在用kafka实现一种延迟队列。spring-kafka侦听器容器为某个主题(比如t1)接收到的每一条消息都将被延迟一段时间(比如d分钟),然后消息将被发送回另一个主题(比如t2)。目前,我正在SpringKafka侦听器容器方法(AcknowledgeingConsumeraWareMessageListener)中执行此操作:
从t1接收消息
暂停侦听器容器
如果需要的话,睡4分钟
恢复侦听器容器
发送消息到t2
我知道心跳线程是一个不同的线程,不会受到上述步骤的影响,但是轮询与处理发生在同一个线程中,并且在按照这个答案处理记录之后。我已经将@kafkalistener属性设置为“max.poll.interval.ms=2xd”,这样它就不会超时,但是我从kafkaeventlistener获得了无响应的消费事件(带有timesincelastpoll)。即使我没有为@kafkalistener属性设置max.poll.interval.ms,我仍然会得到相同的无响应消费事件。在这两种情况下,消息只处理一次并发送到t2。
问题
当侦听器容器暂停时,如果在max.poll.interval.ms内没有轮询,结果会是什么?如果容器没有暂停呢(我已将消费者配置为手动确认)
我应该为休眠和恢复容器生成一个单独的线程,从而释放容器处理线程来轮询吗?有关系吗?
版本:spring boot 2.1.8、spring kafka 2.2.8
1条答案
按热度按时间pod7payv1#
如果需要的话,睡4分钟
你不能“睡眠”消费者线程超过
max.poll.interval.ms
.暂停容器的关键是使其继续运行
poll
(但在恢复之前将永远不会获得任何新记录)。如果你真的让听众睡着了,停顿是没有意义的;你只需要增加
max.poll.interval.ms
适当地。