Spring批处理jpaPagingItemReader为什么有些行没有读取?

qpgpyjmq  于 2023-01-05  发布在  Spring
关注(0)|答案(4)|浏览(214)

我用的是春批(3.0.1.RELEASE)/ JPA和一个HSQLBD服务器数据库。我需要浏览整个表(使用分页)和更新项目(一个接一个)。所以我使用了一个jpaPagingItemReader。但是当我运行作业时,我可以看到一些行被跳过,跳过的行数等于页面大小。例如,如果我的表有12行,并且jpaPagingItemReader.pagesize = 3,则作业将读取:第1、2、3行,然后是第7、8、9行(跳过第4、5、6行)......您能告诉我代码/配置中有什么问题吗?或者可能是HSQLDB分页的问题?下面是我的代码:

[编辑]:问题出在对POJO实体执行修改的ItemProcessor上。由于JPAPagingItemReader在每次阅读之间进行了刷新,因此实体被更新((这是我想要的)。但似乎游标分页也增加了(如日志中所示:行ID 4、5和6已被跳过)。我如何管理此问题?

@Configuration
@EnableBatchProcessing(modular=true)
public class AppBatchConfig {
  @Inject
  private InfrastructureConfiguration infrastructureConfiguration;  
  @Inject private JobBuilderFactory jobs;
  @Inject private StepBuilderFactory steps;

  @Bean  public Job job() {
     return jobs.get("Myjob1").start(step1()).build();
  }
  @Bean  public Step step1() {  
      return steps.get("step1")
                .<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
                .reader(reader()).processor(processor()).build();   
  }
  @Bean(destroyMethod = "")
@JobScope 
public ItemStreamReader<SNUserPerCampaign> reader() String trigramme) {
    JpaPagingItemReader reader = new JpaPagingItemReader();
    reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
    reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
    reader.setPageSize(3));
    return reader;
}
 @Bean @JobScope
 public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {   
     return new MyItemProcessor();
 }
}

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
 @Inject private EntityManagerFactory emf;  
 @Override
public EntityManagerFactory getEntityManagerFactory() {
    return emf;
}
}

从我的项目处理器:

@Override
public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
    //do some stuff …
   //then if (condition) update the Entity pojo :   
   item.setModificationDate(new Timestamp(System.currentTimeMillis());
   item.setIsactive = false;

}

来自Spring xml配置文件:

<tx:annotation-driven transaction-manager="transactionManager" />     
<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    <property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
    <property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
    <property name="username" value="sa" />
    <property name="password" value="" />
</bean>

跟踪/日志总结:

11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]

11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...

11:16:06.681 DEBUG SQL- select ...etc... from  SNUSER_CAMPAIGN snuserperc0_ 

11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]
quhf5bfb

quhf5bfb1#

org.springframework.batch.item.database.JpaPagingItemReader创建自己的entityManager示例
(from org.springframework.batch.item.database.JpaPagingItemReader#doOpen):

entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);

如果您在事务中,就像看上去的那样,读取器实体不会分离(从org. springframework. batch. item. database. JpaPagingItemReader #doReadPage):

if (!transacted) {
        List<T> queryResult = query.getResultList();
        for (T entity : queryResult) {
            entityManager.detach(entity);
            results.add(entity);
        }//end if
    } else {
        results.addAll(query.getResultList());
        tx.commit();
    }

因此,当您将一个项目更新到processor或writer中时,该项目仍然由reader的entityManager管理。
当项读取器读取下一个数据块时,它将上下文刷新到数据库。
因此,如果我们看一下您的案例,在处理完第一个数据块之后,我们在数据库中有:

|id|active
|1 | false
|2 | false
|3 | false

jpaPagingItemReader使用limit & offset来检索分页数据,因此阅读器创建的下一个select如下所示:

select * from table where active = true offset 3 limits 3.

读取器将丢失ID为4、5、6的项,因为它们现在是数据库检索的第一行。
作为一种变通方法,您可以使用jdbc实现(org. springframework. batch. item. database. JdbcPagingItemReader),因为它不使用限制和偏移量。它基于排序列(通常是id列),因此您不会丢失任何数据。当然,您必须将数据更新到写入器中(使用JPA或纯JDBC实现)
阅读器将更加冗长:

@Bean
public ItemReader<? extends Entity> reader() {
    JdbcPagingItemReader<Entity> reader = new JdbcPagingItemReader<Entity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    sqlPagingQueryProviderFactoryBean.setSelectClause("select *");
    sqlPagingQueryProviderFactoryBean.setFromClause("from <your table name>");
    sqlPagingQueryProviderFactoryBean.setWhereClause("where active = true");
    sqlPagingQueryProviderFactoryBean.setSortKey("id");
    try {
        reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource(dataSource);
    reader.setPageSize(3);
    reader.setRowMapper(new BeanPropertyRowMapper<Entity>(Entity.class));
    return reader;
lvjbypge

lvjbypge2#

我也遇到过同样的情况,我的阅读器是一个JpaPagingItemReader,它查询在编写器中更新的字段,因此跳过了需要更新的一半项,这是由于页面窗口在前进,而已经读取的项不再处于阅读器范围内。
对我来说,最简单的解决方法是重写JpaPagingItemReader上的getPage方法,使其始终返回第一页。

JpaPagingItemReader<XXXXX> jpaPagingItemReader = new JpaPagingItemReader() {
    @Override
    public int getPage() {
        return 0;
    }
};
1szpjjfi

1szpjjfi3#

需要注意的几件事:
1.从JpaPagingItemReader返回的所有实体都将被分离。我们可以通过以下两种方式之一来实现这一点。我们可以在查询页面之前创建一个事务,然后提交该事务(这将分离与该事务的EntityManager关联的所有实体),或者显式调用entityManager.detach。我们这样做是为了能够正确执行重试和跳过等功能。
1.虽然你没有公布处理器中的所有代码,但我的直觉是,在//do some stuff部分,你的项目被重新附加,这就是为什么会发生更新。然而,没有看到代码,我不能肯定。
1.在任何一种情况下,都应该使用显式的ItemWriter,事实上,我认为在使用java config时不需要ItemWriter是一个bug(我们对XML是这样做的)。
1.对于丢失记录的特定问题,您需要记住,任何*PagingItemReader都不使用游标。它们都对每一页数据执行独立的查询。因此,如果您更新每一页之间的底层数据,它可能会对将来页中返回的项产生影响。例如,如果我的寻呼查询指定where val1 > 4并且我具有val1从1到5的记录,在块2中,该项现在符合条件,因此可以返回。如果需要更新where子句中的值(从而影响将要处理的数据集的内容),最好添加某种类型的processed标志,您可以通过它来查询。

oknwwptz

oknwwptz4#

我也遇到过基于pageSize跳过行的问题。例如,如果我将pageSize设置为2,它将读取2,忽略2,读取2,忽略2等。
我正在构建一个守护进程处理器来轮询“请求”数据库表中处于“等待处理”状态的记录。该守护进程被设计为永远在后台运行。
我有一个在@NamedQuery中定义的“status”字段,它将选择状态为“10”:等待处理的记录。在处理记录后,状态字段将更新为“20”:错误或“30”:成功。这原来是问题的原因-我正在更新查询中定义的字段。如果我引入一个“processedField”并更新它而不是“status”字段,那么就没有问题-所有记录都将被读取。
作为更新状态字段的一种可能的解决方案,我将MaxItemCount设置为与PageSize相同;这在步骤完成之前正确地更新了记录。2然后我继续执行这个步骤直到请求停止守护进程。3好的,可能不是最有效的方法(但我仍然受益于JPA提供的易用性),但我认为使用JdbcPagingItemReader可能会更好(如上所述-谢谢!)。欢迎对这个批处理数据库轮询问题的最佳方法发表意见:)

相关问题