我有一个spring启动应用程序,它需要处理一些kafka流数据。我给一个 CommandLineRunner
类,该类将在启动时运行。在这里有一个Kafka消费者,可以唤醒。我添加了一个关机挂钩 Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
. 我会遇到什么问题吗?Spring有没有更惯用的方法?我应该用吗 @Scheduled
相反呢?下面的代码去掉了特定的kafka实现内容,但在其他方面是完整的。
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Properties;
@Component
public class InfiniteLoopStarter implements CommandLineRunner {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void run(String... args) {
Consumer<AccountKey, Account> consumer = new KafkaConsumer<>(new Properties());
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try {
while (true) {
ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
//process records
}
} catch (WakeupException e) {
logger.info("Consumer woken up for exiting.");
} finally {
consumer.close();
logger.info("Closed consumer, exiting.");
}
}
}
3条答案
按热度按时间z9zf31ra1#
为了回答我自己的问题,我查看了kafka集成库,如spring-kafka和spring-cloud-stream,但是与confluent的schema registry的集成要么没有完成,要么我不太清楚。这对于原语来说已经足够了,但是对于由模式注册表验证的类型化avro对象,我们需要它。我现在实现了一个Kafka不可知的解决方案,基于SpringBoot上的答案——启动部署后台线程的最佳方式
最后的代码如下所示:
wsxa1bj12#
实现看起来还可以,但是使用commandlinerunner并不是为了这个。commandlinerunner用于在启动时只运行一次某个任务。从设计Angular 看,它不是很优雅。我宁愿使用带有kafka的spring集成适配器组件。你可以在这里找到例子https://github.com/raphaelbrugier/spring-integration-kafka-sample/blob/master/src/main/java/com/github/rbrugier/esb/consumer/consumer.java .
voj3qocg3#
我不确定您是否会遇到任何问题,但这有点脏-spring为与kafka合作提供了非常好的内置支持,因此我倾向于此(web上有大量关于此的文档,但一个很好的文档是:https://www.baeldung.com/spring-kafka).
您将需要以下依赖项:
配置与添加
@EnableKafka
注解,然后设置侦听器和ConsumerFactorybean配置后,您可以按以下方式轻松设置使用者: