spring引导批处理在作业运行之间持久化数据

rdlzhqv9  于 2021-07-23  发布在  Java
关注(0)|答案(0)|浏览(156)

这是我的作业配置

@Configuration
@EnableScheduling

public class JobOne {

    @Autowired
    public ReaderStep readerStep;

    @Bean
    public ProcessorStep processorStep() {
        return new ProcessorStep();
    }

    @Bean
    public WriterStep writerStep() {
        return new WriterStep();
    }

    @Bean
    public ExecutionContextPromotionListener executionContextPromotionListener(){
        ExecutionContextPromotionListener ecpl = new ExecutionContextPromotionListener();
        ecpl.setKeys(new String[] {"lastRunTime", "lastExecution"});
        return ecpl;
    }

    @Bean
    public Step step(StepBuilderFactory sbf, ReaderStep rs, ProcessorStep ps, WriterStep ws, ExecutionContextPromotionListener ls) {
        return sbf.get("MyStep")
                .<String,String>chunk(1)
                .reader(readerStep)
                .processor(ps)
                .writer(ws)
                .listener(ls)
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jbf, Step step) {
        return jbf.get("MyJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }
}

这是我的 ItemReader 实施:

@Component
@StepScope
public class ReaderStep implements ItemReader<String> {

    String lastRunTime = null;
    ExecutionContext executionContext = null;
    String lastExecution = null;

    @BeforeStep
    public void getLastRunTime(StepExecution stepExecution) {
        if(executionContext==null)
        executionContext = stepExecution.getExecutionContext();
        lastRunTime = (String)executionContext.get("lastRunTime");
        lastExecution = (String) executionContext.get("lastExecution");
        System.out.println("BeforeStep "+ lastRunTime);
    }
    @AfterStep
    public void setLastRunTime(StepExecution stepExecution) {
        System.out.println("AfterStep "+ lastRunTime);
        System.out.println("AfterStepEC "+ (String)executionContext.get("lastRunTime"));
        stepExecution.getExecutionContext().put("lastRunTime", executionContext.getString("lastRunTime"));
    }

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        lastRunTime = (String)executionContext.get("lastRunTime");
        System.out.println("Read "+ lastRunTime);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if(lastRunTime==null) {

            lastRunTime = sdf.format(new Date());
            System.out.println("Read put ");
            executionContext.put("lastRunTime", lastRunTime);

        } else {
            String[] timeArr = lastRunTime.split(":");
            if(Integer.parseInt(timeArr[timeArr.length-1])%2==0) {
                lastRunTime = sdf.format(new Date());
                System.out.println("Read put else");
                executionContext.put("lastRunTime", lastRunTime);
            }
        }
        if(lastExecution!=null) {
            Date lastEx = sdf.parse(lastExecution);
            lastEx = new Date(lastEx.getTime()+TimeUnit.MINUTES.toMillis(2));
            Date curDate = new Date();
            if(curDate.before(lastEx)) {
                return null;
            } else {
                lastExecution=sdf.format(curDate);
            }
        } else {
            lastExecution=sdf.format(new Date());
        }   

        return lastRunTime;
    }

}

每次作业运行时, lastRunTime executioncontext的值为 null . 我要保留分配给的值 lastRunTime 即使在运行下一个作业之后,也要设置密钥。
如何做到这一点?我的代码中缺少什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题