重新启动侦听器并从最新消息继续

0ejtzxu1  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(336)

案例
客户是 ReplyingKafkaTemplate 示例。
服务器是 ConcurrentMessageListenerContainer 使用创建 @KafkaListener 以及 @SendTo 方法上的注解。
集装箱工厂用途 ContainerStoppingErrorHandler .
请求主题只有一个分区。
组ID是静态的。测试消费者群体。
请求随超时一起发送。
由于抛出了一个异常,服务器停机,但客户机一直在调度请求,这些请求在请求主题上排队。
当前行为
当服务器恢复时,它将继续处理可能已超时的旧请求。
期望的行为
相反,最好继续最后一条信息;因此,即使跳过未处理的消息,相应的请求也会超时并重试。
问题
实现这一目标的建议方法是什么?
据我所知,似乎我必须手动设置初始偏移量。最简单的实现方法是什么?

mlnl4t2r

mlnl4t2r1#

你的 @KafkaListener 类必须 extends AbstractConsumerSeekAware 然后这样做:

@Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
        callback.seekToEnd(assignments.keySet());
    }

因此,每次当您的使用者加入组时,它都会寻找所有指定的分区,跳过所有旧记录。

相关问题