如何强制从Kafka中批量读取一定数量的消息。服务读取随机数量而不是指定数量。
在Kafka设置中,我指定了选项 * max-poll-records:“500”***
spring:
main:
allow-bean-definition-overriding: true
kafka:
listener:
type: batch
consumer:
enable-auto-commit: true
auto-offset-reset: latest
group-id: my-app
max-poll-records: "500"
fetch-min-size: "1000MB"
bootstrap servers:
"localhost:9092"
显示一次应读取多少条消息(500条消息)
并在Kafka的配置文件中指定了第二个参数***setIdleBetweenPolls(“5000”)***:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryBatch(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
var properties = new HashMap<String, Object>();
properties.putAll(consumerFactory.getConfigurationProperties());
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties));
factory.setBatchListener(true);
factory.getContainerProperties().setIdleBetweenPolls("5000");
return factory;
}
这是阅读间隔= 5秒。
每5秒,服务从Kafka读取500条消息,然后再读取500条,然后再读取500条,以此类推。
主要问题:当我向Kafka发送20条消息,或者50条,或者100条消息时,都没有问题。该服务一次读取所有消息。但是如果我向Kafka发送500条消息,或者10000条消息,那么服务会随机读取,不一定是500条。它可以一次读取500条消息,或者可能更少(例如,200和300或150和300和50)等。
P.S:我在互联网上找到了很多信息,我不知道如何解决这个问题,这是否是可能的。请分享您的意见和可能的解决方案,这个问题。
提前感谢大家!
1条答案
按热度按时间f0brbegy1#
没有
min.poll.records
,只有max。您可以对它进行一些控制,但不使用记录计数。
请参见fetch.min.bytes和fetch.max.wait.ms
fetch.min.bytes
默认为1,因此轮询有时会返回小于最大值的值。