I have an AsyncItemProcessor that I want to run in parallel with the configuration below. However, the processing does not happen in parallel.
@Configuration
@EnableBatchProcessing
@EnableAsync
public class JobConfig {

    @Autowired
    private JobBuilderFactory jobBuilder;

    @Autowired
    private StepBuilderFactory stepBuilder;

    @Autowired
    @Qualifier("writer")
    private ItemWriter<List<Item>> writer;

    @Bean
    @JobScope
    public RequestProcessor itemProcessor() {
        // RequestProcessor is the ItemProcessor implementation shown below
        return new RequestProcessor();
    }

    @Bean
    @JobScope
    public AsyncItemProcessor<Result, List<Item>> asyncItemProcessor() throws Exception {
        AsyncItemProcessor<Result, List<Item>> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        asyncItemProcessor.afterPropertiesSet();
        return asyncItemProcessor;
    }

    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor() {
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor() {
            @Override
            protected void doExecute(Runnable task) {
                // Propagate the job context to the worker thread so that
                // @JobScope beans can still be resolved during async processing
                final JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
                super.doExecute(() -> {
                    JobSynchronizationManager.register(jobExecution);
                    try {
                        task.run();
                    } finally {
                        JobSynchronizationManager.close();
                    }
                });
            }
        };
        simpleAsyncTaskExecutor.setThreadNamePrefix("processing 1-");
        simpleAsyncTaskExecutor.setConcurrencyLimit(100);
        return simpleAsyncTaskExecutor;
    }

    @Bean
    @JobScope
    public AsyncItemWriter<List<Item>> asyncItemWriter() throws Exception {
        AsyncItemWriter<List<Item>> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer);
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean
    @JobScope
    public FlatFileItemReader<Result> requestFileReader() {
        DefaultLineMapper<Result> lineMapper = new DefaultLineMapper<>();
        // ... (line mapper configuration omitted in the original post)
        FlatFileItemReader<Result> itemReader = new FlatFileItemReader<>();
        itemReader.setLineMapper(lineMapper);
        return itemReader;
    }

    @Bean
    public Step simpleFileStep() throws Exception {
        return stepBuilder.get("simpleFileStep")
                .<Result, Future<List<Item>>>chunk(100)
                .reader(requestFileReader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

    @Bean(name = "customExecutionContext")
    @JobScope
    public ExecutionContext customExecutionContext() {
        return new ExecutionContext();
    }
}
Processor class:
@JobScope
public class RequestProcessor implements ItemProcessor<Result, List<Item>> {

    private static final Logger log = LoggerFactory.getLogger(RequestProcessor.class);

    @Value("#{jobExecution}")
    private JobExecution jobExecution;

    @Autowired
    @Qualifier("customExecutionContext")
    private ExecutionContext storedContext;

    @Override
    public List<Item> process(Result result) throws Exception {
        Date start = new Date();
        List<Item> items = new ArrayList<>();
        // Processing logic
        Date end = new Date();
        long diff = end.getTime() - start.getTime();
        log.info("Time taken to process the items: " + TimeUnit.SECONDS.convert(diff, TimeUnit.MILLISECONDS));
        return items;
    }
}
In this scenario I want to process a file of 1000 records in parallel, but only 100 records are processed and written at a time. Please let me know if there is a problem with the configuration.
Also, after each chunk of 100 items has been processed, there is a 2-minute delay before the next chunk starts. During that time, the only logs I see are:
[GC concurrent-string-deduplication, 16.2K->0.0B(16.2K), avg 88.7%, 0.0000820 secs]
[GC pause (G1 Evacuation Pause) (young) 687M->302M(768M), 0.0138859 secs]
1 Answer
Concurrency is not parallelism. The AsyncItemProcessor is designed to be used together with the AsyncItemWriter to process items concurrently. In your example, the 100 items of a single chunk are processed concurrently, but the chunks themselves are not processed in parallel: execution is still a serial sequence of chunks, where each chunk's items are handled concurrently by different threads from the task executor. Spring Batch does not support processing the chunks of a single step in parallel; what it does provide is partitioning, and partitions can be processed in parallel (with local threads or remote JVMs).
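To make that distinction concrete, here is a plain-Java sketch (not the Spring Batch API; the item-doubling "processing logic" is just a stand-in) of the execution model described above: within one chunk, every item is submitted to an executor and processed concurrently, but the writer phase waits for all of the chunk's Futures before the next chunk can start.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkConcurrencyDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            items.add(i);
        }

        int chunkSize = 100;
        List<Integer> written = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            List<Integer> chunk = items.subList(start, Math.min(start + chunkSize, items.size()));
            // "AsyncItemProcessor" role: one Future per item, processed concurrently
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer item : chunk) {
                futures.add(executor.submit(() -> item * 2)); // stand-in processing logic
            }
            // "AsyncItemWriter" role: unwraps the Futures, blocking until every item
            // of this chunk is done -- so the next chunk cannot start any earlier
            for (Future<Integer> future : futures) {
                written.add(future.get());
            }
        }
        executor.shutdown();
        System.out.println(written.size());
        System.out.println(written.get(0));
        System.out.println(written.get(999));
    }
}
```

Raising the concurrency limit above the chunk size therefore does not help: at most one chunk's worth of items (here 100) is ever in flight at a time.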
So you can partition the input (for example, 1000 items per partition) and configure a partitioned step to process the partitions in parallel. Note that within each partition, chunks can additionally be processed concurrently as above. You can find more details and code examples in the reference documentation: Scaling and Parallel Processing.
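A minimal sketch of such a locally-partitioned step, assuming the same StepBuilderFactory used in the question; the line-range partitioner is purely illustrative (the real split would depend on the input file), and the reader would need to be made @StepScope so it can pick up `startLine`/`endLine` from `#{stepExecutionContext}`:

```java
// Would live in the same JobConfig class as the beans above.
@Bean
public Step partitionedFileStep() throws Exception {
    return stepBuilder.get("partitionedFileStep")
            .partitioner("simpleFileStep", filePartitioner()) // name of the worker step
            .step(simpleFileStep())                           // reuse the chunk step as worker
            .gridSize(4)                                      // 4 partitions run in parallel
            .taskExecutor(new SimpleAsyncTaskExecutor("partition-"))
            .build();
}

@Bean
public Partitioner filePartitioner() {
    // Illustrative partitioner: hand each worker a distinct line range of the input.
    return gridSize -> {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        int linesPerPartition = 1000; // e.g. 1000 records per partition
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("startLine", i * linesPerPartition);
            context.putInt("endLine", (i + 1) * linesPerPartition - 1);
            partitions.put("partition" + i, context);
        }
        return partitions;
    };
}
```

Each partition then gets its own step execution (and its own execution context) on a separate thread, so whole chunks from different partitions really do run in parallel, while each worker still executes its own chunks serially.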