案例
客户是 ReplyingKafkaTemplate
示例。
服务器是 ConcurrentMessageListenerContainer
使用创建 @KafkaListener
以及 @SendTo
方法上的注解。
集装箱工厂用途 ContainerStoppingErrorHandler
.
请求主题只有一个分区。
组ID是静态的。测试消费者群体。
请求随超时一起发送。
由于抛出了一个异常,服务器停机,但客户机一直在调度请求,这些请求在请求主题上排队。
当前行为
当服务器恢复时,它将继续处理可能已超时的旧请求。
期望的行为
相反,最好继续最后一条信息;因此,即使跳过未处理的消息,相应的请求也会超时并重试。
问题
实现这一目标的建议方法是什么?
据我所知,似乎我必须手动设置初始偏移量。最简单的实现方法是什么?
1条答案
按热度按时间mlnl4t2r1#
你的
@KafkaListener
类必须extends AbstractConsumerSeekAware
然后这样做:因此,每次当您的使用者加入组时,它都会寻找所有指定的分区,跳过所有旧记录。