java 如何防止下一个@Scheduler调用,直到当前@scheduler在Sping Boot 中完成?

bakd9h0s  于 2022-12-25  发布在  Java
关注(0)|答案(2)|浏览(131)

我正在尝试创建一个Sping Boot 应用程序,它使用@Async多线程异步处理数千条记录。为此,我创建了8个线程和8个来自主列表的子列表,以便8个线程异步处理8个子列表。我还使用@scheduler,以便每2秒调用一次方法。

但问题是由于调度程序,有时这个应用程序处理重复的记录,因为这个方法每2秒被调用,并且每2秒从数据库中检索数据。例如,第一次方法被调用,并且从数据库中检索72000条记录,其标记为0,然后@Async方法处理所有这些记录,并且将处理的记录标记从0改变为1。然后在2秒内再次调用方法并检索标记为0的新记录。

在随附的日志图片中,您可以看到调度程序第一次被调用并检索到72000条记录,多个线程在下一个调度程序启动和检索到16000条记录之间开始处理,其中包含当前调度程序中存在的记录。

我正在寻找解决方案,下一个调度程序不应该调用,直到第一个调度程序得到完成。因为有时第一个调度程序处理之间的记录,如果下一个调度程序调用在2秒内,那么它可能会再次检索这些记录已经存在于第一个调度程序调用。

我不能增加调度器调用时间。因为最大时间我们得到的记录约400-500,有时我们得到的记录数以千计。
代码如下。

@SpringBootApplication
@EnableScheduling
@EnableAsync
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
    
    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }
    
    
    @Bean("ThreadPoolTaskExecutor")
      public TaskExecutor getAsyncExecutor() {
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(8);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
        executor.initialize();
        return executor;
      }
}

@Service
public class Main {

    @Autowired
    private DemoDao dao;

    @Autowired
    private Asyn asyn;
    
    
    static int schedulerCount = 0;

    
    @Scheduled(cron = "0/2 * * * * *")
    public void schedule() {

        System.out.println("++++++++++++++++++++++++++++++++++++Scheduler started schedulerCount  :  "+schedulerCount+"+++++++++++++++++++++++++++++++++++++"+ LocalDateTime.now());
        List<Json> jsonList = new ArrayList<Json>();
        List<List<Json>> smallLi = new ArrayList<List<Json>>();
        try {
            jsonList = dao.getJsonList();
            System.out.println("jsonList size   :   " + jsonList.size());
            
            int count = jsonList.size();
            
            //Creating 8 sublist (8 sublist because thread pool size is 8) from main list(jsonList)
            int limit = Math.round(count / 8) + 1;
            
            for (int j = 0; j < count; j += limit) {
                smallLi.add(new ArrayList<Json>(jsonList.subList(j, Math.min(count, j + limit))));
            }
            System.out.println("smallLi  :  " + smallLi.size());
            
            //After creating 8 sublist, sending sublists with Async method so that 8 threads create and each thread process one sublist asynchronously.
            for (int i = 0; i < smallLi.size(); i++) {
                asyn.withAsyn(smallLi.get(i), schedulerCount);
            }
            schedulerCount++;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

@Async("ThreadPoolTaskExecutor")
    public void withAsyn(List<Json> li, int schedulerCount) throws Exception {

        System.out.println("with start+++++++++++++ schedulerCount " + schedulerCount + ", name  :  "
                + Thread.currentThread().getName() + ", time  :  " + LocalDateTime.now() + ", start index  :  "
                + li.get(0).getId() + ", end index  :  " + li.get(li.size() - 1).getId());
        try {
            XSSFWorkbook workbook = new XSSFWorkbook();
            XSSFSheet spreadsheet = workbook.createSheet("Data");
            XSSFRow row;

            for (int i = 0; i < li.size(); i++) {

                row = spreadsheet.createRow(i);

                Cell cell9 = row.createCell(0);
                cell9.setCellValue(li.get(i).getId());

                Cell cell = row.createCell(1);
                cell.setCellValue(li.get(i).getName());

                Cell cell1 = row.createCell(2);
                cell1.setCellValue(li.get(i).getPhone());

                Cell cell2 = row.createCell(3);
                cell2.setCellValue(li.get(i).getEmail());

                Cell cell3 = row.createCell(4);
                cell3.setCellValue(li.get(i).getAddress());

                Cell cell4 = row.createCell(5);
                cell4.setCellValue(li.get(i).getPostalZip());
            }
            FileOutputStream out = new FileOutputStream(new File("C:\\Users\\RK658\\Desktop\\logs\\generated\\"
                    + Thread.currentThread().getName() + "_" + schedulerCount + ".xlsx"));
            workbook.write(out);
            out.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("with end+++++++++++++ schedulerCount " + schedulerCount + ", name  :  "
                + Thread.currentThread().getName() + ", time  :  " + LocalDateTime.now() + ", start index  :  "
                + li.get(0).getId() + ", end index  :  " + li.get(li.size() - 1).getId());
    }
plicqrtu

plicqrtu2#

我正在寻找解决方案,下一个调度程序不应该调用,直到第一个调度程序得到完成。
您已经定义了@Service public class Main {...},默认情况下@Service是spring构建单例的指示,如果您没有做任何修改,那么预期的行为是spring在堆内存中只创建类Main的单个示例。
所以在这种情况下,如果你在你的方法中添加了synchronized关键字,你将只允许一个线程在某个时候执行你的方法,其他所有请求执行你的调度方法的线程都需要等待,直到已经运行的线程完成。

@Scheduled(cron = "0/2 * * * * *")
    public synchronized void schedule() { .... }

上述解决方案场景的示例。

  • Thread A开始执行方法schedule
  • 2秒后,Thread A仍在执行,Thread B启动并希望执行相同的方法。
  • Thread B将在外面等待,并且仅在Thread A完成执行之后才开始执行该方法。

因此,在上述场景中,如果3个调度程序在接近3个时间间隔时产生,所有这些调度程序都将等待执行,这可能会导致调度的瓶颈,并可能给资源带来沉重的负载。
如果是这样的话,另一个替代的解决方案是cancelThread B方法执行,如果它到达时Thread A已经执行了相同的方法,这样你就可以确保没有瓶颈。
要实现这一点,您可以使用ReentrantLock

@Service
public class Main {

   private final ReentrantLock reentrantLock = new ReentrantLock();

    ....
   
    @Scheduled(cron = "0/2 * * * * *")
    public void schedule() {

      if (reentrantLock.tryLock()) {
            try {

            //... all your existing method code

            } finally {
                reentrantLock.unlock();
            }
        } else {
          //log that the `Thread B` was cancelled if you want, as it requested to execute the method while `Thread A` was already executing.
        }
    }

 }

相关问题