Spring Boot Batch:在作业运行之间持久化数据

rdlzhqv9  于 2021-07-23  发布在  Java
关注(0)|答案(0)|浏览(193)

这是我的作业配置

  1. @Configuration
  2. @EnableScheduling
  3. public class JobOne {
  4. @Autowired
  5. public ReaderStep readerStep;
  6. @Bean
  7. public ProcessorStep processorStep() {
  8. return new ProcessorStep();
  9. }
  10. @Bean
  11. public WriterStep writerStep() {
  12. return new WriterStep();
  13. }
  14. @Bean
  15. public ExecutionContextPromotionListener executionContextPromotionListener(){
  16. ExecutionContextPromotionListener ecpl = new ExecutionContextPromotionListener();
  17. ecpl.setKeys(new String[] {"lastRunTime", "lastExecution"});
  18. return ecpl;
  19. }
  20. @Bean
  21. public Step step(StepBuilderFactory sbf, ReaderStep rs, ProcessorStep ps, WriterStep ws, ExecutionContextPromotionListener ls) {
  22. return sbf.get("MyStep")
  23. .<String,String>chunk(1)
  24. .reader(readerStep)
  25. .processor(ps)
  26. .writer(ws)
  27. .listener(ls)
  28. .allowStartIfComplete(true)
  29. .build();
  30. }
  31. @Bean
  32. public Job job(JobBuilderFactory jbf, Step step) {
  33. return jbf.get("MyJob")
  34. .incrementer(new RunIdIncrementer())
  35. .flow(step)
  36. .end()
  37. .build();
  38. }
  39. }

这是我的 ItemReader 实现:

  1. @Component
  2. @StepScope
  3. public class ReaderStep implements ItemReader<String> {
  4. String lastRunTime = null;
  5. ExecutionContext executionContext = null;
  6. String lastExecution = null;
  7. @BeforeStep
  8. public void getLastRunTime(StepExecution stepExecution) {
  9. if(executionContext==null)
  10. executionContext = stepExecution.getExecutionContext();
  11. lastRunTime = (String)executionContext.get("lastRunTime");
  12. lastExecution = (String) executionContext.get("lastExecution");
  13. System.out.println("BeforeStep "+ lastRunTime);
  14. }
  15. @AfterStep
  16. public void setLastRunTime(StepExecution stepExecution) {
  17. System.out.println("AfterStep "+ lastRunTime);
  18. System.out.println("AfterStepEC "+ (String)executionContext.get("lastRunTime"));
  19. stepExecution.getExecutionContext().put("lastRunTime", executionContext.getString("lastRunTime"));
  20. }
  21. @Override
  22. public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
  23. lastRunTime = (String)executionContext.get("lastRunTime");
  24. System.out.println("Read "+ lastRunTime);
  25. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  26. if(lastRunTime==null) {
  27. lastRunTime = sdf.format(new Date());
  28. System.out.println("Read put ");
  29. executionContext.put("lastRunTime", lastRunTime);
  30. } else {
  31. String[] timeArr = lastRunTime.split(":");
  32. if(Integer.parseInt(timeArr[timeArr.length-1])%2==0) {
  33. lastRunTime = sdf.format(new Date());
  34. System.out.println("Read put else");
  35. executionContext.put("lastRunTime", lastRunTime);
  36. }
  37. }
  38. if(lastExecution!=null) {
  39. Date lastEx = sdf.parse(lastExecution);
  40. lastEx = new Date(lastEx.getTime()+TimeUnit.MINUTES.toMillis(2));
  41. Date curDate = new Date();
  42. if(curDate.before(lastEx)) {
  43. return null;
  44. } else {
  45. lastExecution=sdf.format(curDate);
  46. }
  47. } else {
  48. lastExecution=sdf.format(new Date());
  49. }
  50. return lastRunTime;
  51. }
  52. }

每次作业运行时,ExecutionContext 中 lastRunTime 的值都是 null。我希望保留赋给 lastRunTime 键的值,即使在下一次作业运行之后,该键的值也仍然存在。
如何做到这一点?我的代码中缺少什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题