java—使用动态参数spring batch在foreach循环中运行作业

z9ju0rcb  于 2021-06-30  发布在  Java
关注(0)|答案(0)|浏览(319)

我用springboot创建了一个spring批处理作业。我定制了阅读器,从restapi获取json数据,并将数据转换为java对象,编写器将数据推送到队列中。我在foreach循环中调用我的作业来设置参数并用不同的语言向restapi发送请求。对于第一次迭代,我的作业成功运行,但对于其他迭代,它只显示它已完成。
批量配置:

  1. @Configuration
  2. @EnableBatchProcessing
  3. public class BatchConfiguration {
  4. @Autowired
  5. public JobBuilderFactory jobBuilderFactory;
  6. @Autowired
  7. public StepBuilderFactory stepBuilderFactory;
  8. @Autowired
  9. public RestWebClient webClient;
  10. @Bean
  11. public ItemReader<Code> reader() {
  12. return new CodeAndLabelRestItemReader(webClient);
  13. }
  14. @Bean
  15. public CodeAndLabelItemProcessor processor() {
  16. return new CodeAndLabelItemProcessor("France","DP","transaction");
  17. }
  18. @Bean
  19. public ItemWriter<CodeAndLabel> calWriter(AmqpTemplate amqpTemplate) {
  20. return new CodeAndLabelItemWriter(amqpTemplate);
  21. }
  22. @Bean(name = "importJob")
  23. public Job importCodesAndLabelsJob(JobCompletionNotificationListener listener, Step stepJms) {
  24. return jobBuilderFactory.get("importJob")
  25. .incrementer(new RunIdIncrementer())
  26. .listener(listener)
  27. .flow(stepJms)
  28. .end()
  29. .build();
  30. }
  31. @Bean
  32. public Step stepJms(ItemWriter<CodeAndLabel> writer) {
  33. return stepBuilderFactory.get("stepJms")
  34. .<Code, CodeAndLabel>chunk(10)
  35. .reader(reader())
  36. .processor(processor())
  37. .writer(writer)
  38. .build();
  39. }

读卡器:

  1. public class CodeAndLabelRestItemReader implements ItemReader<Code>{
  2. private final RestWebClient webClient;
  3. private int nextCodeIndex;
  4. private List<Code> codes;
  5. public CodeAndLabelRestItemReader(RestWebClient webClient) {
  6. this.webClient = webClient;
  7. nextCodeIndex = 0;
  8. }
  9. @BeforeStep
  10. public void beforeStep(final StepExecution stepExecution) {
  11. JobParameters jobParameters = stepExecution.getJobParameters();
  12. this.webClient.setEndPointSuffix(jobParameters.getString("endPointSuffix"));
  13. }
  14. @Override
  15. public Code read() {
  16. if(codesAndLabelsListNotInitialized()) {
  17. codes = webClient.getCodes();
  18. }
  19. Code nextCode = null;
  20. if (nextCodeIndex < codes.size()) {
  21. nextCode = codes.get(nextCodeIndex);
  22. nextCodeIndex++;
  23. }
  24. return nextCode;
  25. }
  26. private boolean codesAndLabelsListNotInitialized() {
  27. return this.codes == null;
  28. }

}
处理器:

  1. public class CodeAndLabelItemProcessor implements ItemProcessor<Code, CodeAndLabel> {
  2. private String populationId;
  3. private String populationDataProvider;
  4. private String transactionId;
  5. public CodeAndLabelItemProcessor(String populationId, String populationDataProvider, String transactionId) {
  6. this.populationId = populationId;
  7. this.populationDataProvider = populationDataProvider;
  8. this.transactionId = transactionId;
  9. }
  10. @Override
  11. public CodeAndLabel process(Code code) throws Exception {
  12. CodeAndLabel codeAndLabel = new CodeAndLabel();
  13. codeAndLabel.setUid(code.getUid());
  14. System.out.println("Converting (" + code + ") into (" + codeAndLabel + ")");
  15. return codeAndLabel;
  16. }

}
作者:

  1. public class CodeAndLabelItemWriter implements ItemWriter<CodeAndLabel>{
  2. private AmqpTemplate template;
  3. public CodeAndLabelItemWriter(AmqpTemplate template) {
  4. this.template = template;
  5. }
  6. @Override
  7. public void write(List<? extends CodeAndLabel> items) throws Exception {
  8. if (log.isDebugEnabled()) {
  9. log.debug("Writing to RabbitMQ with " + items.size() + " items."); }
  10. for(CodeAndLabel item : items) {
  11. template.convertAndSend(BatchConfiguration.topicExchangeName,"com.batchprocessing.queue",item);
  12. System.out.println("item : "+item);
  13. }
  14. }

}侦听器:

  1. @Component
  2. public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
  3. @Autowired
  4. private JdbcTemplate jdbcTemplate;
  5. @Override
  6. public void afterJob(JobExecution jobExecution) {
  7. if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
  8. System.out.println("JOB FINISHED");
  9. }
  10. }

}
以及运行作业的类:

  1. @Component
  2. public class Initialization {
  3. // some code here
  4. String[] languages = processLanguage.split(";");
  5. for(String language : languages) {
  6. JobParameters params = new JobParametersBuilder()
  7. .addString("JobID",String.valueOf(System.currentTimeMillis()))
  8. .addString("endPointSuffix",
  9. "/codeAndLabel".concat(language.toUpperCase()))
  10. .toJobParameters();
  11. jobLauncher.run(job, params);
  12. }

输出:对于第一次迭代:

  1. Converting (WFR.SP.2C) into (WFR.SP.2C)
  2. Converting (WFR.SP.3E) into (WFR.SP.3E)
  3. Converting (WFR.SP.FC) into (WFR.SP.FC)
  4. Converting (WFR.SP.FD) into (WFR.SP.FD)
  5. Converting (WFR.SP.FI) into (WFR.SP.FI)
  6. Converting (WFR.SP.FM) into (WFR.SP.FM)
  7. item : WFR.SP.2C
  8. item : WFR.SP.3E
  9. item : WFR.SP.FC
  10. item : WFR.SP.FD
  11. item : WFR.SP.FI
  12. item : WFR.SP.FM
  13. JOB FINISHED

第二次迭代

  1. JOB FINISHED

我认为在第二次迭代中,作业没有运行reader处理器和writerbeans,我不知道为什么。有人能帮点忙吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题