婀娜多姿的SpringKafka

5ktev3wc  于 2024-01-06  发布在  Apache
关注(0)|答案(1)|浏览(202)

我正在使用spring framework和spring Kafka为几个Kafka主题编写consumer。由于某些原因,我必须使用spring-Kafka 1.3。我已经手动配置读取所有主题,创建侦听器,将其 Package 在容器中,然后启动所有主题。我使用手动确认的批量消息侦听器。
我希望应用程序在关闭时优雅地停止。我想让消费者暂停,处理所有剩余的负载,然后停止容器。当所有容器都停止时,我会停止并退出应用程序。但是,旧版本没有容器的暂停方法,也没有任何访问消费者<K的方法,有没有什么方法可以在不更新到新版本的情况下实现这一点?
stop方法等待容器在定义的超时值内处理有效负载,然后停止它,这可能导致有效负载被丢弃或处理两次。

ff29svar

ff29svar1#

既然你是自己构建容器,你就可以访问消费者。如果你创建一个消费者服务,它包含了所有使用的消费者的内部列表,然后你添加一个关闭钩子,会怎么样呢?

@Service
public class ConsumerService {

    private final List<KafkaConsumer> consumers = new ArrayList<KafkaConsumer>();

    public KafkaConsumer<String, String> requestConsumer(){
        var consumer = createConsumer();
        consumers.add(consumer);
        return consumer;   
    }

    @PreDestroy
    public void destroy() {
        //here do the clean, like for example not ACK the messages, so another consumer can pick them up?
    }

}

字符串

相关问题