Spring Batch AsyncItemProcessor does not process in parallel

xytpbqjk  posted on 2023-02-11  in Spring

I have an AsyncItemProcessor that I want to run in parallel with the configuration below. However, the processing does not happen in parallel.

@Configuration
@EnableBatchProcessing
@EnableAsync
public class JobConfig {
   @Autowired
    private JobBuilderFactory jobBuilder;

    @Autowired
    private StepBuilderFactory stepBuilder;

    @Autowired
    @Qualifier("writer")
    private ItemWriter writer;

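    @Bean
    @JobScope
    public ItemProcessor<Result, List<Item>> itemProcessor() {
        // ItemProcessor is an interface and cannot be instantiated directly,
        // so return the concrete RequestProcessor shown further down.
        return new RequestProcessor();
    }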

    @Bean
    @JobScope
    public AsyncItemProcessor asyncItemProcessor() throws IOException {
        AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        asyncItemProcessor.afterPropertiesSet();
        return asyncItemProcessor;
    }

    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor() {
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor() {
            @Override
            protected void doExecute(Runnable task) {
                final JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
                super.doExecute(() -> {
                    JobSynchronizationManager.register(jobExecution);
                    try {
                        task.run();
                    } finally {
                        JobSynchronizationManager.close();
                    }
                });
            }
        };
        simpleAsyncTaskExecutor.setThreadNamePrefix("processing 1-");
        simpleAsyncTaskExecutor.setConcurrencyLimit(100);
        return simpleAsyncTaskExecutor;
    }

  

    @Bean
    @JobScope
    public AsyncItemWriter asyncItemWriter() throws IOException {
        AsyncItemWriter asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer);
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean
    @JobScope
    public FlatFileItemReader<Result> requestFileReader() {
        DefaultLineMapper lineMapper = new DefaultLineMapper();  
        ......
        FlatFileItemReader<Result> itemReader = new FlatFileItemReader<>();
        itemReader.setLineMapper(lineMapper);
        return itemReader;
    }

    @Bean
    public Step simpleFileStep() throws IOException {
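        // The reader bean is requestFileReader(); there is no fileReader field in this class.
        return stepBuilder.get("simpleFileStep")
                .chunk(100)
                .reader(requestFileReader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();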
    }

    @Bean(name = "customExecutionContext")
    @JobScope
    public ExecutionContext customExecutionContext() {
        ExecutionContext executionContext = new ExecutionContext();
        return executionContext;
    }
}

Processor class:

@JobScope
public class RequestProcessor implements ItemProcessor<Result, List<Item>> {
    @Value("#{jobExecution}")
    private JobExecution jobExecution;

    @Autowired
    @Qualifier("customExecutionContext")
    private ExecutionContext storedContext;

    @Override
    public List<Item> process(Result result) throws Exception {
        Date start = new Date();
        // Processing logic
        Date end = new Date();
        long diff = end.getTime() - start.getTime();
        log.info("Time taken to process the items: "
                + TimeUnit.SECONDS.convert(diff, TimeUnit.MILLISECONDS));
        return items;
    }
}

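In this scenario I want to process a file of 1000 records in parallel, but only 100 records are processed and written at a time. Please let me know if something is wrong with the configuration.
Also, after each chunk of 100 items has been processed, there is a delay of 2 minutes before the next chunk starts. During that time, the only logs I see are the following: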

[GC concurrent-string-deduplication, 16.2K->0.0B(16.2K), avg 88.7%, 0.0000820 secs]
[GC pause (G1 Evacuation Pause) (young) 687M->302M(768M), 0.0138859 secs]

7rfyedvj1#

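Concurrency is not the same as parallelism. The AsyncItemProcessor is designed to process the items of a chunk concurrently, in conjunction with the AsyncItemWriter. In your example, a single chunk of 100 items is processed concurrently, but the chunks themselves are not processed in parallel. Execution is still serial from one chunk to the next; only the items within each chunk are handled concurrently by different threads from the task executor.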
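The behavior described above can be imitated with plain `java.util.concurrent`, without any Spring Batch classes. This is only a rough sketch of the idea, not the library's implementation: the class `ChunkConcurrencyDemo`, its methods, and the doubling "processing logic" are made up for illustration. Every item of a chunk is submitted to an executor (the role played by the AsyncItemProcessor), and the resulting futures are then unwrapped one by one (the role played by the AsyncItemWriter), so the next chunk cannot start until the current one is finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: chunks run one after another, items inside a chunk run concurrently.
final class ChunkConcurrencyDemo {

    /** Processes all items chunk by chunk and returns the results in "written" order. */
    static List<Integer> run(List<Integer> items, int chunkSize, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        List<Integer> written = new ArrayList<>();
        try {
            for (int start = 0; start < items.size(); start += chunkSize) {
                int end = Math.min(start + chunkSize, items.size());
                List<Integer> chunk = items.subList(start, end);

                // "Processor" phase: submit every item of this chunk; submit() returns immediately.
                List<Future<Integer>> futures = new ArrayList<>();
                for (Integer item : chunk) {
                    futures.add(executor.submit(() -> process(item)));
                }

                // "Writer" phase: unwrap the futures. get() blocks, so the loop only
                // moves on to the next chunk once this whole chunk has completed.
                for (Future<Integer> future : futures) {
                    written.add(future.get());
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Chunk processing failed", e);
        } finally {
            executor.shutdown();
        }
        return written;
    }

    /** Stand-in for the real processing logic (assumption: it just doubles the value). */
    static Integer process(Integer item) throws InterruptedException {
        Thread.sleep(5); // simulate some work
        return item * 2;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            items.add(i);
        }
        System.out.println(run(items, 4, 4));
    }
}
```

With a chunk size of 100 and 1000 input records, this loop would run ten times in sequence, which matches the "only 100 records at a time" observation in the question.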
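There is no way to process chunks in parallel in Spring Batch. What Spring Batch offers instead is partitioning, where the partitions can be processed in parallel (either with local threads or on remote JVMs).
So you can partition your input (for example, 1000 items per partition) and configure a partitioned step that processes the partitions in parallel. Note that within each partition, the chunks can still be processed concurrently as described above. You can find more details and code examples in the reference documentation: Scaling and Parallel Processing.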
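To make the partitioning idea more concrete, here is a small plain-Java sketch with no Spring Batch dependencies. It is an illustration under assumptions, not the framework's API: `RangePartitionDemo`, the `partitionN` key names, and the index-range layout are all hypothetical. It splits an input of a given size into contiguous ranges and hands each range to its own worker thread, which is roughly the division of labor between a partitioner and the worker steps of a partitioned step.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: split the input into ranges, then process the ranges in parallel.
final class RangePartitionDemo {

    /**
     * Splits item indexes [0, totalItems) into contiguous ranges of at most
     * partitionSize items. Each value is {fromInclusive, toExclusive}.
     */
    static Map<String, int[]> partition(int totalItems, int partitionSize) {
        Map<String, int[]> partitions = new LinkedHashMap<>();
        int index = 0;
        for (int from = 0; from < totalItems; from += partitionSize) {
            int to = Math.min(from + partitionSize, totalItems);
            partitions.put("partition" + index++, new int[] {from, to});
        }
        return partitions;
    }

    /** Runs one worker task per partition and returns how many items each worker handled. */
    static Map<String, Integer> processInParallel(Map<String, int[]> partitions, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // Submit all partitions first so they can run at the same time.
            Map<String, Future<Integer>> running = new LinkedHashMap<>();
            for (Map.Entry<String, int[]> entry : partitions.entrySet()) {
                int[] range = entry.getValue();
                running.put(entry.getKey(),
                        executor.submit(() -> processRange(range[0], range[1])));
            }
            // Then wait for every partition to finish and collect its count.
            Map<String, Integer> counts = new LinkedHashMap<>();
            for (Map.Entry<String, Future<Integer>> entry : running.entrySet()) {
                counts.put(entry.getKey(), entry.getValue().get());
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Partition processing failed", e);
        } finally {
            executor.shutdown();
        }
    }

    /** Stand-in for a worker step (assumption: it only counts the items in its range). */
    static Integer processRange(int from, int to) {
        int handled = 0;
        for (int i = from; i < to; i++) {
            handled++; // real code would read, process and write item i here
        }
        return handled;
    }
}
```

In an actual Spring Batch job the ranges would typically be stored in each partition's execution context and read by a step-scoped reader; the reference documentation mentioned above covers that wiring.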
