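import java.io.IOException;
import java.util.zip.ZipOutputStream;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Key: Text (path of the file in the output zip)
// Value: BytesWritable - binary content of the image to save
public class ZipFileOutputFormat extends FileOutputFormat<Text, BytesWritable> {

    @Override
    public RecordWriter<Text, BytesWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // One zip file per task, created in the task's work directory
        Path file = getDefaultWorkFile(job, ".zip");
        FileSystem fs = file.getFileSystem(job.getConfiguration());
        // overwrite = false: fail if the file already exists
        return new ZipRecordWriter(fs.create(file, false));
    }

    public static class ZipRecordWriter extends RecordWriter<Text, BytesWritable> {
        protected ZipOutputStream zos;

        public ZipRecordWriter(FSDataOutputStream os) {
            zos = new ZipOutputStream(os);
        }

        @Override
        public void write(Text key, BytesWritable value)
                throws IOException, InterruptedException {
            // TODO: create new ZipEntry & add to the ZipOutputStream (zos)
        }

        @Override
        public void close(TaskAttemptContext context)
                throws IOException, InterruptedException {
            // Writes the zip central directory and closes the underlying stream
            zos.close();
        }
    }
}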
1 Answer
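If you feel so inclined (or you can probably find an existing implementation via Google), you can write a FileOutputFormat that wraps the FSDataOutputStream in a ZipOutputStream, giving you one zip file per reducer (which also saves you the effort of writing a sequence-file extraction program).
Don't be daunted by writing your own OutputFormat; it really isn't that difficult (and it is much easier than writing a custom InputFormat, which has to worry about splits). In fact, the ZipFileOutputFormat skeleton above is a starting point; you only need to implement the write method.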
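One possible way to fill in the write method is sketched below. It is not part of the original answer. To keep it runnable without Hadoop on the classpath, it uses a plain String and byte[] in place of Text and BytesWritable; the class ZipEntryWriteSketch and the methods writeEntry and roundTrip are hypothetical names introduced only for illustration. In ZipRecordWriter.write, the three arguments would presumably be key.toString(), value.getBytes() and value.getLength(). The explicit length matters because the array returned by BytesWritable.getBytes() can be longer than the valid data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper class, not part of the original answer.
class ZipEntryWriteSketch {

    // Adds one entry to the archive. Only the first `length` bytes of `buf`
    // are written, mirroring how a BytesWritable buffer should be read.
    static void writeEntry(ZipOutputStream zos, String name, byte[] buf, int length)
            throws IOException {
        zos.putNextEntry(new ZipEntry(name)); // start a new entry at this path
        zos.write(buf, 0, length);            // copy only the valid bytes
        zos.closeEntry();                     // finish the entry before the next one
    }

    // Builds a one-entry archive in memory, reads the entry back, and
    // returns "<entry name>:<entry content>" so the result is easy to check.
    static String roundTrip(String name, byte[] buf, int length) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (ZipOutputStream zos = new ZipOutputStream(sink)) {
                writeEntry(zos, name, buf, length);
            }
            try (ZipInputStream zis = new ZipInputStream(
                    new ByteArrayInputStream(sink.toByteArray()))) {
                ZipEntry entry = zis.getNextEntry();
                ByteArrayOutputStream content = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = zis.read(chunk)) != -1) {
                    content.write(chunk, 0, n);
                }
                return entry.getName() + ":"
                        + new String(content.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The backing array is longer than the payload, as a BytesWritable buffer can be.
        byte[] padded = {'h', 'i', 0, 0};
        System.out.println(roundTrip("images/a.txt", padded, 2));
    }
}
```

Note that ZipOutputStream.putNextEntry throws a ZipException if the same entry name is added twice, so the keys reaching a single reducer need to be unique paths within the archive.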