我每天都会收到很多别人发来的gzip文件(*.gz),在放到hdfs中分析之前,我需要检查所有文件的完整性(损坏的文件会被删除),如果我在本地机器上用gzip-t文件名检查的话,是可以的,但是整个过程太慢了,因为文件量很大,而且大多数文件都足够大,使得本地验证成为一项耗时的工作。
因此,我转而使用hadoop作业来执行并行验证,每个文件都将在Map器中进行验证,并将损坏的文件路径输出到一个文件中,以下是我的代码:
在hadoop作业设置中:
Job job = new Job(getConf());
job.setJarByClass(HdfsFileValidateJob.class);
job.setMapperClass(HdfsFileValidateMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(JustBytesInputFormat.class);
在mapper中:
public class HdfsFileValidateMapper extends Mapper<JustBytesWritable, NullWritable, Text, NullWritable> {
private static final Logger LOG = LoggerFactory.getLogger(HdfsFileValidateJob.class);
private ByteArrayOutputStream bos;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
/* specify a split size(=HDFS block size here) for the ByteArrayOutputStream, which prevents frequently allocating
* memory for it when writing data in [map] method */
InputSplit inputSplit = context.getInputSplit();
bos = new ByteArrayOutputStream((int) ((FileSplit) inputSplit).getLength());
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit();
String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath(); // e.g. "/user/hadoop/abc.txt"
bos.flush();
byte[] mergedArray = bos.toByteArray(); // the byte array which stores the data of the whole file
if (!testUnGZip(mergedArray)) { // broken file
context.write(new Text(filePath), NullWritable.get());
}
bos.close();
}
@Override
public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
bos.write(key.getBytes());
}
/**
* Test whether we can un-gzip a piece of data.
*
* @param data The data to be un-gzipped.
* @return true for successfully un-gzipped the data, false otherwise.
*/
private static boolean testUnGZip(byte[] data) {
int numBytes2Read = 0;
ByteArrayInputStream bis = null;
GZIPInputStream gzip = null;
try {
bis = new ByteArrayInputStream(data);
gzip = new GZIPInputStream(bis);
byte[] buf = new byte[1024];
int num;
while ((num = gzip.read(buf, 0, buf.length)) != -1) {
numBytes2Read += num;
if (numBytes2Read % (1024 * 1024) == 0) {
LOG.info(String.format("Number of bytes read: %d", numBytes2Read));
}
}
} catch (Exception e) {
return false;
} finally {
if (gzip != null) {
try {
gzip.close();
} catch (IOException e) {
LOG.error("Error while closing GZIPInputStream");
}
}
if (bis != null) {
try {
bis.close();
} catch (IOException e) {
LOG.error("Error while closing ByteArrayInputStream");
}
}
}
return true;
}
}
其中我使用了两个名为justbytesinputformat和justbyteswritable的类,可以在这里找到:https://issues.apache.org/jira/secure/attachment/12570327/justbytes.jar
通常情况下,这个解决方案可以很好地工作,但是当单个gzip文件足够大(例如1.5g)时,hadoop作业将由于java堆空间问题而失败,原因很明显:对于每个文件,我首先将所有数据收集到内存缓冲区中,最后进行一次性验证,所以文件大小不能太大。
所以我修改了部分代码:
private boolean testUnGzipFail = false;
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit();
String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath(); // e.g. "/user/hadoop/abc.txt"
if (testUnGzipFail) { // broken file
context.write(new Text(filePath), NullWritable.get());
}
}
@Override
public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
if (!testUnGZip(key.getBytes())) {
testUnGzipFail = true;
}
}
这个版本修复了hadoop作业失败的问题,但是它根本不能正常工作!在我的e2e测试中,一个非常好的gzip文件(大小:1.5g)将被视为损坏的文件!
所以我的问题是:如何正确地进行验证,并避免将单个文件的所有内容读入内存的问题?
如有任何意见,我们将不胜感激,提前谢谢。
1条答案
按热度按时间tkclm6bt1#
我的第一个解决方案是
gzip -t
并行;gzip
可能比java更快,当文件很大时,创建进程的额外开销应该可以忽略不计。你的解决方法很慢。首先,当每个文件只需要几kb时,就可以将大量的千兆字节数据加载到ram中。而不是
JustBytesInputFormat
,您应该流式传输数据。想办法通过考试InputStream
至testUnGZip()
而不是整个文件内容。如果文件在硬盘上是一个真实的文件,尝试使用nioapi读取;这将允许内存Map文件,从而使读取速度更快。