尝试在Spring Batch中实现Process Indicator模式

unftdfkk  于 2024-01-06  发布在  Spring
关注(0)|答案(1)|浏览(129)

我正在努力寻找一个Process Indicator Pattern示例。到目前为止,我有一个源STUDENTS表,其中有一个STATUS字段来指示记录是否已经处理。我正在使用任务执行器进行多线程处理。
在我的编写器中,我将处理过的记录插入到一个新的PROCESSED_STUDENTS表中,并将源STUDENTS表中处理过的记录的状态更新为Processed,我在一个事务块中执行这两个操作,以防任何失败的情况下恢复更改。
这不适用于JdbcPagingItemReader,因为有些记录在处理结束时未被处理。
有人能告诉我我错过了什么吗?

读者

  1. @Bean
  2. @StepScope
  3. public ItemReader<SourceData> reader(DataSource dataSource) {
  4. Map<String, Object> parameterValues = new HashMap<>();
  5. parameterValues.put("status", "ToBeProcessed");
  6. JdbcPagingItemReader<SourceData> reader = new JdbcPagingItemReader<>();
  7. reader.setName("Oracle_RCP");
  8. reader.setDataSource(dataSource);
  9. reader.setRowMapper(SourceData.rowMapper());
  10. reader.setParameterValues(parameterValues);
  11. reader.setPageSize(100);
  12. reader.setQueryProvider(getQueryProvider(new OraclePagingQueryProvider(), "SELECT ID, NAME, CREATED_TIME", "FROM STUDENTS", "WHERE STATUS = :status", CREATED_TIME, Order.ASCENDING));
  13. reader.setSaveState(false);
  14. try {
  15. reader.afterPropertiesSet();
  16. } catch (Exception e) {
  17. log.error(e.getMessage(), e.getStackTrace());
  18. }
  19. return reader;
  20. }
  21. public PagingQueryProvider getQueryProvider(AbstractSqlPagingQueryProvider queryProvider, String select, String from, String where, String sortKey, Order order) {
  22. queryProvider.setSelectClause(select);
  23. queryProvider.setFromClause(from);
  24. if (where != null) {
  25. queryProvider.setWhereClause(where);
  26. }
  27. Map<String, Order> sortConfiguration = new HashMap<>();
  28. sortConfiguration.put(sortKey, order);
  29. queryProvider.setSortKeys(sortConfiguration);
  30. return queryProvider;
  31. }

字符串

处理器

  1. @Bean
  2. @StepScope
  3. public ItemProcessor<SourceData, OutData> processor(
  4. @Value("#{jobParameters['processDate']}") String processDate) {
  5. return new CustomItemProcessor(processDate);
  6. }

作家

  1. @SuppressWarnings("unchecked")
  2. @Bean
  3. public ItemWriter<OutData> writer(Utils utils) {
  4. return OutDataList -> utils.batchOperation((List<OutData>) OutDataList, chunk);
  5. }

作业、步骤和任务执行器

  1. @Bean("MainJob")
  2. @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  3. public Job mainJob(JobBuilderFactory jobBuilderFactory, Step step) {
  4. return jobBuilderFactory.get("mainJob")
  5. .incrementer(new RunIdIncrementer())
  6. .flow(step)
  7. .end()
  8. .build();
  9. }
  10. @Bean
  11. public Step step(StepBuilderFactory stepBuilderFactory) {
  12. return stepBuilderFactory.get("step")
  13. .<SourceData, OutData> chunk(chunk)
  14. .reader(reader(null))
  15. .processor(processor(null))
  16. .writer(writer(null))
  17. .taskExecutor(taskExecutor())
  18. .build();
  19. }
  20. @Bean
  21. @StepScope
  22. public ThreadPoolTaskExecutor taskExecutor() {
  23. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  24. executor.setMaxPoolSize(50);
  25. executor.setCorePoolSize(25);
  26. executor.setQueueCapacity(25);
  27. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  28. executor.setThreadNamePrefix("MultiThreaded-Executor");
  29. return executor;
  30. }


batchOperation方法,用于在PROCESSED_STUDENTS表中插入已处理的记录并更新STUDENTS源表中已处理的记录:

  1. public void batchOperation(List<OutData> outDataList, int batchSize) {
  2. try (
  3. Connection con = jdbcTemplate.getDataSource().getConnection();
  4. PreparedStatement psInsert = con.prepareStatement("INSERT INTO PROCESSED_STUDENTS (ID, NAME) VALUES (?, ?)");
  5. PreparedStatement psUpdate = con.prepareStatement("UPDATE STUDENTS SET STATUS = 'Processed' WHERE ID = ?");) {
  6. // Starting transaction block
  7. con.setAutoCommit(false);
  8. int i = 0;
  9. for (OutData argument : outDataList) {
  10. psInsert.setLong(1, argument.getId());
  11. psInsert.setString(2, argument.getName());
  12. psUpdate.setLong(1, argument.getId());
  13. psInsert.addBatch();
  14. psUpdate.addBatch();
  15. i++;
  16. if (i % batchSize == 0) {
  17. psInsert.executeBatch();
  18. psUpdate.executeBatch();
  19. }
  20. }
  21. // Executing remaining batch if total record count is an odd number
  22. psInsert.executeBatch();
  23. psUpdate.executeBatch();
  24. // End transaction block, commit changes
  25. con.commit();
  26. // Setting it back to default true
  27. con.setAutoCommit(true);
  28. } catch (Exception e) {
  29. log.error(e.getMessage());
  30. }
  31. }

gajydyqb

gajydyqb1#

在这种情况下,您不能使用JdbcPagingItemReader,因为您正在更改用作筛选器(状态)的参数。分页将不起作用。
使用JdbcCursorItemReader代替。
标签:(Spring Batch) Not all records are proccessed

相关问题