如何使JobRegistry包含Spring批处理中服务器重启时的作业信息

vmjh9lq9  于 2023-11-17  发布在  Spring
关注(0)|答案(1)|浏览(161)

我们有一个场景,每当一个作业执行STARTED Status时,服务器可能会崩溃。在这种情况下,为了重新启动我从这里理解的batch Spring Batch resume after server's failure,需要将batch_job_execution和batch_step_execution中的status和end_time列值从STARTED更新为FAQs,以便JobOperator.restart(jobExecutionId)API工作。
但是调用它会出现“NoSuchJobException:No job configuration with the name [] was registered”,因为JobOperator中的jobRegistryMap是空的。
现在,如何在服务器重启时重新填充JobRegistry中的这些作业信息,以便我们可以调用JobOperator.restart(jobExecutionId)。
下面是我的代码

  1. /** The Test properties. */
  2. @Autowired
  3. private TestProperties TestProperties;
  4. @Primary
  5. @Bean(name = "TestDataSource")
  6. public DataSource batchDataSource() {
  7. DataSource TestDBSrc = DataSourceBuilder.create().username(getUsername()).password(getPassword()).url(getUrl()).build();
  8. if (TestDBSrc != null && TestDBSrc instanceof HikariDataSource) {
  9. @SuppressWarnings("resource")
  10. HikariDataSource hikariDatsource = (HikariDataSource) TestDBSrc;
  11. hikariDatsource.setSchema(getSchema());
  12. }
  13. return TestDBSrc;
  14. }
  15. private String getSchema() {
  16. return TestProperties.getValue("spring.datasource.hikari.schema", "");
  17. }
  18. private String getUsername() {
  19. return TestProperties.getValue("spring.datasource.username", "");
  20. }
  21. private String getPassword() {
  22. return TestProperties.getValue("spring.datasource.password", "");
  23. }
  24. private String getUrl() {
  25. return TestProperties.getValue("spring.datasource.jdbc-url", "");
  26. }
  27. @Bean(name = "transactionManager")
  28. public JdbcTransactionManager batchTransactionManager(@Qualifier("TestDataSource") DataSource dataSource) {
  29. return new JdbcTransactionManager(dataSource);
  30. }
  31. @Bean(name = "TestBatchJobRepository")
  32. public JobRepository jobRepository(@Qualifier("TestDataSource") DataSource batchDataSource,
  33. @Qualifier("transactionManager") JdbcTransactionManager batchTransactionManager) throws Exception {
  34. JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
  35. factory.setDataSource(batchDataSource);
  36. factory.setTransactionManager(batchTransactionManager);
  37. factory.afterPropertiesSet();
  38. return factory.getObject();
  39. }
  40. @Bean(name = "TestBatchJobLauncher")
  41. public JobLauncher jobLauncher(@Qualifier("TestBatchJobRepository") JobRepository jobRepository) throws Exception {
  42. TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
  43. jobLauncher.setJobRepository(jobRepository);
  44. jobLauncher.afterPropertiesSet();
  45. return jobLauncher;
  46. }
  47. @Bean (name = "TestBatchJobExplorer")
  48. public JobExplorer jobExplorer(@Qualifier("TestDataSource") DataSource dataSource,@Qualifier("transactionManager")JdbcTransactionManager batchTransactionManager) throws Exception {
  49. final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
  50. bean.setDataSource(dataSource);
  51. bean.setTransactionManager(batchTransactionManager);
  52. bean.setTablePrefix("BATCH_");
  53. bean.setJdbcOperations(new JdbcTemplate(dataSource));
  54. bean.afterPropertiesSet();
  55. return bean.getObject();
  56. }
  57. @Bean (name ="TestBatchJobRegistry")
  58. public JobRegistry jobRegistry() throws Exception {
  59. return new MapJobRegistry();
  60. }
  61. @Bean
  62. public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(@Qualifier("TestBatchJobRegistry") JobRegistry jobRegistry) {
  63. JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
  64. postProcessor.setJobRegistry(jobRegistry);
  65. return postProcessor;
  66. }
  67. @Bean (name = "TestBatchJobOperator")
  68. public JobOperator jobOperator(@Qualifier("TestBatchJobLauncher") JobLauncher jobLauncher, @Qualifier("TestBatchJobRepository") JobRepository jobRepository,
  69. @Qualifier("TestBatchJobRegistry") JobRegistry jobRegistry, @Qualifier("TestBatchJobExplorer") JobExplorer jobExplorer) {
  70. final SimpleJobOperator jobOperator = new SimpleJobOperator();
  71. jobOperator.setJobLauncher(jobLauncher);
  72. jobOperator.setJobRepository(jobRepository);
  73. jobOperator.setJobRegistry(jobRegistry);
  74. jobOperator.setJobExplorer(jobExplorer);
  75. return jobOperator;
  76. }
  77. }

字符串
因为我使用默认的内存Map

  1. @Bean (name ="TestBatchJobRegistry")
  2. public JobRegistry jobRegistry() throws Exception {
  3. return new MapJobRegistry();
  4. }


它在服务器重启时被清除。
更新24/10/2023:x1c 0d1x
在服务器重新启动时,

作业注册表为空。已尝试,

  1. public void restart(@Qualifier("febpBatchJobExplorer") JobExplorer jobExplorer,@Qualifier("febpBatchJobRepository") JobRepository jobRepository, @Qualifier("febpBatchJobOperator") JobOperator jobOperator){
  2. try {
  3. List<String> jobNames=jobExplorer.getJobNames();
  4. for(String jobName:jobNames)
  5. {
  6. List<JobInstance> jobInstances = jobExplorer.getJobInstances(jobName,0,1);// this will get one latest job from the database
  7. if(CollectionUtils.isNotEmpty(jobInstances)){
  8. JobInstance jobInstance = jobInstances.get(0);
  9. List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
  10. if(CollectionUtils.isNotEmpty(jobExecutions)){
  11. for(JobExecution execution: jobExecutions){
  12. // If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
  13. if(execution.getStatus().equals(BatchStatus.STARTED) || execution.getStatus().equals(BatchStatus.FAILED)){
  14. execution.setEndTime(LocalDateTime.now());
  15. execution.setStatus(BatchStatus.FAILED);
  16. execution.setExitStatus(ExitStatus.FAILED);
  17. jobRepository.update(execution);
  18. jobOperator.restart(execution.getId());
  19. }
  20. }
  21. }
  22. }
  23. }
  24. }catch (Exception e1) {
  25. e1.printStackTrace();
  26. }
  27. }


但是jobOperator.restart(execution.getId())再次失败,因为jobRegistry为空。
在低于Spring批次代码

时失败

**更新2023年10月27日:**提供示例应用程序以重现问题https://github.com/PSHREYASHOLLA/SamplebatchApplication

它是一个maven项目,所以你可以调用mvn install,它将创建\target\SamplebatchApplication-0.0.1-SNAPSHOT.jar。现在你可以像启动任何springboot应用程序(启用Liquibase)一样启动它,java -jar SamplebatchApplication-0.0.1-SNAPSHOT.jar。
如果你看到application.properties文件,我们将其指向一个postgres数据库。我们所有的批处理配置https:github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/FEBPDBConfig.java。
请通过调用rest post API启动批处理,http://localhost:8080/batch/batcher-or-resume-application-batch-job JSON body {“appRestartJobExecutionId”:““}如果我们使用空appRestartJobExecutionId调用此,则流程如下,com.example.postgresql.batchController.batcherOrResumeApplicationBatchJobByB2E()->com.example.postgresql.model.FebpApplicationJobServiceImpl. batcherApplicationBatchJob()-->我们做JobLauncher.run().现在,此作业将作为reader的一部分从febp_emp_detail_test读取50条记录,并作为writer的一部分将更新的记录写入febp_emp_tax_detail_test。这是一个愉快的流程。
现在,如果你调用上面的API,并说5秒后你杀死了服务器,只有部分提交将发生在febp_emp_tax_detail_test中,批处理状态将处于STARTED状态。现在,假设我重新启动服务器并使用失败的作业执行ID调用相同的post API,它现在将调用om.example.postgresql.Buller.BatchController.BullerOrResumeApplicationBatchJobByB2E()-> com.example.postgresql.model.FebpApplicationJobServiceImpl.resumeApplicationBatchJob()-> jobOperator.restart(failedBatchExecutionId);此处由于jobRegistry为空,重启API失败。

更新02/11/2023:

根据Mahmoud Ben Hassine的建议,将我的作业更改为Bean后,我可以重新启动作业。但现在重新启动后,新的作业执行开始并显示完成,但无法处理数据。如果作业在单线程上运行,则处理所有数据,但如果是多线程,则不做任何事情。请检查https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/EmployeeTaxCalculationBatchConfig.java
第60行,Step step = new StepBuilder(“FEBP_EMP_TAX_CALCULATION_STEP”,jobRepository).<pageDetail,pageTaxDetail>chunk(5,transactionManager).reader(reader.getPagingItemReader()).processor(itemProcessor).writer(itemWriter).taskExecutor(actStmntTaskExecutor()).throttleLimit(50).build();
这里,如果throttleLimit为1,则在重新启动后处理记录,但如果是多线程的,则不处理示例表中的剩余记录。

5jvtdoz2

5jvtdoz21#

这不应该是这样的,因为你已经注册了一个JobRegistryBeanPostProcessor。这是bean后处理器,每次Spring应用程序上下文启动(重新启动)时,它都会填充作业注册表。

  • 编辑:提供最小示例后更新答案 *

这里的问题是作业没有被声明为bean,而是配置类(显然不是Job类型)被声明为该名称的bean。因此,JobRegistryBeanPostProcessor找不到作业,也没有在注册表中注册它。
Job应该在应用程序上下文中注册为bean,以便JobRegistryBeanPostProcessor对bean进行后处理并在注册表中注册。

相关问题