我第一次使用spring,并尝试实现一个共享队列,其中kafka侦听器将消息放在共享队列上,threadmanager最终将对从共享队列中取出的项执行多线程处理。以下是我当前的实现:
听众:
@Component
public class Listener {
@Autowired
private QueueConfig queueConfig;
private ExecutorService executorService;
private List<Future> futuresThread1 = new ArrayList<>();
public Listener() {
Properties appProps = new AppProperties().get();
this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
}
//TODO: how can I pass an approp into this annotation?
@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
public void listener(ConsumerRecord<?, ?> record) throws InterruptedException, ExecutionException
{
futuresThread1.add(executorService.submit(new Runnable() {
@Override public void run() {
try{
queueConfig.blockingQueue().put(record);
// System.out.println(queueConfig.blockingQueue().take());
} catch (Exception e){
System.out.print(e.toString());
}
}
}));
}
}
队列:
@Configuration
public class QueueConfig {
private Properties appProps = new AppProperties().get();
@Bean
public BlockingQueue<ConsumerRecord> blockingQueue() {
return new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
}
}
线程管理器:
@Component
public class ThreadManager {
@Autowired
private QueueConfig queueConfig;
private int threads;
public ThreadManager() {
Properties appProps = new AppProperties().get();
this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
}
public void run() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
try {
while (true){
queueConfig.blockingQueue().take();
}
} catch (Exception e){
System.out.print(e.toString());
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}
}
最后一点,一切都从主线开始:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
SpringApplication.run(SourceAccountListenerApp.class, args);
ThreadManager threadManager = new ThreadManager();
try{
threadManager.run();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
问题
当在调试器中运行此命令时,我可以看出侦听器正在向队列中添加内容。当threadmanager离开共享队列时,它告诉我队列为null,我得到一个npe。似乎autowiring无法将侦听器使用的队列连接到threadmanager。谢谢你的帮助。
2条答案
按热度按时间k4ymrczo1#
你用Spring´s programatic,即所谓的“javaconfig”,设置springbean(用
@Configuration
方法注解为@Bean
). 通常在应用程序启动时,spring会调用@Bean
方法,并在其应用程序上下文中注册它们(如果作用域是singleton-默认值-这只会发生一次!)。不用打电话了@Bean
方法直接在代码中的任何位置。。。你不能,否则你会得到一个单独的,新的示例,可能是没有完全配置!相反,你需要注射
BlockingQueue<ConsumerRecord>
您在QueueConfig.blockingQueue()
你的方法ThreadManager
. 因为队列似乎是ThreadManager
为了工作,我让spring通过构造函数注入它:只想澄清一件事:默认情况下
@Bean
方法被spring用来为这个bean分配一个唯一的id(方法名==bean id)。所以你的方法被称为blockingQueue
,表示您的BlockingQueue<ConsumerRecord>
示例也将以id注册blockingQueue
在应用程序上下文中。新的构造函数参数也被命名为blockingQueue
而且是类型匹配BlockingQueue<ConsumerRecord>
. 简化了,这是spring查找和注入/连接依赖项的一种方法。kninwzqo2#
这就是问题所在:
ThreadManager threadManager = new ThreadManager();
因为您是手动创建示例,所以不能使用spring提供的di。一个简单的解决方案是实现commandlinerunner,它将在完成
SourceAccountListenerApp
初始化: