我用的是春批(3.0.1.RELEASE)/ JPA和一个HSQLBD服务器数据库。我需要浏览整个表(使用分页)和更新项目(一个接一个)。所以我使用了一个jpaPagingItemReader。但是当我运行作业时,我可以看到一些行被跳过,跳过的行数等于页面大小。例如,如果我的表有12行,并且jpaPagingItemReader.pagesize = 3,则作业将读取:第1、2、3行,然后是第7、8、9行(跳过第4、5、6行)......您能告诉我代码/配置中有什么问题吗?或者可能是HSQLDB分页的问题?下面是我的代码:
[编辑]:问题出在对POJO实体执行修改的ItemProcessor上。由于JPAPagingItemReader在每次阅读之间进行了刷新,因此实体被更新((这是我想要的)。但似乎游标分页也增加了(如日志中所示:行ID 4、5和6已被跳过)。我如何管理此问题?
@Configuration
@EnableBatchProcessing(modular=true)
public class AppBatchConfig {
@Inject
private InfrastructureConfiguration infrastructureConfiguration;
@Inject private JobBuilderFactory jobs;
@Inject private StepBuilderFactory steps;
@Bean public Job job() {
return jobs.get("Myjob1").start(step1()).build();
}
@Bean public Step step1() {
return steps.get("step1")
.<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
.reader(reader()).processor(processor()).build();
}
@Bean(destroyMethod = "")
@JobScope
public ItemStreamReader<SNUserPerCampaign> reader() String trigramme) {
JpaPagingItemReader reader = new JpaPagingItemReader();
reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
reader.setPageSize(3));
return reader;
}
@Bean @JobScope
public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {
return new MyItemProcessor();
}
}
@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
@Inject private EntityManagerFactory emf;
@Override
public EntityManagerFactory getEntityManagerFactory() {
return emf;
}
}
从我的项目处理器:
@Override
public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
//do some stuff …
//then if (condition) update the Entity pojo :
item.setModificationDate(new Timestamp(System.currentTimeMillis());
item.setIsactive = false;
}
来自Spring xml配置文件:
<tx:annotation-driven transaction-manager="transactionManager" />
<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
<property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>
<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="org.hsqldb.jdbcDriver" />
<property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
<property name="username" value="sa" />
<property name="password" value="" />
</bean>
跟踪/日志总结:
11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]
11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
11:16:06.681 DEBUG SQL- select ...etc... from SNUSER_CAMPAIGN snuserperc0_
11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]
4条答案
按热度按时间quhf5bfb1#
org.springframework.batch.item.database.JpaPagingItemReader
创建自己的entityManager
示例(from
org.springframework.batch.item.database.JpaPagingItemReader#doOpen
):如果您在事务中,就像看上去的那样,读取器实体不会分离(从org. springframework. batch. item. database. JpaPagingItemReader #doReadPage):
因此,当您将一个项目更新到processor或writer中时,该项目仍然由reader的entityManager管理。
当项读取器读取下一个数据块时,它将上下文刷新到数据库。
因此,如果我们看一下您的案例,在处理完第一个数据块之后,我们在数据库中有:
jpaPagingItemReader使用limit & offset来检索分页数据,因此阅读器创建的下一个select如下所示:
读取器将丢失ID为4、5、6的项,因为它们现在是数据库检索的第一行。
作为一种变通方法,您可以使用jdbc实现(org. springframework. batch. item. database. JdbcPagingItemReader),因为它不使用限制和偏移量。它基于排序列(通常是id列),因此您不会丢失任何数据。当然,您必须将数据更新到写入器中(使用JPA或纯JDBC实现)
阅读器将更加冗长:
lvjbypge2#
我也遇到过同样的情况,我的阅读器是一个JpaPagingItemReader,它查询在编写器中更新的字段,因此跳过了需要更新的一半项,这是由于页面窗口在前进,而已经读取的项不再处于阅读器范围内。
对我来说,最简单的解决方法是重写JpaPagingItemReader上的getPage方法,使其始终返回第一页。
1szpjjfi3#
需要注意的几件事:
1.从
JpaPagingItemReader
返回的所有实体都将被分离。我们可以通过以下两种方式之一来实现这一点。我们可以在查询页面之前创建一个事务,然后提交该事务(这将分离与该事务的EntityManager
关联的所有实体),或者显式调用entityManager.detach
。我们这样做是为了能够正确执行重试和跳过等功能。1.虽然你没有公布处理器中的所有代码,但我的直觉是,在
//do some stuff
部分,你的项目被重新附加,这就是为什么会发生更新。然而,没有看到代码,我不能肯定。1.在任何一种情况下,都应该使用显式的
ItemWriter
,事实上,我认为在使用java config时不需要ItemWriter
是一个bug(我们对XML是这样做的)。1.对于丢失记录的特定问题,您需要记住,任何
*PagingItemReader
都不使用游标。它们都对每一页数据执行独立的查询。因此,如果您更新每一页之间的底层数据,它可能会对将来页中返回的项产生影响。例如,如果我的寻呼查询指定where val1 > 4
并且我具有val1从1到5的记录,在块2中,该项现在符合条件,因此可以返回。如果需要更新where子句中的值(从而影响将要处理的数据集的内容),最好添加某种类型的processed标志,您可以通过它来查询。oknwwptz4#
我也遇到过基于pageSize跳过行的问题。例如,如果我将pageSize设置为2,它将读取2,忽略2,读取2,忽略2等。
我正在构建一个守护进程处理器来轮询“请求”数据库表中处于“等待处理”状态的记录。该守护进程被设计为永远在后台运行。
我有一个在@NamedQuery中定义的“status”字段,它将选择状态为“10”:等待处理的记录。在处理记录后,状态字段将更新为“20”:错误或“30”:成功。这原来是问题的原因-我正在更新查询中定义的字段。如果我引入一个“processedField”并更新它而不是“status”字段,那么就没有问题-所有记录都将被读取。
作为更新状态字段的一种可能的解决方案,我将MaxItemCount设置为与PageSize相同;这在步骤完成之前正确地更新了记录。2然后我继续执行这个步骤直到请求停止守护进程。3好的,可能不是最有效的方法(但我仍然受益于JPA提供的易用性),但我认为使用JdbcPagingItemReader可能会更好(如上所述-谢谢!)。欢迎对这个批处理数据库轮询问题的最佳方法发表意见:)