spring 确定从Kafka阅读消息的自定义量

v64noz0r  于 2023-06-28  发布在  Spring
关注(0)|答案(1)|浏览(122)

如何强制从Kafka中批量读取一定数量的消息。服务读取随机数量而不是指定数量。
在Kafka设置中,我指定了选项 * max-poll-records:“500”***

spring:
   main:
     allow-bean-definition-overriding: true
   kafka:
     listener:
       type: batch
     consumer:
       enable-auto-commit: true
       auto-offset-reset: latest
       group-id: my-app
       max-poll-records: "500"
       fetch-min-size: "1000MB"
     bootstrap servers:
       "localhost:9092"

显示一次应读取多少条消息(500条消息)
并在Kafka的配置文件中指定了第二个参数***setIdleBetweenPolls(“5000”)***:

@Bean
 public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryBatch(
     ConsumerFactory<String, String> consumerFactory) {
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
     var properties = new HashMap<String, Object>();

     properties.putAll(consumerFactory.getConfigurationProperties());
     factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties));
     factory.setBatchListener(true);
     factory.getContainerProperties().setIdleBetweenPolls("5000");
     return factory;
 }

这是阅读间隔= 5秒。
每5秒,服务从Kafka读取500条消息,然后再读取500条,然后再读取500条,以此类推。

主要问题:当我向Kafka发送20条消息,或者50条,或者100条消息时,都没有问题。该服务一次读取所有消息。但是如果我向Kafka发送500条消息,或者10000条消息,那么服务会随机读取,不一定是500条。它可以一次读取500条消息,或者可能更少(例如,200和300或150和300和50)等。

P.S:我在互联网上找到了很多信息,我不知道如何解决这个问题,这是否是可能的。请分享您的意见和可能的解决方案,这个问题。
提前感谢大家!

f0brbegy

f0brbegy1#

没有min.poll.records,只有max。
您可以对它进行一些控制,但不使用记录计数。
请参见fetch.min.bytes和fetch.max.wait.ms
fetch.min.bytes默认为1,因此轮询有时会返回小于最大值的值。

相关问题