我在做一个关于 Spring 的poc SkipListener
以及 ItemWriteListener
因为我的生意需要它,我只需要这样。 Spring 批次 ItemWriteListener
以及 SkipListener
在我看来,这是行不通的。
这是我开发的一些poc代码,但是 @BeforeWrite
不会将任何内容保存到数据库中。
myskiplistener.java文件
public class MySkipListener implements SkipListener<Integer, Integer> {
@Override
public void onSkipInRead(Throwable t) {
System.out.println("@@@MySkipListener| On Skip in Read Error : " + t.getMessage());
}
@Override
public void onSkipInWrite(Integer item, Throwable t) {
System.out.println("@@@MySkipListener | Skipped in write due to : " + t.getMessage());
}
@Override
public void onSkipInProcess(Integer item, Throwable t) {
System.out.println("@@@MySkipListener | Skipped in process due to: " + t.getMessage());
}
}
mysteplistener.java文件
public class MyStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("beforeStep");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("afterStep");
return ExitStatus.COMPLETED;
}
}
myitemwritelistener.java文件
public class MyItemWriteListener {
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeWrite
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void beforeWrite(List<? extends Integer> items) {
System.out.println("########### ItemWriteListener | beforeWrite " + items);
// MUST FOR ME to save data into DB here !!!
jdbcTemplate.update("INSERT INTO `test`.`mytest`(`name`) VALUES( 'A')");
}
}
我的工作.java
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3));
}
@Bean
public ItemWriter<Integer> itemWriter() {
return items -> {
for (Integer item : items) {
if (item.equals(1)) {
throw new Exception("No 1 here!");
}
System.out.println("item = " + item);
}
};
}
@Bean
public Step step() {
return steps.get("step")
.<Integer, Integer>chunk(5)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.skip(Exception.class)
.skipLimit(10)
.listener(mySkipListener())
.listener(myStepListener())
.listener(myItemWriteListener())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
@Bean
public MySkipListener mySkipListener() {
return new MySkipListener();
}
@Bean
public MyStepListener myStepListener() {
return new MyStepListener();
}
@Bean
public MyItemWriteListener myItemWriteListener() {
return new MyItemWriteListener();
}
}
2条答案
按热度按时间zqry0prt1#
因为当你使用
@BeforeWrite
为了声明侦听器方法,它将在 Package 此方法的场景后面创建一个springaop代理,但此代理不是springbean,因此它不知道如何对@Transactional
因此它没有效果。试着用
ItemWriteListener
:另一方面,我不会为需要批处理的项(即对象读取)控制与事务相关的内容
ItemReader
)因为我自己spring-batch
已经帮助他们管理事务。每当处理一个新的项目块时,它就已经开始了一个新的事务。当它重试或跳过一个失败项时,自己管理它会把与回滚相关的事情搞砸。lh80um4z2#
我开发了poc代码,但是@beforewrite没有将任何内容保存到db中。有什么快速的帮助吗?
原因在itemwritelistener的javadoc中有解释:
来自的所有方法
ItemWriterListener
以及SkipListener
在spring批处理驱动的事务范围内执行。ChunkListener
我想这就是你要找的。ChunkListener#beforeChunk
在事务内部调用ChunkListener#afterChunk
以及ChunkListener#afterChunkError
在事务外部调用。这是为了在需要时允许在自己的事务中进行补偿操作。在这种情况下,由您来管理这个单独事务的生命周期。以下是javadoc的摘录: