我有一个springboot应用程序,它用springkafka听一个kafka主题。在消息被使用之后,使用消息中的信息执行几个web/rest服务调用,以收集一些其他数据,这个过程需要一些预期的时间。因此,我使用了一个大小为20的线程池来创建一个并行消息处理。
这个系统通常运行良好,但很少有大量的消息(大约200k)在短时间(1秒)内放入/产生到Kafka主题。在这种情况下,消费者会立即消费消息,但消息处理机制不够快。因此,在等待线程时,所有消耗的消息都会留在内存中,而应用程序会从内存错误中退出。
将线程池大小增加到一定程度可以是一种改进,但这并不是解决此问题的永久解决方案。我想在一段时间内消耗的消息数量和处理的消息数量之间建立一个平衡。这可以限制从Kafka主题中消耗的消息的数量,或者在有可能立即处理消息时消耗消息。
有没有Kafka消费者配置来限制一段时间内的消息数量?当消息消耗的延迟不是问题时,如何优化消耗和处理机制?
ps:两次后续轮询之间的时间间隔似乎没有配置(每次轮询之间的延迟时间是多少),如果有,可能有一个配置的解决方案。
这是我的消费代码:
@Autowired
MessageProcessUtil messageProcessUtil;
private ExecutorService executor = Executors.newFixedThreadPool(20);
@KafkaListener(topics = "${kafka.consumer.topicName}")
public void consume(String message){
logger.info(String.format("$$ -> Consumed Message -> %s",message));
messageProcessUtil.processMessage(message, executor);
}
使用者配置:
kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.request.timeout.ms=40000
kafka.consumer.session.timeout.ms=30000
kafka.consumer.max.poll.records=1
kafka.consumer.fetch.max.wait.ms=500
kafka.consumer.auto.offset.reset=earliest
事先谢谢你的帮助。
3条答案
按热度按时间cvxl0en21#
由于您将消息消费和消息处理分开,因此没有任何配置可以实现您想要的。
但是您可以使用blockingqueue来实现它。您可以设置队列的最大数目,让一个拉线程将消息从kafka拉到队列中,并让进程线程使用队列中的消息。当队列变满时,拉线程将阻塞并降低拉速率。
oymdgrw72#
对已消费的kafka记录添加异步处理不是一个好主意;造成抵销管理问题;使用
concurreny
上@KafkaListener
要添加更多使用者(您至少需要在主题上有那么多分区)。z9zf31ra3#
kafka使用者可以通过设置客户端配置fetch.max.bytes来选择要获取的最大字节数。
请参阅此链接了解更多详细信息。