Spring Batch如何在发送到ItemWriter之前过滤重复的项目

vaj7vani  于 2022-11-21  发布在  Spring
关注(0)|答案(3)|浏览(165)

我读取了一个平面文件(例如每个用户一行的.csv文件,例如:* 用户ID;数据1;日期2 *)。
但是如何处理阅读器中的重复用户项(其中没有previus readed用户的列表...)

stepBuilderFactory.get("createUserStep1")
.<User, User>chunk(1000)
.reader(flatFileItemReader) // FlatFileItemReader
.writer(itemWriter) // For example JDBC Writer
.build();
rggaifut

rggaifut1#

过滤通常是用ItemProcessor完成的。如果ItemProcessor返回null,则项目被过滤,不传递给ItemWriter。否则,就传递给ItemWriter。在您的例子中,可以在ItemProcessor中保存一个以前见过的用户列表。如果用户以前没有见过,则传递它。如果以前见过,return null。您可以在以下文档中阅读有关使用ItemProcessor进行过滤的更多信息:https://docs.spring.io/spring-batch/docs/current/reference/html/processor.html#filteringRecords

/**
* This implementation assumes that there is enough room in memory to store the duplicate
* Users.  Otherwise, you'd want to store them somewhere you can do a look-up on.
*/
public class UserFilterItemProcessor implements ItemProcessor<User, User> {

    // This assumes that User.equals() identifies the duplicates
    private Set<User> seenUsers = new HashSet<User>();

    public User process(User user) {
        if(seenUsers.contains(user)) {
            return null;
        }
        seenUsers.add(user);
        return user;
        
    }
}
b09cbbtk

b09cbbtk2#

如您所见http://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html#faultTolerant
回滚区块时,可能会重新处理在阅读过程中缓存的项。如果将步骤配置为容错(通常使用跳过或重试处理),则所使用的任何ItemProcessor都应以幂等的方式实现
这意味着,在Michael的示例中,第一次 * 处理 * 用户时,该用户缓存在集合中,如果 * 写入 * 项目失败,如果步骤具有容错性,则将对同一用户再次执行 Processor,并且此 Filter 将过滤掉该用户。
改进的代码:

/**
 * This implementation assumes that there is enough room in memory to store the duplicate
 * Users.  Otherwise, you'd want to store them somewhere you can do a look-up on.
 */
public class UserFilterItemProcessor implements ItemProcessor<User, User> {

    // This assumes that User.equals() identifies the duplicates
    private Set<User> seenUsers = new HashSet<User>();

    public User process(User user) {
        if(seenUsers.contains(user) && !user.hasBeenProcessed()) {
            return null;
        } else {
            seenUsers.add(user);
            user.setProcessed(true);
            return user;
        }
    }
}
p4tfgftt

p4tfgftt3#

可以覆盖Userequals()hashcode()方法,然后删除“包含”条件。

相关问题