我在Spring 3上使用基本的Kafka订阅和轮询机制来消费和处理消息,如下面给出的代码片段从我们的消费者 Jmeter 板中,我可以看到应用程序没有订阅主题,看起来根本没有调用方法,也没有错误。我如何解决这个问题??
@Async
@Override
public void onApplicationEvent(ContextRefreshedEvent applicationContextEvent) {
log.info("Subscribing to topic={}", topic);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
try {
ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(1000));
messageProcessor.processMessages(records);
consumer.commitAsync();
} catch (Exception e) {
log.error("Exception when consuming the messages from kafka ", e);
}
}
}
1条答案
按热度按时间f87krz0w1#
我假设你想在应用程序启动时调用
onApplicationEvent()
,如果是这样,正确的注解是@EventListener(ApplicationReadyEvent.class)
虽然我不太清楚这个方法的位置,但如果你的类是一个服务或组件,可以在你的类的构造函数之后开始监听Kafka,另一个选择是使用@PostConstruct
。同样,这取决于您的实现。我希望这能有所帮助。