Kafka消费者花费很长时间来消费消息

wztqucjr  于 2023-05-16  发布在  Apache
关注(0)|答案(1)|浏览(295)

我在用Spring Kafka发消息。当我向Kafka主题发布100条消息时,我可以看到它被Kafka消费者接收,延迟至少1秒,在某些情况下甚至4到5秒。我使用的主题有6个分区,我将消费者的并发性保持在6。
我使用了Kafka消费者的默认属性值。有什么想法可能是错误的,或者我可以调整哪个属性,以最小化延迟到1到2毫秒?我试着发布100,500,1000,5000条消息,但滞后仍然存在。
消费者属性:

  • 请求超时ms 30秒
  • 心跳间隔毫秒3秒
  • 最大轮询间隔(毫秒)5分钟
  • 最大轮询记录数200
  • 会话超时ms 45秒

消费类

以下是我的消费方法:

@KafkaListener(groupid = AlertsKafkaConfig.GROUP_ID_JSON, topics = TopicNameConstants.Webhook_doc_process_topic_name, containerFacotry = KafkaTopicConstans.WEBHOOK_PROCESS_TOPIC_CONF)
Public void receiveProcessPricessingMessage(@Payload String kafkajsonstring, @Header(KafkaHeaders RECEIVED_TIMESTAMP) String timestamp, @Header(KafkaHeaders.OFFSET) String offset) throws JsonProcessingException {

    try {
        long startime = System.currenttimemillis();
        RequestFactory.appendKafkaId(offset);
        long endtime = System.currenttimemillis();
        Gson gson = new Gson();
        WebhookProcessingCommand eventprocessingcommand = gson.fromJson(kafkajsonstring, WebhookProcessingCommand.class);

        BaseAbstractEventHandler eventprocessinghandler = eventhandlerfactory.getEventhandlerInstance(EventHandler.WEBHOOK_JSON_EVENT_HANDLER.getHandlerName());
        eventprocessinghandler.processEvent(eventprocessingcommand);

    } catch (Exception e) {
        LOGGER.error(ExceptionUtils.fullStackTrace(e));

    }
}
vi4fp9gy

vi4fp9gy1#

在当前设置下,Kafka consumer同步运行,并在单个线程中逐个读取消息。如果侦听器方法运行得足够快,可以随时拾取下一条消息,那么这就很好。提供的代码绝对不是这种情况。这是您在日志中看到差距的主要原因。每一条消息的处理都需要很长时间。
基本的sinn是在处理程序方法内部构造一个新类。应该只做一次。最好是作为整个应用程序的单例。Spring可以通过配置bean和依赖注入来帮助实现。
我不知道类RequesterFactory(RequestFactory?)。也许他们也需要一些时间?以及有效负载的JSON反序列化。而在另一个地方,事件处理程序的示例将在每次调用中动态生成。
尝试重构代码并检查结果。如果你需要更快,那么你可以优化消费者的并发性。现在每个分区只有一个。

相关问题