Java: how to process a large file from S3 and use it in Spring Batch

Posted by 8wtpewkr on 2023-02-02 in Java

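My CSV file contains millions of records and is around 2 GB in size. My use case is to read the CSV file from S3 and process it. Please find my code below.
In the code below, I read a file from an S3 bucket and pass the inputStream directly to the Spring Batch FlatFileItemReader: reader.setResource(new InputStreamResource(inputStream));
With this implementation I am holding 2 GB of content in memory and processing it, which is not an efficient approach. Can anyone suggest an efficient way to read a large file from an S3 bucket and process it in Spring Batch?
Thanks in advance for your help!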

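import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.springframework.stereotype.Component;

@Component
public class GetFileFromS3 {

    // Returns the object's content as a stream (AWS SDK for Java v1), or null on failure.
    // The caller is responsible for closing the returned stream.
    public S3ObjectInputStream downloadFile(String keyName, String bucketName, String region) {
        try {
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                    .withClientConfiguration(new ClientConfiguration())
                    .withRegion(region)
                    .build();

            S3Object s3object = s3Client.getObject(bucketName, keyName);
            return s3object.getObjectContent();
        } catch (AmazonServiceException e) {
            e.printStackTrace();
        }
        return null;
    }

}


import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.InputStreamResource;

@Configuration // StepBuilderFactory is provided by @EnableBatchProcessing (Spring Batch 4.x)
public class SpringBatch {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private GetFileFromS3 getFileFromS3;

    @Bean(name = "csvFile")
    public Step step1() {
        return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        S3ObjectInputStream inputStream = getFileFromS3.downloadFile("employee.csv", "testBucket", "us-east-1");

        // Map each CSV line to an Employee: split on commas, then bind the fields by name
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(Employee.fields());
        BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Employee.class);
        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
        reader.setResource(new InputStreamResource(inputStream));
        reader.setLinesToSkip(1); // skip the header row
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return employee -> employee; // placeholder: passes each record through unchanged
    }

    @Bean
    public ItemWriter<Employee> writer() {
        return items -> items.forEach(System.out::println); // placeholder writer
    }

}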
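To make "processing without holding the whole file in memory" concrete, here is a minimal JDK-only sketch of the pattern a chunk-oriented step follows: read one line at a time, skip the header, map each row, and hand records to a writer in groups (the `chunk(10)` above). `CsvChunkStreamer`, its three-field `Employee` record and the naive comma split (no quoted fields) are hypothetical illustrations, not Spring Batch APIs. FlatFileItemReader likewise reads its Resource line by line through a BufferedReader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CsvChunkStreamer {

    // Hypothetical stand-in for the question's Employee class.
    public record Employee(String id, String name, String department) {}

    /**
     * Streams CSV rows from {@code in}: skips {@code linesToSkip} header lines,
     * maps each row to an Employee and passes rows to {@code writer} in lists of
     * at most {@code chunkSize}. Only the current line and one chunk are held in
     * memory. Returns the number of records read.
     */
    public static long process(InputStream in, int linesToSkip, int chunkSize,
                               Consumer<List<Employee>> writer) {
        long count = 0;
        List<Employee> chunk = new ArrayList<>(chunkSize);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            for (int i = 0; i < linesToSkip; i++) {
                if (reader.readLine() == null) break; // input shorter than the header
            }
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) continue; // this sketch ignores blank lines
                String[] f = line.split(",", -1); // naive split: no quoted fields
                chunk.add(new Employee(f[0], f[1], f[2]));
                count++;
                if (chunk.size() == chunkSize) {
                    writer.accept(chunk); // "write" a full chunk, then start a new one
                    chunk = new ArrayList<>(chunkSize);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!chunk.isEmpty()) {
            writer.accept(chunk); // flush the final, partial chunk
        }
        return count;
    }
}
```

Memory use here depends on the line length and chunk size rather than on the file size; passing an S3 object stream as `in` should behave the same way, reading the object as the loop advances.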
Answer #1 (by lx0bsm1f)

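Using a ResourceLoader, the ItemReader can read a file in S3 just like any other resource, which helps read the S3 file in chunks instead of loading the entire file into memory.
With dependencies injected for the ResourceLoader and the AmazonS3 client, the reader configuration changes as follows.
Replace the values of sourceBucket and sourceObjectPrefix as needed.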

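import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;

import java.util.ArrayList;
import java.util.List;

@Configuration
public class EmployeeReaderConfig { // hypothetical class name; use your own batch configuration class

    private static final Logger log = LoggerFactory.getLogger(EmployeeReaderConfig.class);

    @Autowired
    private ResourceLoader resourceLoader;

    @Autowired
    private AmazonS3 amazonS3Client;

    // READER
    @Bean(destroyMethod = "")
    @StepScope
    public SynchronizedItemStreamReader<Employee> employeeDataReader() {
        SynchronizedItemStreamReader<Employee> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
        List<Resource> resourceList = new ArrayList<>();
        String sourceBucket = "yourBucketName";               // placeholder: replace with your bucket
        String sourceObjectPrefix = "yourSourceObjectPrefix"; // placeholder: replace with your key prefix
        log.info("sourceObjectPrefix::" + sourceObjectPrefix);
        ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
                .withBucketName(sourceBucket)
                .withPrefix(sourceObjectPrefix);
        ObjectListing sourceObjectsListing;
        do {
            // Page through every object under the prefix
            sourceObjectsListing = amazonS3Client.listObjects(listObjectsRequest);
            for (S3ObjectSummary sourceFile : sourceObjectsListing.getObjectSummaries()) {
                if (sourceFile.getSize() <= 0 || !sourceFile.getKey().endsWith(".csv")) {
                    // Skip if the file is empty or its extension is not "csv"
                    continue;
                }
                log.info("Reading " + sourceFile.getKey());
                // The s3:// scheme only resolves if an S3 protocol resolver is registered
                resourceList.add(resourceLoader.getResource("s3://" + sourceBucket + "/" + sourceFile.getKey()));
            }
            listObjectsRequest.setMarker(sourceObjectsListing.getNextMarker());
        } while (sourceObjectsListing.isTruncated());

        Resource[] resources = resourceList.toArray(new Resource[0]);
        MultiResourceItemReader<Employee> multiResourceItemReader = new MultiResourceItemReader<>();
        multiResourceItemReader.setName("employee-multiResource-Reader");
        multiResourceItemReader.setResources(resources);
        // Each resource is handed in turn to the same FlatFileItemReader delegate
        multiResourceItemReader.setDelegate(employeeFileItemReader());
        synchronizedItemStreamReader.setDelegate(multiResourceItemReader);
        return synchronizedItemStreamReader;
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Employee> employeeFileItemReader() {
        // Map each CSV line to an Employee: split on commas, then bind the fields by name
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(Employee.fields());
        BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Employee.class);
        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1); // skips the header of every file read through this delegate
        reader.setLineMapper(lineMapper);
        return reader;
    }
}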
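The listing loop in this answer pages through `listObjects` results with a marker and keeps only non-empty `.csv` keys. The sketch below reproduces just that control flow with plain JDK types so it can run without AWS; `ObjectSummary`, `Listing` and the `listPage` function are hypothetical stand-ins for the SDK's `S3ObjectSummary`, `ObjectListing` and `listObjects` call. Note also that `resourceLoader.getResource("s3://...")` only understands the `s3://` scheme when an S3 protocol resolver (such as the one shipped with Spring Cloud AWS) is registered in the application context; the answer assumes that setup rather than showing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class S3KeyCollector {

    // Hypothetical stand-ins for S3ObjectSummary / ObjectListing, so the loop runs without AWS.
    public record ObjectSummary(String key, long size) {}
    public record Listing(List<ObjectSummary> summaries, boolean truncated, String nextMarker) {}

    /**
     * Pages through a listing (marker == null means "first page") and returns
     * s3:// URIs for non-empty objects whose key ends with ".csv".
     */
    public static List<String> csvUris(String bucket, Function<String, Listing> listPage) {
        List<String> uris = new ArrayList<>();
        String marker = null;
        Listing page;
        do {
            page = listPage.apply(marker);
            for (ObjectSummary s : page.summaries()) {
                if (s.size() <= 0 || !s.key().endsWith(".csv")) {
                    continue; // skip empty objects and non-CSV keys
                }
                uris.add("s3://" + bucket + "/" + s.key());
            }
            marker = page.nextMarker(); // the next request starts after this marker
        } while (page.truncated());
        return uris;
    }
}
```

The do/while shape matters: the first page is always fetched, and the loop stops only once a page reports that it is not truncated.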

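MultiResourceItemReader is used here as an example; it works even if there are multiple CSV files under the S3 path you are reading.
If only one CSV file is processed at that location, it also works, since Resource[] resources then simply contains a single entry.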
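Conceptually, MultiResourceItemReader opens each resource in turn and hands it to the same delegate, so `setLinesToSkip(1)` skips the header of every file, while SynchronizedItemStreamReader serializes `read()` calls so a multi-threaded step can share one reader. A rough JDK-only sketch of that behaviour, with the hypothetical class `SequentialLineReader` and lazily opened `Supplier<InputStream>` sources standing in for Spring `Resource`s:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

/**
 * Reads lines from several sources one after another, skipping the header of
 * each source; read() returns null once every source is exhausted. read() is
 * synchronized so several threads can share one instance.
 */
public class SequentialLineReader implements AutoCloseable {
    private final Deque<Supplier<InputStream>> pending;
    private final int linesToSkip;
    private BufferedReader current;

    public SequentialLineReader(List<Supplier<InputStream>> sources, int linesToSkip) {
        this.pending = new ArrayDeque<>(sources);
        this.linesToSkip = linesToSkip;
    }

    public synchronized String read() {
        try {
            while (true) {
                if (current == null) {
                    Supplier<InputStream> next = pending.poll();
                    if (next == null) return null; // all sources consumed
                    // Open the next source only when it is needed
                    current = new BufferedReader(new InputStreamReader(next.get(), StandardCharsets.UTF_8));
                    for (int i = 0; i < linesToSkip; i++) {
                        if (current.readLine() == null) break; // header of this source
                    }
                }
                String line = current.readLine();
                if (line != null) return line;
                current.close(); // this source is exhausted; move on to the next one
                current = null;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public synchronized void close() {
        try {
            if (current != null) current.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            current = null;
            pending.clear();
        }
    }
}
```

An empty source (header only) is simply skipped, which matches the "one file or many" point above: the reader does not care how many sources it was given.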
