我使用SpringKafka2.x阅读来自kafka的消息。
我的问题是,如果我的方法花了一段时间在我的Kafka利斯特方法上,那一次Kafka发送了两次相同的消息,这对我来说是个问题。如果我读了Kafka的消息,那我想要什么呢?Kafka没有第二次给我发送这个消息,像max.poll.interval.ms这样的时间值不保证一次读消息。在spring boot中实现一次读策略的正确方法是什么。在我的消息中,我没有相应的密钥我无法控制的原因。
@KafkaListener(topics = "mytopics",groupId = "mygroup",concurrency = 3",containerFactory = "MyListenerContainerFactory")
void messageReceiver(@Payload String data, @Headers MessageHeaders headers) {
String receivedTopic= headers.get(KafkaHeaders.RECEIVED_TOPIC).toString();
//DO something
}
1条答案
按热度按时间roqulrg31#
你需要在10分钟内处理投票结果
max.poll.interval.ms
. 所以增加它,或者减少它max.poll.records
.Kafka没有任何机制来阻止重新交付,如果你花太长的时间来处理。