在Java Sping Boot 中分配多线程进程中未使用/空闲/已完成的线程

xu3bshqb  于 2023-02-28  发布在  Java
关注(0)|答案(1)|浏览(141)

我正在构建一个从多个服务中查询数据的服务,这些服务都是独立的,所以我可以同时发送所有请求,以提高处理效率。下面是我如何让服务启动多个线程来同时完成所有任务:

ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletionService completionService =
                new ExecutorCompletionService<>(executor);

        List<Future<Map>> res = new ArrayList<>();
        res.add(completionService.submit(() -> callAPI(HttpMethod.GET, "/inquiry1", null, null, null)));
        res.add(completionService.submit(() -> callAPI(HttpMethod.GET, "/inquiry2", null, null, null)));
        res.add(completionService.submit(() -> callAPI(HttpMethod.GET, "/inquiry3", null, null, null)));
        res.add(completionService.submit(() -> callAPI(HttpMethod.GET, "/inquiry4", null, null, null)));
        res.add(completionService.submit(() -> callAPI(HttpMethod.GET, "/inquiry5", null, null, null)));

        Map responseMap = new HashMap();
        for (int i=0; i < res.size(); i++) {
            try {
                responseMap.put("result_"+i, (Map) completionService.take().get());
                
            } catch (Exception e) {
                throw (Exception) e.getCause();
            }
        }
        executor.shutdown();

当所有依赖项API运行正常时,这实际上会得到预期的输出:

{
    "result_1": {...},
    "result_2": {...},
    "result_3": {...},
    "result_4": {...},
    "result_5": {...}
}

然而,当任何依赖API需要更长的处理时间(甚至有网关超时问题),其他线程正在等待它,即使其他进程已经完成。当有许多请求淹没这个服务,因为它不断打开5个新线程,他们正在等待任何缓慢的依赖API的响应,它使示例崩溃。
因此,我的问题是:
1.如何使线程在进程结束时(当它收到依赖API的响应并将值存储在内存中时)自动关闭或分配给另一个进程?
1.如何设置可以打开的线程数的阈值,并在打开多线程之前让服务检查进程是否可以继续或只是将其切断?

jqjz2hbq

jqjz2hbq1#

代码将为每次调用服务方法创建5个线程。

ExecutorService executor = Executors.newFixedThreadPool(5);

这将创建一个线程池,每次调用5个线程。如果调用100次,则有100 * 5 = 500个线程。它们只使用一次,并且不会在每次方法中关闭池时回收。
当您使用Spring时,我建议您使用TaskExecutor或更确切地说AsyncTaskExecutor,并使用动态线程池(最小大小为5,最大大小为5,您可以选择任何大小,以及合理的队列大小)配置它,这样您将只拥有您指定为max的线程数(最终将根据您的配置按比例缩小)。
您应该在配置中将这个TaskExecutor定义为bean(或者使用Sping Boot 配置的bean),并将其注入到服务中以使用它。

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(5);
  executor.setMaxPoolSize(10);
  executor.setQueueCapacity(16);
  return executor;
}

那就用这个为你服务。

public class YourService {

  private final AsyncTaskExecutor executor;

  public YourService(AsyncTaskExecutor executor) {
    this.executor=executor;
  }

  public Map getResponse()

    List<Future<Map>> res = new ArrayList<>();
    res.add(executor.submit(() -> callAPI(HttpMethod.GET, "/inquiry1", null, null, null)));
    res.add(executor.submit(() -> callAPI(HttpMethod.GET, "/inquiry2", null, null, null)));
    res.add(executor.submit(() -> callAPI(HttpMethod.GET, "/inquiry3", null, null, null)));
    res.add(executor.submit(() -> callAPI(HttpMethod.GET, "/inquiry4", null, null, null)));
    res.add(executor.submit(() -> callAPI(HttpMethod.GET, "/inquiry5", null, null, null)));

    Map responseMap = new HashMap();
    for (int i=0; i < res.size(); i++) {
      try {
        responseMap.put("result_"+i, (Map) completionService.take().get());
                
        } catch (Exception e) {
          throw (Exception) e.getCause();
        }
    }
    return responseMap;
}

类似于这样,这将最多有10个线程活动,而不是每个请求5个线程,并重用创建的线程来创建任务。

相关问题