我正在尝试创建一个Sping Boot 应用程序,它使用@Async多线程异步处理数千条记录。为此,我创建了8个线程和8个来自主列表的子列表,以便8个线程异步处理8个子列表。我还使用@scheduler,以便每2秒调用一次方法。
但问题是由于调度程序,有时这个应用程序处理重复的记录,因为这个方法每2秒被调用,并且每2秒从数据库中检索数据。例如,第一次方法被调用,并且从数据库中检索72000条记录,其标记为0,然后@Async方法处理所有这些记录,并且将处理的记录标记从0改变为1。然后在2秒内再次调用方法并检索标记为0的新记录。
在随附的日志图片中,您可以看到调度程序第一次被调用并检索到72000条记录,多个线程在下一个调度程序启动和检索到16000条记录之间开始处理,其中包含当前调度程序中存在的记录。
我正在寻找解决方案,下一个调度程序不应该调用,直到第一个调度程序得到完成。因为有时第一个调度程序处理之间的记录,如果下一个调度程序调用在2秒内,那么它可能会再次检索这些记录已经存在于第一个调度程序调用。
我不能增加调度器调用时间。因为最大时间我们得到的记录约400-500,有时我们得到的记录数以千计。
代码如下。
@SpringBootApplication
@EnableScheduling
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public RestTemplate restTemplate(RestTemplateBuilder builder) {
return builder.build();
}
@Bean("ThreadPoolTaskExecutor")
public TaskExecutor getAsyncExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(8);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
executor.initialize();
return executor;
}
}
@Service
public class Main {
@Autowired
private DemoDao dao;
@Autowired
private Asyn asyn;
static int schedulerCount = 0;
@Scheduled(cron = "0/2 * * * * *")
public void schedule() {
System.out.println("++++++++++++++++++++++++++++++++++++Scheduler started schedulerCount : "+schedulerCount+"+++++++++++++++++++++++++++++++++++++"+ LocalDateTime.now());
List<Json> jsonList = new ArrayList<Json>();
List<List<Json>> smallLi = new ArrayList<List<Json>>();
try {
jsonList = dao.getJsonList();
System.out.println("jsonList size : " + jsonList.size());
int count = jsonList.size();
//Creating 8 sublist (8 sublist because thread pool size is 8) from main list(jsonList)
int limit = Math.round(count / 8) + 1;
for (int j = 0; j < count; j += limit) {
smallLi.add(new ArrayList<Json>(jsonList.subList(j, Math.min(count, j + limit))));
}
System.out.println("smallLi : " + smallLi.size());
//After creating 8 sublist, sending sublists with Async method so that 8 threads create and each thread process one sublist asynchronously.
for (int i = 0; i < smallLi.size(); i++) {
asyn.withAsyn(smallLi.get(i), schedulerCount);
}
schedulerCount++;
} catch (Exception e) {
e.printStackTrace();
}
}
@Async("ThreadPoolTaskExecutor")
public void withAsyn(List<Json> li, int schedulerCount) throws Exception {
System.out.println("with start+++++++++++++ schedulerCount " + schedulerCount + ", name : "
+ Thread.currentThread().getName() + ", time : " + LocalDateTime.now() + ", start index : "
+ li.get(0).getId() + ", end index : " + li.get(li.size() - 1).getId());
try {
XSSFWorkbook workbook = new XSSFWorkbook();
XSSFSheet spreadsheet = workbook.createSheet("Data");
XSSFRow row;
for (int i = 0; i < li.size(); i++) {
row = spreadsheet.createRow(i);
Cell cell9 = row.createCell(0);
cell9.setCellValue(li.get(i).getId());
Cell cell = row.createCell(1);
cell.setCellValue(li.get(i).getName());
Cell cell1 = row.createCell(2);
cell1.setCellValue(li.get(i).getPhone());
Cell cell2 = row.createCell(3);
cell2.setCellValue(li.get(i).getEmail());
Cell cell3 = row.createCell(4);
cell3.setCellValue(li.get(i).getAddress());
Cell cell4 = row.createCell(5);
cell4.setCellValue(li.get(i).getPostalZip());
}
FileOutputStream out = new FileOutputStream(new File("C:\\Users\\RK658\\Desktop\\logs\\generated\\"
+ Thread.currentThread().getName() + "_" + schedulerCount + ".xlsx"));
workbook.write(out);
out.close();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("with end+++++++++++++ schedulerCount " + schedulerCount + ", name : "
+ Thread.currentThread().getName() + ", time : " + LocalDateTime.now() + ", start index : "
+ li.get(0).getId() + ", end index : " + li.get(li.size() - 1).getId());
}
2条答案
按热度按时间4dbbbstv1#
我相信你可以使用ShedLock来解决这个问题。https://www.baeldung.com/shedlock-spring
https://github.com/lukas-krecan/ShedLock
plicqrtu2#
我正在寻找解决方案,下一个调度程序不应该调用,直到第一个调度程序得到完成。
您已经定义了
@Service public class Main {...}
,默认情况下@Service
是spring构建单例的指示,如果您没有做任何修改,那么预期的行为是spring在堆内存中只创建类Main
的单个示例。所以在这种情况下,如果你在你的方法中添加了
synchronized
关键字,你将只允许一个线程在某个时候执行你的方法,其他所有请求执行你的调度方法的线程都需要等待,直到已经运行的线程完成。上述解决方案场景的示例。
Thread A
开始执行方法schedule
。Thread A
仍在执行,Thread B
启动并希望执行相同的方法。Thread B
将在外面等待,并且仅在Thread A
完成执行之后才开始执行该方法。因此,在上述场景中,如果3个调度程序在接近3个时间间隔时产生,所有这些调度程序都将等待执行,这可能会导致调度的瓶颈,并可能给资源带来沉重的负载。
如果是这样的话,另一个替代的解决方案是
cancel
Thread B
方法执行,如果它到达时Thread A
已经执行了相同的方法,这样你就可以确保没有瓶颈。要实现这一点,您可以使用
ReentrantLock
。