我在用Spring Kafka发消息。当我向Kafka主题发布100条消息时,我可以看到它被Kafka消费者接收,延迟至少1秒,在某些情况下甚至4到5秒。我使用的主题有6个分区,我将消费者的并发性保持在6。
我使用了Kafka消费者的默认属性值。有什么想法可能是错误的,或者我可以调整哪个属性,以最小化延迟到1到2毫秒?我试着发布100,500,1000,5000条消息,但滞后仍然存在。
消费者属性:
- 请求超时ms 30秒
- 心跳间隔毫秒3秒
- 最大轮询间隔(毫秒)5分钟
- 最大轮询记录数200
- 会话超时ms 45秒
消费类
以下是我的消费方法:
@KafkaListener(groupid = AlertsKafkaConfig.GROUP_ID_JSON, topics = TopicNameConstants.Webhook_doc_process_topic_name, containerFacotry = KafkaTopicConstans.WEBHOOK_PROCESS_TOPIC_CONF)
Public void receiveProcessPricessingMessage(@Payload String kafkajsonstring, @Header(KafkaHeaders RECEIVED_TIMESTAMP) String timestamp, @Header(KafkaHeaders.OFFSET) String offset) throws JsonProcessingException {
try {
long startime = System.currenttimemillis();
RequestFactory.appendKafkaId(offset);
long endtime = System.currenttimemillis();
Gson gson = new Gson();
WebhookProcessingCommand eventprocessingcommand = gson.fromJson(kafkajsonstring, WebhookProcessingCommand.class);
BaseAbstractEventHandler eventprocessinghandler = eventhandlerfactory.getEventhandlerInstance(EventHandler.WEBHOOK_JSON_EVENT_HANDLER.getHandlerName());
eventprocessinghandler.processEvent(eventprocessingcommand);
} catch (Exception e) {
LOGGER.error(ExceptionUtils.fullStackTrace(e));
}
}
1条答案
按热度按时间vi4fp9gy1#
在当前设置下,Kafka consumer同步运行,并在单个线程中逐个读取消息。如果侦听器方法运行得足够快,可以随时拾取下一条消息,那么这就很好。提供的代码绝对不是这种情况。这是您在日志中看到差距的主要原因。每一条消息的处理都需要很长时间。
基本的sinn是在处理程序方法内部构造一个新类。应该只做一次。最好是作为整个应用程序的单例。Spring可以通过配置bean和依赖注入来帮助实现。
我不知道类RequesterFactory(RequestFactory?)。也许他们也需要一些时间?以及有效负载的JSON反序列化。而在另一个地方,事件处理程序的示例将在每次调用中动态生成。
尝试重构代码并检查结果。如果你需要更快,那么你可以优化消费者的并发性。现在每个分区只有一个。