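I created a Spring Batch job with Spring Boot. I wrote a custom reader that fetches JSON data from a REST API and converts it to Java objects, and a writer that pushes the data to a queue. I launch the job inside a for-each loop, setting the job parameters so that each iteration sends the request to the REST API for a different language. On the first iteration the job runs successfully, but on the other iterations it only reports that it has finished.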
Batch configuration:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public RestWebClient webClient;

    @Bean
    public ItemReader<Code> reader() {
        return new CodeAndLabelRestItemReader(webClient);
    }

    @Bean
    public CodeAndLabelItemProcessor processor() {
        return new CodeAndLabelItemProcessor("France", "DP", "transaction");
    }

    @Bean
    public ItemWriter<CodeAndLabel> calWriter(AmqpTemplate amqpTemplate) {
        return new CodeAndLabelItemWriter(amqpTemplate);
    }

    @Bean(name = "importJob")
    public Job importCodesAndLabelsJob(JobCompletionNotificationListener listener, Step stepJms) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(stepJms)
                .end()
                .build();
    }

    @Bean
    public Step stepJms(ItemWriter<CodeAndLabel> writer) {
        return stepBuilderFactory.get("stepJms")
                .<Code, CodeAndLabel>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
}
Reader:
public class CodeAndLabelRestItemReader implements ItemReader<Code> {

    private final RestWebClient webClient;
    private int nextCodeIndex;
    private List<Code> codes;

    public CodeAndLabelRestItemReader(RestWebClient webClient) {
        this.webClient = webClient;
        nextCodeIndex = 0;
    }

    @BeforeStep
    public void beforeStep(final StepExecution stepExecution) {
        JobParameters jobParameters = stepExecution.getJobParameters();
        this.webClient.setEndPointSuffix(jobParameters.getString("endPointSuffix"));
    }

    @Override
    public Code read() {
        if (codesAndLabelsListNotInitialized()) {
            codes = webClient.getCodes();
        }
        Code nextCode = null;
        if (nextCodeIndex < codes.size()) {
            nextCode = codes.get(nextCodeIndex);
            nextCodeIndex++;
        }
        return nextCode;
    }

    private boolean codesAndLabelsListNotInitialized() {
        return this.codes == null;
    }
}
Processor:
public class CodeAndLabelItemProcessor implements ItemProcessor<Code, CodeAndLabel> {

    private String populationId;
    private String populationDataProvider;
    private String transactionId;

    public CodeAndLabelItemProcessor(String populationId, String populationDataProvider, String transactionId) {
        this.populationId = populationId;
        this.populationDataProvider = populationDataProvider;
        this.transactionId = transactionId;
    }

    @Override
    public CodeAndLabel process(Code code) throws Exception {
        CodeAndLabel codeAndLabel = new CodeAndLabel();
        codeAndLabel.setUid(code.getUid());
        System.out.println("Converting (" + code + ") into (" + codeAndLabel + ")");
        return codeAndLabel;
    }
}
Writer:
public class CodeAndLabelItemWriter implements ItemWriter<CodeAndLabel> {

    private AmqpTemplate template;

    public CodeAndLabelItemWriter(AmqpTemplate template) {
        this.template = template;
    }

    @Override
    public void write(List<? extends CodeAndLabel> items) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Writing to RabbitMQ with " + items.size() + " items.");
        }
        for (CodeAndLabel item : items) {
            template.convertAndSend(BatchConfiguration.topicExchangeName, "com.batchprocessing.queue", item);
            System.out.println("item : " + item);
        }
    }
}
Listener:
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("JOB FINISHED");
        }
    }
}
And the class that runs the job:
@Component
public class Initialization {
    // some code here
    String[] languages = processLanguage.split(";");
    for (String language : languages) {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .addString("endPointSuffix",
                        "/codeAndLabel".concat(language.toUpperCase()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
Output for the first iteration:
Converting (WFR.SP.2C) into (WFR.SP.2C)
Converting (WFR.SP.3E) into (WFR.SP.3E)
Converting (WFR.SP.FC) into (WFR.SP.FC)
Converting (WFR.SP.FD) into (WFR.SP.FD)
Converting (WFR.SP.FI) into (WFR.SP.FI)
Converting (WFR.SP.FM) into (WFR.SP.FM)
item : WFR.SP.2C
item : WFR.SP.3E
item : WFR.SP.FC
item : WFR.SP.FD
item : WFR.SP.FI
item : WFR.SP.FM
JOB FINISHED
Second iteration:
JOB FINISHED
I think that in the second iteration the job does not run the reader, processor, and writer beans, and I don't know why. Can anyone help?
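One possible explanation, offered as a guess rather than a confirmed diagnosis: reader() is a singleton @Bean, so the same CodeAndLabelRestItemReader instance is reused by every jobLauncher.run call. After the first run, codes is no longer null and nextCodeIndex already equals codes.size(), so read() would return null immediately on the next run and the step would complete with no items. The stand-alone sketch below reproduces that behaviour without Spring. ReaderStateDemo, StatefulReader, runStep, reset and the sample data are all hypothetical stand-ins (the Supplier plays the role of webClient.getCodes()), not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ReaderStateDemo {

    /** Hypothetical stand-in for CodeAndLabelRestItemReader, without Spring. */
    public static class StatefulReader {
        private final Supplier<List<String>> fetch; // stands in for webClient.getCodes()
        private List<String> codes;
        private int nextCodeIndex = 0;

        public StatefulReader(Supplier<List<String>> fetch) {
            this.fetch = fetch;
        }

        /** What a @BeforeStep method could do: forget the previous run's state. */
        public void reset() {
            codes = null;
            nextCodeIndex = 0;
        }

        /** Same logic as read() in the question: null signals end of input. */
        public String read() {
            if (codes == null) {
                codes = fetch.get();
            }
            String next = null;
            if (nextCodeIndex < codes.size()) {
                next = codes.get(nextCodeIndex);
                nextCodeIndex++;
            }
            return next;
        }
    }

    /** Simulates one step execution: reads until the reader returns null. */
    public static List<String> runStep(StatefulReader reader, boolean resetFirst) {
        if (resetFirst) {
            reader.reset();
        }
        List<String> items = new ArrayList<>();
        String item;
        while ((item = reader.read()) != null) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        Supplier<List<String>> fetch = () -> List.of("WFR.SP.2C", "WFR.SP.3E");
        StatefulReader shared = new StatefulReader(fetch);

        System.out.println("run 1: " + runStep(shared, false));
        System.out.println("run 2 (no reset): " + runStep(shared, false));
        System.out.println("run 3 (reset): " + runStep(shared, true));
    }
}
```

In this sketch the second run reads nothing because the shared instance still holds the exhausted list, while the run that resets first reads everything again. If the same thing is happening in the real job, setting codes = null and nextCodeIndex = 0 inside the existing beforeStep method might be enough; declaring the reader bean with @StepScope, so that each step execution gets a fresh instance, is another option that may be worth trying.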